First crack at a Streams proposal

Jake Verbaten raynos2 at
Mon Apr 15 16:24:01 PDT 2013

Would it help to split the sugar and combinator from the actual stream

callback StreamInit = void (StreamResolver resolver);
callback AnyCallback = any (optional any value);

[Constructor(StreamInit init)]
interface Stream {
  Stream listen(optional AnyCallback? listenCB = null, optional
AnyCallback? completeCB = null, optional AnyCallback? rejectCB =

interface StreamResolver {
  void push(optional any value);
  void complete(optional any value);
  void reject(optional any value);

One basic thing missing from this API is the ability to stop listening to a

Other things missing are the ability to abort / cancel / close a Stream,
the ability to pause or resume a stream.

It should also be made clear how and when a stream may emit values. Whether
it is at any arbitrary time and it will send to whomever is listening at
that moment. Whether it is only allowed to emit values after a listen call.
Does every call to listen get the entire history independently of other
calls? (Doing so would buffer all data and defeat the point of a stream).
What happens when you push a value into the resolved and no-one is

Another decision that needs to be made is whether it makes sense for a
stream to emit multiple errors? For a Future it doesn't make sense because
it can only be fulfilled to a single value. For a stream it may make sense
for multiple errors to occur.

I personally like the `listen(onChunk, onEnd)` syntax as it matches a
popular stream baseclass from the node community ( ) in terms of simplicity.

On Mon, Apr 15, 2013 at 1:31 PM, Tab Atkins Jr. <jackalmage at>wrote:

> Inspired by DOM adding Futures <>,
> I've begun reworking several APIs in terms of them.  While doing so,
> I've found several spots where Futures aren't appropriate, because I
> need something that can be "updated" multiple times, rather than just
> representing a single result like Futures do.  I believe formalizing
> this notion will be as valuable as Futures, so I began by sketching
> out an API on my blog: <>.
> I'd like to begin discussion of this API with es-discuss, because this
> feature is much larger than just DOM, and you people have a lot of
> relevant experience.
> Here's a reproduction of the current state of my proposal (I'll update
> my blog as things progress if necessary):
> ~~~~
> callback StreamInit = void (StreamResolver resolver);
> callback AnyCallback = any (optional any value);
> typedef (Stream or Future or Iterable) StreamLike;
> [Constructor(StreamInit init)]
> interface Stream {
>   Stream listen(optional AnyCallback? listenCB = null, optional
> AnyCallback? completeCB = null, optional AnyCallback? rejectCB =
> null);
>   Future complete(optional AnyCallback? completeCB = null);
>   Future catch(optional AnyCallback? rejectCB = null);
>   Future next(optional any filter = null)
>   static from(StreamLike value, optional transformerCB, optional thisArg);
>   static of(any... values);
>   static reject(optional any reason);
>   Stream filter(filterCB or primitive);
>   Stream map(mapCB);
>   void   forEach(mapCB);
>   Stream then(optional thenCB);
>   Stream switch(optional switchCB);
>   Stream throttle(milliseconds);
> }
> interface StreamResolver {
>   void push(optional any value);
>   void complete(optional any value);
>   void continueWith(StreamLike continuation);
>   void reject(optional any value);
> };
> ~~~~
> The overall API was designed to be similar to DOM Futures, where the
> constructor returns the stream and passes the resolver object to a
> callback argument. In several aspects it was also designed to be
> similar to the Array API, because streams can usefully be treated as a
> push-based ordered collection.
> The general form of a stream is zero or more updates, optionally
> followed by a single complete or reject event.  Streams might not ever
> complete, and they may complete/reject without pushing any updates.
> The resolver has a simple API.  'push' is used to put values into the
> stream.  complete/reject fullfill the stream with a completion or
> rejection signal; these both kill the resolver so that none of the
> methods have any effect (maybe they throw?).  'continueWith' also
> kills the resolver, but doesn't fulfill the stream, instead delegating
> the stream's future progress to the continuation stream, so the stream
> pushes updates when the continuation does, and fulfills when the
> continuation does.
> (I'm currently unsure whether completion should be just like an
> update, or not.  That is, if I know that I'm about to push the very
> last value, should I do a push() then a complete(), or just a
> complete() with the value?  This affects several APIs in the
> completion case.)
> The Stream's own API is also simple, and very similar to that of
> Future.  The basic API is the 'listen' function, which takes any of
> three callbacks, which are called when updates are pushed, the stream
> is completed, or the stream is rejected.  It returns the same Stream,
> for chaining.
> For convenience, Streams also have 'complete' and 'reject' functions
> which take just that particular callback.  They return a Future which
> resolves when the stream is fulfilled - the 'complete' future accepts
> when the stream completes and rejects when the stream rejects, while
> the 'reject' future does the opposite.
> The 'next' function returns a future for the next stream value.  If
> the stream updates or completes, the future accepts with the same
> value; if it rejects, it rejects with the same value.  (Alternately,
> should it reject if the stream completes?)  Optionally, it can take a
> filter argument, which is either a JS primitive or a filter callback.
> If a primitive, it only completes for the next update that is === to
> the primitive; if a callback, it completes for the next update that
> the filter function returns true for.
> Streams also have three convenience constructors: Stream.of() takes
> zero or more values, and produces a stream that pushes those values in
> order then completes (if passed a single value, this is the monadic
> lift operator); Stream.reject() takes a rejection reason and produces
> a rejected stream; Stream.from() takes a stream-like and turns it into
> a stream.  Just like Array.from, it takes an optional converter
> callback.
> (A stream is a stream-like, in which case Stream.from just creates a
> fresh stream that pushes the same values.  A future is a stream-like,
> which either pushes a single value and immediately completes, or
> pushes no values and rejects, whenever the future accepts/rejects.  An
> iterable is a stream-like, which immediately pushes all the values
> from the iterable into the stream.)
> Finally, I have a start on a stream combinator library.
> filter, map, and forEach do the same thing as they do on Array, just
> push-based rather than pull-based.  ('filter' takes the same
> "primitive or callback" parameter that 'next' does.)
> 'then' is the monadic operator - it takes a stream of stream-likes,
> and produces a new stream which interleaves their updates.  The
> optional callback is a transformer callback; "strm.then(cb)" is
> identical to "".
> 'switch' is another combiner - it also takes a stream of stream-likes
> and "interleaves" them, but stops listening to the updates from
> "earlier" streams as soon as "later" streams push their first update.
> A little hard to explain, but an example suffices - say you have a
> stream of text input values that's updated as the user types.  Using a
> future-based XHR API, you send each value to your server for
> autocomplete suggestions.  These futures may complete in any order,
> and once one completes, you no longer care about any pending futures
> from earlier values (and in fact, listening to them will screw up your
> UI, with suggestions showing up from old values).  This is easily
> achieved with switch:
> "watchTextInput(input).switch(t=>getJSON(url,t)).forEach(updateUI)".
> 'throttle' is a filterer, which can either be idle or throttling.
> When idle, if it receives an update from its input stream, it enters
> throttling mode for the specified duration.  While throttling, it
> remembers the value of the most recent update from its input stream,
> but doesn't update its output stream.  At the end of the throttle
> period, it pushes the most recent update.  This thins out a stream
> that may receive updates very quickly, so you don't end up doing
> expensive processing on lots of values that are already obsolete.
> So!  This was a long post, but hopefully it explains my proposal well.
>  As far as I can tell, this Stream API is close to C#'s IObservable
> interface, which seemed powerful, easy-to-use, and very general when I
> was reading up on things.  It also stays close to the existing Futures
> API, which it's intended to be a cousin of.  It's also properly
> monadic, using the of/then pair used by Futures and proposed by Brian
> McKenna at <>.
> This API is *not* designed to *directly* handle IO streams, like
> Node's ReadableStream class
> <>.
> However, such a thing should be implementable using streams, either as
> a subclass or something containing a stream.
> Thoughts? Suggestions?  Reasons I'm doing everything wrong?
> ~TJ
> _______________________________________________
> es-discuss mailing list
> es-discuss at
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <>

More information about the es-discuss mailing list