First crack at a Streams proposal

Kevin Gadd kevin.gadd at
Mon Apr 15 17:06:12 PDT 2013

If this is really about multiple-consumer streams, the semantics of this
proposed API are incredibly murky to me. What happens if each consumer
calls next()? Do they all get the same value out of their Future when it's
completed? Do they each randomly get one of the values pushed into the
stream? Is the stream implicitly required to buffer data in order to be
able to offer it at a slower rate to one consumer than it is offered to the
other consumers? Does each consumer have a different 'view' of the state of
the stream (i.e. its cancelled/ended state, when those concepts apply)?
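
To make the buffering question concrete, here is a minimal sketch of one possible answer: every consumer sees every value, each at its own pace. Nothing here comes from the proposal. `BufferedBroadcast`, `subscribe`, `buffered`, and the synchronous `next(name)` (a stand-in for a Future-returning `next()`) are hypothetical names chosen for illustration.

```typescript
// Hypothetical sketch, not the proposed API: each consumer has its own
// read position, so values must be kept until the slowest consumer reads them.
class BufferedBroadcast<T> {
  private buffer: T[] = [];
  private offset = 0; // absolute index of buffer[0]
  private cursors: { [name: string]: number } = {};

  // A new consumer starts at the current end and does not see history.
  subscribe(name: string): void {
    this.cursors[name] = this.offset + this.buffer.length;
  }

  push(value: T): void {
    this.buffer.push(value);
  }

  // Returns undefined when this consumer has nothing left to read.
  next(name: string): T | undefined {
    const cursor = this.cursors[name];
    if (cursor === undefined) throw new Error("unknown consumer: " + name);
    const index = cursor - this.offset;
    if (index >= this.buffer.length) return undefined;
    const value = this.buffer[index];
    this.cursors[name] = cursor + 1;
    this.trim();
    return value;
  }

  buffered(): number {
    return this.buffer.length;
  }

  // Drop values that every consumer has already read.
  private trim(): void {
    let slowest = Infinity;
    for (const name of Object.keys(this.cursors)) {
      slowest = Math.min(slowest, this.cursors[name]);
    }
    const drop = slowest - this.offset;
    if (drop > 0) {
      this.buffer.splice(0, drop);
      this.offset = slowest;
    }
  }
}

const demo = new BufferedBroadcast<number>();
demo.subscribe("fast");
demo.subscribe("slow");
demo.push(1);
demo.push(2);
demo.next("fast"); // 1
demo.next("fast"); // 2
demo.buffered();   // 2: both values are still held for "slow"
demo.next("slow"); // 1
demo.buffered();   // 1
```

Under these assumptions both consumers get the same values, and the cost is exactly the implicit buffering asked about above. The alternative, where each pushed value goes to only one consumer, needs no per-consumer cursor but gives each consumer a different view of the data.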

On Mon, Apr 15, 2013 at 5:03 PM, Jake Verbaten <raynos2 at> wrote:

> > I think all of these are only suited for single-consumer streams.  My
> > original proposal is explicitly for multi-consumer streams, as I
> > designed it explicitly for DOM use-cases that I ran into.
>
> Most streams I've used have been single-consumer streams. Whether I write
> FRP-style programs with Signals or IO programs with streams, the
> multi-consumer use case comes up less.
>
> A far more regular case I have is a single "sink" consuming from multiple
> inputs in parallel due to some kind of merge(manyStreams) => stream
> function.
>
> The most common use-case for multiple consumers is a second consumer for
> printing / inspection purposes, but that is trivial to model as a map
> function that logs the value and returns it.
>
> You may want to ask others who have used Stream-like abstractions how
> common the multiple-consumer case is.
>
> On Mon, Apr 15, 2013 at 4:35 PM, Tab Atkins Jr. <jackalmage at> wrote:
>> On Mon, Apr 15, 2013 at 4:24 PM, Jake Verbaten <raynos2 at> wrote:
>> > Would it help to split the sugar and combinator from the actual stream
>> > interface?
>> >
>> > ```
>> > callback StreamInit = void (StreamResolver resolver);
>> > callback AnyCallback = any (optional any value);
>> >
>> > [Constructor(StreamInit init)]
>> > interface Stream {
>> >   Stream listen(optional AnyCallback? listenCB = null,
>> >                 optional AnyCallback? completeCB = null,
>> >                 optional AnyCallback? rejectCB = null);
>> > };
>> >
>> > interface StreamResolver {
>> >   void push(optional any value);
>> >   void complete(optional any value);
>> >   void reject(optional any value);
>> > };
>> > ```
>>
>> Thanks, I should have done that originally!
>>
>> > One basic thing missing from this API is the ability to stop listening
>> > to a stream.
>>
>> Hm, yes, you need to be able to unlisten.  I'd forgotten about that.
>>
>> > Other things missing are the ability to abort / cancel / close a
>> > Stream, the ability to pause or resume a stream.
>>
>> I think all of these are only suited for single-consumer streams.  My
>> original proposal is explicitly for multi-consumer streams, as I
>> designed it explicitly for DOM use-cases that I ran into.
>>
>> If you have a single-consumer stream, though, I agree that you should
>> be able to pause and cancel a stream.  Same with Futures, for that
>> matter.
>>
>> > It should also be made clear how and when a stream may emit values.
>> > Whether it is at any arbitrary time and it will send to whomever is
>> > listening at that moment. Whether it is only allowed to emit values
>> > after a listen call.
>> >
>> > Does every call to listen get the entire history independently of other
>> > calls? (Doing so would buffer all data and defeat the point of a
>> > stream).
>> >
>> > What happens when you push a value into the resolver and no-one is
>> > listening?
>>
>> By virtue of not defining this, I implicitly answered your questions.
>> ^_^  It's a purely async data source, and cares not for whether you're
>> listening.  (Again, the use-cases I was trying to solve are
>> DOM-oriented.)  If you're not listening when one comes through, too
>> bad.  Of course, recall that updates are processed async, so you can't
>> miss any updates within any particular tick.
>>
>> > Another decision that needs to be made is whether it makes sense for a
>> > stream to emit multiple errors? For a Future it doesn't make sense
>> > because it can only be fulfilled to a single value. For a stream it may
>> > make sense for multiple errors to occur.
>>
>> Good question.  My design assumes that once something errors, it's
>> because it's now invalid, and won't be producing any more.  If you can
>> emit errors and still keep updating, they probably shouldn't reject
>> the stream, but just be a special update value.
>>
>> > I personally like the `listen(onChunk, onEnd)` syntax as it matches a
>> > popular stream baseclass from the node community ( ) in terms of
>> > simplicity.
>>
>> That's my syntax, except that I allow you to distinguish between the
>> stream ending "normally" and with errors.  If you ignore the reject
>> callback, my listen() is *exactly* that syntax already.
>>
>> ~TJ
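
The semantics described in the quoted reply (multi-consumer, no buffering, values dropped when nobody is listening, a reject that ends the stream) can be sketched roughly as below. This is an illustration under stated assumptions, not the proposed API: `SketchStream` is a hypothetical name, `listen()` returns an unlisten function here (the quoted IDL returns a `Stream` and has no unlisten), and delivery is synchronous to keep the sketch short, whereas the reply says updates are processed asynchronously.

```typescript
type Callback<T> = (value: T) => void;

interface Listener<T> {
  onValue: Callback<T> | undefined;
  onComplete: Callback<T | undefined> | undefined;
  onReject: Callback<unknown> | undefined;
}

interface Resolver<T> {
  push(value: T): void;
  complete(value?: T): void;
  reject(reason?: unknown): void;
}

// Hypothetical sketch: a push-based stream that never buffers.
class SketchStream<T> {
  private listeners: Listener<T>[] = [];
  private done = false;

  constructor(init: (resolver: Resolver<T>) => void) {
    init({
      push: (value) =>
        this.emit((l) => { if (l.onValue) l.onValue(value); }),
      complete: (value) =>
        this.finish((l) => { if (l.onComplete) l.onComplete(value); }),
      reject: (reason) =>
        this.finish((l) => { if (l.onReject) l.onReject(reason); }),
    });
  }

  // Assumption: returns an unlisten function, which the quoted IDL lacks.
  listen(
    onValue?: Callback<T>,
    onComplete?: Callback<T | undefined>,
    onReject?: Callback<unknown>
  ): () => void {
    const listener: Listener<T> = { onValue, onComplete, onReject };
    if (!this.done) this.listeners.push(listener);
    return () => {
      const i = this.listeners.indexOf(listener);
      if (i >= 0) this.listeners.splice(i, 1);
    };
  }

  // Deliver to whoever is listening right now; nothing is stored.
  private emit(deliver: (l: Listener<T>) => void): void {
    if (this.done) return;
    this.listeners.slice().forEach(deliver); // copy: unlisten during delivery is safe
  }

  // complete and reject are both terminal in this sketch.
  private finish(deliver: (l: Listener<T>) => void): void {
    if (this.done) return;
    this.emit(deliver);
    this.done = true;
    this.listeners = [];
  }
}

const resolvers: Resolver<number>[] = [];
const stream = new SketchStream<number>((r) => { resolvers.push(r); });
const seenA: number[] = [];
const seenB: number[] = [];

resolvers[0].push(0);                      // nobody is listening: dropped
const unlistenA = stream.listen((v) => { seenA.push(v); });
resolvers[0].push(1);                      // A only
stream.listen((v) => { seenB.push(v); });
resolvers[0].push(2);                      // A and B both receive it
unlistenA();
resolvers[0].push(3);                      // B only
resolvers[0].reject(new Error("invalid")); // terminal
resolvers[0].push(4);                      // ignored after reject
// seenA is [1, 2]; seenB is [2, 3]
```

Because every listener receives each value and nothing is stored, this model avoids the buffering question entirely, at the price that a late listener simply misses earlier values.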