Comments (5)

natebosch commented on June 12, 2024

Nothing like this exists in this package or any Dart team authored package today.

I could imagine an implementation of a special stream type, and potentially an extension on Stream to return one. Each new listener on this stream would receive the most recently emitted event when it starts listening.

I do worry a little about having a class which implements Stream and has this different behavior, but I also don't think it would make sense to be a standalone type. @lrhn - what are your thoughts?

It might be as simple as

import 'dart:async';

class _ReplayLatestStream<T> extends StreamView<T> {
  T _latest;

  _ReplayLatestStream(this._latest, Stream<T> stream) : super(stream) {
    if (!stream.isBroadcast) {
      throw ArgumentError.value(stream, "stream", "Must be a broadcast stream");
    }
    stream.listen((e) {
      _latest = e;
    });
  }

  @override
  StreamSubscription<T> listen(void Function(T) onData,
      {Function onError, void Function() onDone, bool cancelOnError}) {
    Future(() {
      onData(_latest);
    });
    return super.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}

extension ReplayLatest<T> on Stream<T> {
  Stream<T> replayLatest(T startWith) =>
      _ReplayLatestStream(startWith, isBroadcast ? this : asBroadcastStream());
}

from async.

lrhn commented on June 12, 2024

That approach might let another event be delivered before onData is called with the latest value. It will also fail if onData is null, and it will not pick up a handler that is replaced before the replayed event is delivered. I fairly often find myself doing:

var sub = stream.listen(null);
sub.onData((v) { .... });

It's implementable, but not this easily, sadly.
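To make the `listen(null)` pattern above concrete, here is a minimal sketch. The helper name `collectWithLateHandler` is hypothetical and only serves the illustration. It subscribes with no handler and attaches one afterwards, which is the case where a replay that captures the `onData` argument passed to `listen` has nothing usable to call.

```dart
import 'dart:async';

/// Hypothetical helper: subscribes with no data handler, installs one
/// afterwards, and returns the events that handler received.
Future<List<int>> collectWithLateHandler() async {
  final controller = StreamController<int>.broadcast();
  final seen = <int>[];

  // Subscribe first; the handler is still null at this point.
  final sub = controller.stream.listen(null);
  // Install the real handler later. A wrapper that captured the
  // original (null) onData argument would never see this one.
  sub.onData(seen.add);

  controller.add(1);
  controller.add(2);
  // Let the asynchronously delivered events reach the subscription.
  await Future<void>.delayed(Duration.zero);

  await sub.cancel();
  await controller.close();
  return seen;
}

Future<void> main() async {
  print(await collectWithLateHandler()); // [1, 2]
}
```

Any replay mechanism therefore has to deliver the stored event through the subscription itself rather than by calling the callback that was passed to `listen`.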

I do have a plan for a stream implementation where each listen causes a callback with a new controller, so all listeners can be managed individually.

With such a multi-listener stream, you should be able to implement the requested behavior easily:

Stream<T> repeatLast<T extends Object>(Stream<T> source) {
  var listeners = <StreamController>{};
  T? last = null;
  bool done = false;
  source.listen((event) {
     last = event;
     for (var listener in [...listeners]) listener.add(event);
   }, onError: (Object e, StackTrace s) {
     for (var listener in [...listeners]) listener.addError(e, s);
   }, onDone: () {
      done = true;
      last = null;
      for (var listener in listeners) listener.close();
   });
  return Stream.multi((StreamController c)  {
    if (done) {
      c.close();
    } else {
      var lastEvent = last;
      if (lastEvent != null) c.add(lastEvent);
      listeners.add(c);
    }
    c.onCancel = () {
      listeners.remove(c);
    };
  });
}

I think I'll re-prioritize that implementation.

nikhilro commented on June 12, 2024

Thank you so much for answering @natebosch and @lrhn!

There are 3 broadcast streams A, B and C at the top level. They are initialized at the start of the app connecting to 3 different Firestore collections for the User.

Since we didn't want to cook up a hacky solution, for now we are just creating these top-level streams anew for every listener, which is expensive and unnecessary (but still pretty clean). If you have a better temporary solution, I would love to use that. Other than that, I look forward to using your solution @lrhn 🙏. Would you happen to have an ETA?

lrhn commented on June 12, 2024

The Stream.multi constructor has now landed, so you can do something like:

// Copyright 2020 Google LLC.
// SPDX-License-Identifier: BSD-3-Clause
import 'dart:async';

Stream<T> repeatLast<T>(Stream<T> source) {
  List<MultiStreamController<T>> controllers = [];
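  // Runs [action] on every controller that still has a listener and
  // drops the ones that were cancelled.
  void forEachController(void Function(MultiStreamController<T>) action) {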
    var i = 0, j = 0;
    while (i < controllers.length) {
      var controller = controllers[i++];
      if (controller.hasListener) {
        action(controller);
        if (controller.hasListener) {
          controllers[j++] = controller;
        }
      }
    }
    if (j < controllers.length) controllers.length = j;
  }

  bool done = false; // Set to true when [source] closes.
  late T mostRecent; // Most recent event value; only read when hasMostRecent is true.
  bool hasMostRecent = false; // Whether mostRecent is available.

  StreamSubscription<T>? sub;
  void ensureSubscribed() {
    sub ??= source.listen((value) {
      mostRecent = value;
      hasMostRecent = true;
      forEachController((c) {
        c.addSync(value);
      });
    }, onError: (e, s) {
      forEachController((c) {
        c.addErrorSync(e, s);
      });
    }, onDone: () {
      done = true;
      forEachController((c) {
        c.closeSync();
      });
    });
  }

  // Subscribe early if it's a broadcast stream.
  if (source.isBroadcast) ensureSubscribed();
  return Stream<T>.multi((controller) {
    if (hasMostRecent) controller.add(mostRecent);
    if (done) {
      controller.close();
      return;
    }
    controllers.add(controller);
    ensureSubscribed();
  }, isBroadcast: source.isBroadcast);
}
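
The key property used above is that the `Stream.multi` callback runs once per listener and hands each one its own controller. A minimal sketch of just that behavior, separate from the replay logic, might look like this (the helper name `greetEachListener` is hypothetical):

```dart
import 'dart:async';

/// Hypothetical helper: a stream that greets every listener separately.
Stream<String> greetEachListener() {
  var listenCount = 0;
  // The callback is invoked on each call to listen, with a fresh
  // controller that feeds only that one listener.
  return Stream<String>.multi((controller) {
    listenCount++;
    controller.add('hello listener $listenCount');
    controller.close();
  });
}

Future<void> main() async {
  final stream = greetEachListener();
  // Listening twice triggers the callback twice.
  print(await stream.toList()); // [hello listener 1]
  print(await stream.toList()); // [hello listener 2]
}
```

Because every listener is handled individually, a late listener can be sent the stored event without affecting listeners that are already attached.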

nikhilro commented on June 12, 2024

Thanks! Found a workaround for the project that needed this. Will keep this in mind for the future. Going to close this issue and re-open it if I run into any problems with your suggested approach :)
