First crack at a Streams proposal

Tab Atkins Jr. jackalmage at gmail.com
Mon Apr 15 13:31:11 PDT 2013


Inspired by DOM adding Futures <http://dom.spec.whatwg.org/#futures>,
I've begun reworking several APIs in terms of them.  While doing so,
I've found several spots where Futures aren't appropriate, because I
need something that can be "updated" multiple times, rather than just
representing a single result like Futures do.  I believe formalizing
this notion will be as valuable as Futures, so I began by sketching
out an API on my blog: <http://www.xanthir.com/b4PV0>.

I'd like to begin discussion of this API with es-discuss, because this
feature is much larger than just DOM, and you people have a lot of
relevant experience.

Here's a reproduction of the current state of my proposal (I'll update
my blog as things progress if necessary):

~~~~
callback StreamInit = void (StreamResolver resolver);
callback AnyCallback = any (optional any value);
typedef (Stream or Future or Iterable) StreamLike;

[Constructor(StreamInit init)]
interface Stream {
  Stream listen(optional AnyCallback? listenCB = null,
                optional AnyCallback? completeCB = null,
                optional AnyCallback? rejectCB = null);
  Future complete(optional AnyCallback? completeCB = null);
  Future catch(optional AnyCallback? rejectCB = null);
  Future next(optional any filter = null);

  static Stream from(StreamLike value, optional transformerCB, optional thisArg);
  static Stream of(any... values);
  static Stream reject(optional any reason);

  Stream filter(filterCB or primitive);
  Stream map(mapCB);
  void   forEach(mapCB);
  Stream then(optional thenCB);
  Stream switch(optional switchCB);
  Stream throttle(milliseconds);

};

interface StreamResolver {
  void push(optional any value);
  void complete(optional any value);
  void continueWith(StreamLike continuation);
  void reject(optional any value);
};
~~~~

The overall API was designed to be similar to DOM Futures, where the
constructor returns the stream and passes the resolver object to a
callback argument. In several aspects it was also designed to be
similar to the Array API, because streams can usefully be treated as a
push-based ordered collection.

The general form of a stream is zero or more updates, optionally
followed by a single complete or reject event.  Streams might not ever
complete, and they may complete/reject without pushing any updates.

The resolver has a simple API.  'push' puts a value into the stream.
'complete' and 'reject' fulfill the stream with a completion or
rejection signal; both kill the resolver, so that none of its methods
have any further effect (maybe they throw?).  'continueWith' also
kills the resolver, but doesn't fulfill the stream; instead it
delegates the stream's future progress to the continuation stream, so
the stream pushes updates when the continuation does, and fulfills
when the continuation does.

(I'm currently unsure whether completion should be just like an
update, or not.  That is, if I know that I'm about to push the very
last value, should I do a push() then a complete(), or just a
complete() with the value?  This affects several APIs in the
completion case.)
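
To make the resolver's life cycle concrete, here is a rough sketch in
plain JavaScript.  MiniStream is a hypothetical stand-in, not the
proposed interface: it delivers events synchronously, only to
listeners that are already attached, and silently ignores calls on a
dead resolver (the "maybe they throw?" question is left open).

```javascript
// Hypothetical stand-in for the proposed Stream; names and timing are assumptions.
class MiniStream {
  constructor(init) {
    this.listeners = [];
    let dead = false; // set once the resolver has been "killed"
    const emit = (kind, value) => {
      for (const l of this.listeners.slice()) if (l[kind]) l[kind](value);
    };
    // Returns true only for the call that actually kills the resolver.
    const kill = () => (dead ? false : (dead = true));
    init({
      push: (value) => { if (!dead) emit("update", value); },
      complete: (value) => { if (kill()) emit("complete", value); },
      reject: (reason) => { if (kill()) emit("reject", reason); },
      // Hand all further progress over to another stream.
      continueWith: (next) => {
        if (!kill()) return;
        next.listen(
          (value) => emit("update", value),
          (value) => emit("complete", value),
          (reason) => emit("reject", reason)
        );
      },
    });
  }
  listen(update, complete, reject) {
    this.listeners.push({ update, complete, reject });
    return this; // same stream, for chaining
  }
}

const log = [];
let head, tail;
const continuation = new MiniStream((r) => { tail = r; });
const stream = new MiniStream((r) => { head = r; });
stream.listen(
  (v) => log.push("update " + v),
  (v) => log.push("complete " + v),
  (e) => log.push("reject " + e)
);

head.push(1);
head.continueWith(continuation); // kills `head` without fulfilling the stream
head.push(2);                    // ignored: the resolver is dead
tail.push(3);                    // forwarded through the continuation
tail.complete("done");
tail.reject("too late");         // ignored: already completed
// log: ["update 1", "update 3", "complete done"]
```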

The Stream's own API is also simple, and very similar to that of
Future.  The basic API is the 'listen' function, which takes any of
three callbacks, which are called when updates are pushed, the stream
is completed, or the stream is rejected.  It returns the same Stream,
for chaining.

For convenience, Streams also have 'complete' and 'catch' functions
which take just that particular callback.  They return a Future which
resolves when the stream is fulfilled - the 'complete' future accepts
when the stream completes and rejects when the stream rejects, while
the 'catch' future does the opposite.

The 'next' function returns a future for the next stream value.  If
the stream updates or completes, the future accepts with the same
value; if it rejects, it rejects with the same value.  (Alternately,
should it reject if the stream completes?)  Optionally, it can take a
filter argument, which is either a JS primitive or a filter callback.
If a primitive, the future only accepts for the next update that is
=== to the primitive; if a callback, it accepts for the next update
for which the callback returns true.
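
A rough sketch of 'next', written as a free function so it stays
self-contained.  Promise stands in for Future, makeSource is a
hypothetical helper that only provides a 'listen' method, and
accepting on completion regardless of the filter is an assumption
(the open question above is not settled here).

```javascript
// Hypothetical event source exposing listen(update, complete, reject).
function makeSource() {
  const listeners = [];
  return {
    listen(update, complete, reject) {
      listeners.push({ update, complete, reject });
      return this;
    },
    emit(kind, value) {
      for (const l of listeners.slice()) if (l[kind]) l[kind](value);
    },
  };
}

// Promise stands in for the DOM Future type.
function next(stream, filter = null) {
  const matches =
    filter === null ? () => true
    : typeof filter === "function" ? filter
    : (value) => value === filter; // primitive: compare with ===
  return new Promise((accept, reject) => {
    // A Promise ignores every settlement after the first, so later
    // events need no extra bookkeeping here.
    stream.listen(
      (value) => { if (matches(value)) accept(value); },
      (value) => accept(value), // assumption: completion always accepts
      (reason) => reject(reason)
    );
  });
}

const keys = makeSource();
const results = [];
const pending = Promise.all([
  next(keys).then((k) => results.push("any:" + k)),
  next(keys, "Enter").then((k) => results.push("exact:" + k)),
  next(keys, (k) => k.length > 5).then((k) => results.push("long:" + k)),
]);

keys.emit("update", "a");
keys.emit("update", "Enter");
keys.emit("update", "Escape");
// once `pending` settles, results is ["any:a", "exact:Enter", "long:Escape"]
```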

Streams also have three convenience constructors: Stream.of() takes
zero or more values, and produces a stream that pushes those values in
order then completes (if passed a single value, this is the monadic
lift operator); Stream.reject() takes a rejection reason and produces
a rejected stream; Stream.from() takes a stream-like and turns it into
a stream.  Just like Array.from, it takes an optional converter
callback.

(A stream is a stream-like, in which case Stream.from just creates a
fresh stream that pushes the same values.  A future is a stream-like,
which either pushes a single value and immediately completes, or
pushes no values and rejects, whenever the future accepts/rejects.  An
iterable is a stream-like, which immediately pushes all the values
from the iterable into the stream.)
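
The three constructors might be sketched like this.  MiniStream is a
hypothetical stand-in that replays its history to late listeners;
that replay is purely an assumption made so that a stream built by
of() can still be observed after construction.  Promise stands in for
Future, and thisArg is omitted.

```javascript
// Hypothetical stand-in for the proposed Stream; replaying history to
// late listeners is an assumption, not part of the proposal.
class MiniStream {
  constructor(init) {
    this.history = [];
    this.listeners = [];
    let dead = false;
    const emit = (kind, value) => {
      this.history.push([kind, value]);
      for (const l of this.listeners.slice()) if (l[kind]) l[kind](value);
    };
    init({
      push: (v) => { if (!dead) emit("update", v); },
      complete: (v) => { if (!dead) { dead = true; emit("complete", v); } },
      reject: (e) => { if (!dead) { dead = true; emit("reject", e); } },
    });
  }
  listen(update, complete, reject) {
    const l = { update, complete, reject };
    for (const [kind, value] of this.history) if (l[kind]) l[kind](value);
    this.listeners.push(l);
    return this;
  }
  static of(...values) {
    return new MiniStream((r) => { values.forEach((v) => r.push(v)); r.complete(); });
  }
  static reject(reason) {
    return new MiniStream((r) => r.reject(reason));
  }
  static from(value, transform = (v) => v) {
    return new MiniStream((r) => {
      if (value instanceof MiniStream) {
        // Stream: a fresh stream that pushes the same (transformed) values.
        value.listen((v) => r.push(transform(v)), (v) => r.complete(v), (e) => r.reject(e));
      } else if (value && typeof value.then === "function") {
        // Future: one value then complete, or no values and reject.
        value.then((v) => { r.push(transform(v)); r.complete(); }, (e) => r.reject(e));
      } else {
        // Iterable: push everything immediately.  Whether the stream then
        // completes is not stated in the post, so it is left open here.
        for (const v of value) r.push(transform(v));
      }
    });
  }
}

// Collects a stream's events as readable strings.
function record(stream) {
  const log = [];
  stream.listen(
    (v) => log.push("update " + v),
    () => log.push("complete"),
    (e) => log.push("reject " + e)
  );
  return log;
}

const fromOf = record(MiniStream.of(1, 2, 3));
// ["update 1", "update 2", "update 3", "complete"]
const fromIterable = record(MiniStream.from(new Set(["a", "b"]), (s) => s.toUpperCase()));
// ["update A", "update B"]
const fromStream = record(MiniStream.from(MiniStream.of(1, 2), (x) => x * 10));
// ["update 10", "update 20", "complete"]
const rejected = record(MiniStream.reject("nope"));
// ["reject nope"]
const fromFuture = record(MiniStream.from(Promise.resolve(7)));
// filled in asynchronously once the promise settles
```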


Finally, I have a start on a stream combinator library.

filter, map, and forEach do the same thing as they do on Array, just
push-based rather than pull-based.  ('filter' takes the same
"primitive or callback" parameter that 'next' does.)

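As a rough illustration, filter, map, and forEach can all be built on
'listen'.  MiniStream and its 'derive' helper are hypothetical; the
sketch delivers events synchronously and does not try to decide what
happens when a callback throws.

```javascript
// Hypothetical stand-in for the proposed Stream (synchronous delivery assumed).
class MiniStream {
  constructor(init) {
    this.listeners = [];
    let dead = false;
    const emit = (kind, value) => {
      for (const l of this.listeners.slice()) if (l[kind]) l[kind](value);
    };
    init({
      push: (v) => { if (!dead) emit("update", v); },
      complete: (v) => { if (!dead) { dead = true; emit("complete", v); } },
      reject: (e) => { if (!dead) { dead = true; emit("reject", e); } },
    });
  }
  listen(update, complete, reject) {
    this.listeners.push({ update, complete, reject });
    return this;
  }
  // Hypothetical helper: a new stream whose updates are decided by
  // onUpdate(value, resolver); completion and rejection pass through.
  derive(onUpdate) {
    return new MiniStream((r) =>
      this.listen((v) => onUpdate(v, r), (v) => r.complete(v), (e) => r.reject(e)));
  }
  map(fn) {
    return this.derive((v, r) => r.push(fn(v)));
  }
  filter(test) {
    // Same "primitive or callback" rule as `next`.
    const keep = typeof test === "function" ? test : (v) => v === test;
    return this.derive((v, r) => { if (keep(v)) r.push(v); });
  }
  forEach(fn) {
    this.listen((v) => { fn(v); });
  }
}

let source;
const numbers = new MiniStream((r) => { source = r; });

const evenSquares = [];
numbers.filter((n) => n % 2 === 0).map((n) => n * n).forEach((n) => evenSquares.push(n));

const zeros = [];
numbers.filter(0).forEach((n) => zeros.push(n));

[0, 1, 2, 3, 4].forEach((n) => source.push(n));
// evenSquares: [0, 4, 16]; zeros: [0]
```
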
'then' is the monadic operator - it takes a stream of stream-likes,
and produces a new stream which interleaves their updates.  The
optional callback is a transformer callback; "strm.then(cb)" is
identical to "strm.map(cb).then()".
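
A rough sketch of the interleaving, as a free function named
flatten.  Two things here are assumptions: the inner values must
already be MiniStream instances (the proposal accepts any
stream-like), and the output completes once the outer stream and
every inner stream have completed, which the text above does not
specify.

```javascript
// Hypothetical stand-in for the proposed Stream (synchronous delivery assumed).
class MiniStream {
  constructor(init) {
    this.listeners = [];
    let dead = false;
    const emit = (kind, value) => {
      for (const l of this.listeners.slice()) if (l[kind]) l[kind](value);
    };
    init({
      push: (v) => { if (!dead) emit("update", v); },
      complete: (v) => { if (!dead) { dead = true; emit("complete", v); } },
      reject: (e) => { if (!dead) { dead = true; emit("reject", e); } },
    });
  }
  listen(update, complete, reject) {
    this.listeners.push({ update, complete, reject });
    return this;
  }
}

// Sketch of `then`: interleave the updates of every inner stream.
function flatten(outer, cb = (x) => x) {
  return new MiniStream((r) => {
    let pending = 1; // the outer stream, plus each inner stream still open
    const done = () => { if (--pending === 0) r.complete(); };
    outer.listen(
      (item) => {
        pending++;
        cb(item).listen((v) => r.push(v), done, (e) => r.reject(e));
      },
      done,
      (e) => r.reject(e)
    );
  });
}

// Hypothetical helper: a stream together with its resolver.
function controlled() {
  let resolver;
  const stream = new MiniStream((r) => { resolver = r; });
  return { stream, resolver };
}

const outer = controlled();
const a = controlled();
const b = controlled();
const log = [];
flatten(outer.stream).listen((v) => log.push(v), () => log.push("complete"));

outer.resolver.push(a.stream);
outer.resolver.push(b.stream);
a.resolver.push("a1");
b.resolver.push("b1");
a.resolver.push("a2");
outer.resolver.complete();
a.resolver.complete();
b.resolver.push("b2");
b.resolver.complete();
// log: ["a1", "b1", "a2", "b2", "complete"]
```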

'switch' is another combiner - it also takes a stream of stream-likes
and "interleaves" them, but stops listening to the updates from
"earlier" streams as soon as "later" streams push their first update.
A little hard to explain, but an example suffices - say you have a
stream of text input values that's updated as the user types.  Using a
future-based XHR API, you send each value to your server for
autocomplete suggestions.  These futures may complete in any order,
and once one completes, you no longer care about any pending futures
from earlier values (and in fact, listening to them will screw up your
UI, with suggestions showing up from old values).  This is easily
achieved with switch:
"watchTextInput(input).switch(t=>getJSON(url,t)).forEach(updateUI)".

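The autocomplete scenario can be simulated without a network.  In
this rough sketch the combinator is a free function called
switchLatest ('switch' is a reserved word for a plain function
name), the per-query reply streams are hypothetical stand-ins for
the XHR futures, and completion handling is left out because the
text above does not specify it.

```javascript
// Hypothetical stand-in for the proposed Stream (synchronous delivery assumed).
class MiniStream {
  constructor(init) {
    this.listeners = [];
    let dead = false;
    const emit = (kind, value) => {
      for (const l of this.listeners.slice()) if (l[kind]) l[kind](value);
    };
    init({
      push: (v) => { if (!dead) emit("update", v); },
      complete: (v) => { if (!dead) { dead = true; emit("complete", v); } },
      reject: (e) => { if (!dead) { dead = true; emit("reject", e); } },
    });
  }
  listen(update, complete, reject) {
    this.listeners.push({ update, complete, reject });
    return this;
  }
}

// Sketch of `switch`: once a later inner stream has pushed, updates
// from earlier inner streams are dropped.
function switchLatest(outer, cb = (x) => x) {
  return new MiniStream((r) => {
    let count = 0;   // number of inner streams seen so far
    let newest = -1; // index of the latest inner stream that has pushed
    outer.listen(
      (item) => {
        const index = count++;
        cb(item).listen(
          (v) => {
            if (index < newest) return; // stale: a later stream already pushed
            newest = index;
            r.push(v);
          },
          null,
          (e) => { if (index >= newest) r.reject(e); }
        );
      },
      null,
      (e) => r.reject(e)
    );
  });
}

// Hypothetical helper: a stream together with its resolver.
function controlled() {
  let resolver;
  const stream = new MiniStream((r) => { resolver = r; });
  return { stream, resolver };
}

const input = controlled();
const replies = { c: controlled(), ca: controlled(), cat: controlled() };
const shown = [];
switchLatest(input.stream, (text) => replies[text].stream)
  .listen((suggestions) => shown.push(suggestions));

input.resolver.push("c");
input.resolver.push("ca");
input.resolver.push("cat");
replies.ca.resolver.push("suggestions for ca");
replies.c.resolver.push("suggestions for c"); // arrives late: dropped
replies.cat.resolver.push("suggestions for cat");
// shown: ["suggestions for ca", "suggestions for cat"]
```
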
'throttle' is a filterer, which can either be idle or throttling.
When idle, if it receives an update from its input stream, it enters
throttling mode for the specified duration.  While throttling, it
remembers the value of the most recent update from its input stream,
but doesn't update its output stream.  At the end of the throttle
period, it pushes the most recent update.  This thins out a stream
that may receive updates very quickly, so you don't end up doing
expensive processing on lots of values that are already obsolete.
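
A rough sketch of the idle/throttling state machine.  It reads the
description above as trailing-edge: the update that starts a
throttle period is itself held until the period ends.  The
'schedule' parameter is a hypothetical addition so the demo can use a
fake timer queue instead of waiting on setTimeout, and a value still
pending when the input completes is simply dropped, which is another
assumption.

```javascript
// Hypothetical stand-in for the proposed Stream (synchronous delivery assumed).
class MiniStream {
  constructor(init) {
    this.listeners = [];
    let dead = false;
    const emit = (kind, value) => {
      for (const l of this.listeners.slice()) if (l[kind]) l[kind](value);
    };
    init({
      push: (v) => { if (!dead) emit("update", v); },
      complete: (v) => { if (!dead) { dead = true; emit("complete", v); } },
      reject: (e) => { if (!dead) { dead = true; emit("reject", e); } },
    });
  }
  listen(update, complete, reject) {
    this.listeners.push({ update, complete, reject });
    return this;
  }
}

// `schedule` defaults to setTimeout; it is injectable only for the demo.
function throttle(stream, milliseconds, schedule = setTimeout) {
  return new MiniStream((r) => {
    let throttling = false; // false means idle
    let latest;             // most recent update seen in this period
    stream.listen(
      (value) => {
        latest = value;
        if (throttling) return; // remember the value, emit nothing yet
        throttling = true;
        schedule(() => {
          throttling = false;
          r.push(latest); // end of the period: push the most recent update
        }, milliseconds);
      },
      (value) => r.complete(value),
      (reason) => r.reject(reason)
    );
  });
}

// Fake timer queue so the example runs synchronously.
const timers = [];
const fakeSchedule = (fn) => { timers.push(fn); };
const runTimers = () => { while (timers.length) timers.shift()(); };

let source;
const fast = new MiniStream((r) => { source = r; });
const out = [];
throttle(fast, 100, fakeSchedule).listen((v) => out.push(v));

source.push(1);
source.push(2);
source.push(3);
runTimers(); // the period ends: only 3 is pushed
source.push(4);
runTimers();
// out: [3, 4]
```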




So!  This was a long post, but hopefully it explains my proposal well.
As far as I can tell, this Stream API is close to C#'s IObservable
interface, which seemed powerful, easy-to-use, and very general when I
was reading up on things.  It also stays close to the existing Futures
API, which it's intended to be a cousin of.  It's also properly
monadic, using the of/then pair used by Futures and proposed by Brian
McKenna at <http://brianmckenna.org/blog/category_theory_promisesaplus>.

This API is *not* designed to *directly* handle IO streams, like
Node's ReadableStream class
<http://nodejs.org/api/stream.html#stream_class_stream_readable>.
However, such a thing should be implementable using streams, either as
a subclass or something containing a stream.

Thoughts? Suggestions?  Reasons I'm doing everything wrong?

~TJ

