Comments (5)
Nothing like this exists in this package or any Dart team authored package today.
I could imagine an implementation of a special stream type, and potentially an extension on Stream
to return one. Each new listener on a this stream would receive the most recently emitted event when it starts listening.
I do worry a little about having a class which implements Stream
and has this different behavior, but I also don't think it would make sense to be a standalone type. @lrhn - what are your thoughts?
It might be as simple as
import 'dart:async';
class _ReplayLatestStream<T> extends StreamView<T> {
T _latest;
_ReplayLatestStream(this._latest, Stream<T> stream) : super(stream) {
if (!stream.isBroadcast) {
throw ArgumentError.value(stream, "stream", "Must be a broadcast stream");
}
stream.listen((e) {
_latest = e;
});
}
@override
StreamSubscription<T> listen(void Function(T) onData,
{Function onError, void Function() onDone, bool cancelOnError}) {
Future(() {
onData(_latest);
});
return super.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
}
extension ReplayLatest<T> on Stream<T> {
Stream<T> replayLatest(T startWith) =>
_ReplayLatestStream(startWith, isBroadcast ? this : asBroadcastStream());
}
from async.
That approach might cause another event to be sent before onData
is called, and it will fail if onData
is null
, and not be updated if onData
is changed before the event is delivered. I fairly often find myself doing:
var sub = stream.listen(null);
sub.onData((v) { .... });
It's implementable, but not this easily, sadly.
I do have a plan for a stream implementation where each listen causes a callback with a new controller, so all listeners can be managed individually.
With such a multi-listener stream, you should be able to implement the requested behavior easily:
Stream<T> repeatLast<T extends Object>(Stream<T> source) {
var listeners = <StreamControlller>{};
T? last = null;
bool done = false;
source.listen((event) {
last = event;
for (var listener in [...listeners]) listener.add(event);
}, onError: (Object e, StackTrace s) {
for (var listener in [...listeners]) listener.addError(e, s);
}, onDone: () {
done = true;
last = null;
for (var listener in listeners) listener.close();
});
return Stream.multi((StreamController c) {
if (done) {
c.close();
} else {
var lastEvent = last;
if (lastEvent != null) c.add(lastEvent);
listeners.add(c);
}
c.onCancel = () {
listeners.remove(c);
};
});
}
I think I'll re-prioritize that implementation.
from async.
Thank you so much for answering @natebosch and @lrhn!
There are 3 broadcast streams A, B and C at the top level. They are initialized at the start of the app connecting to 3 different Firestore collections for the User.
Since we didn't want to cook up a hacky solution, for now, we are just creating these top-level streams anew for every listener which is expensive and unnecessary (but still pretty clean). If you have a better temporary solution, I would love to use that. Other than that, I look forward to using your solution @lrhn 🙏 . Would you happen to have an ETA?
from async.
The Stream.multi
constructor has now landed, so you can do something like:
// Copyright 2020 Google LLC.
// SPDX-License-Identifier: BSD-3-Clause
Stream<T> repeatLast<T>(Stream<T> source) {
List<MultiStreamController<T>> controllers = [];
void forEachController(void action(MultiStreamController)) {
var i = 0, j = 0;
while (i < controllers.length) {
var controller = controllers[i++];
if (controller.hasListener) {
action(controller);
if (controller.hasListener) {
controllers[j++] = controller;
}
}
}
if (j < controllers.length) controllers.length = j;
}
bool done = false; // Set to true when [source] closes.
/*late*/ T mostRecent; // Most recent event value.
bool hasMostRecent = false; // Whether mostRecent is available.
StreamSubscription<T> /*?*/ sub;
void ensureSubscribed() {
sub ??= source.listen((value) {
mostRecent = value;
hasMostRecent = true;
forEachController((c) {
c.addSync(value);
});
}, onError: (e, s) {
forEachController((c) {
c.addErrorSync(e, s);
});
}, onDone: () {
done = true;
forEachController((c) {
c.closeSync();
});
});
}
// Subscribe early if it's a broadcast stream.
if (source.isBroadcast) ensureSubscribed();
return Stream<T>.multi((controller) {
if (hasMostRecent) controller.add(mostRecent /*as T*/);
if (done) {
controller.close();
return;
}
controllers.add(controller);
ensureSubscribed();
}, isBroadcast: source.isBroadcast);
}
from async.
Thanks! found a workaround for the project that needed this. will keep in mind for the future. gonna close this issue and re-open if I run into any issues with your suggested approach in the future :)
from async.
Related Issues (20)
- Change the default of `propagateCancel` argument in CancelableOperation.then HOT 2
- Reset method for AsyncMemoizer HOT 1
- Make it easier to safely hold a reference that can cancel an operation without holding the whole operation HOT 1
- Clarify `StreamQueue.next` will fail just after `hasNext` in API document.
- Consider to make second invocation of `streamQueue.hasNext` be postponed concluding the result until the first invocation of `q.next` , unless the stream is closed. HOT 6
- Deprecate StreamQueue.hasNext and StreamQueue.next
- Future.wait() but with Records HOT 4
- Bad State error while trying to reject a StreamQueueTransaction
- Dart 3 incompatibilty: `DelegatingStream<T> extends StreamView` but `StreamView` is `base class` HOT 5
- Add `whereNotNull` for `Stream`
- CancelableOperation value is not propagating errors, so they cannot be catched and app is crashing HOT 3
- There should be cancellable versions of Stream.firstWhere etc.
- Migrate the Result type to sealed classes HOT 2
- Make `ParallelWaitError` Include Error Details HOT 1
- Async Cache is caching exceptions HOT 5
- AsyncMemoizer is caching exceptions HOT 2
- Add an API wrapping runZonedGuarded that surfaces the first error HOT 4
- Clarify `CancelableOperation` docs HOT 4
- Inconsistent behavior of `Stream.listen` on broadcast streams HOT 1
- [Proposal] Add a CountDownLatch implementation
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from async.