Code Monkey home page Code Monkey logo

Comments (8)

mcatanzariti avatar mcatanzariti commented on August 27, 2024 1

@richardhenry You're right, it is worth thinking about splitting the pub sub stream.

from rustis.

mcatanzariti avatar mcatanzariti commented on August 27, 2024

Hi @richardhenry,

The design behind PubSubStream is that it is created on a first subscription because under the hood, Redis switches an existing connection to pub sub mode only when a first subscription is received by the server.
On the opposite, the connection is switched back to the regular mode when the last unsubscription is received by the server, which is represented by the closing or dropping of the PubSubStream instance.
One of the pillars of Rustis is to stick to Redis' design in an as thin as possible layer.

I don't sync your design is bad. If what you need is a background task to subscribe, unsubscribe and listen to events, it is totally ok to keep a stream inside the task and send commands to the task via a mpsc. I think you don't need to create a fake channel subscription. You could hold the stream as on option and actually create it whenever the first subscription command is received.

I'm not sure the design you are requesting would be that much simpler. Would you mind rewriting your example with the design you propose ?

Cheers,

Michaël

from rustis.

richardhenry avatar richardhenry commented on August 27, 2024

Thanks, this is good feedback around not needing the "void" channel, you're right. My implementation now looks like below.

let mut stream: PubSubStream;
let mut channels: HashSet<String> = HashSet::new();

loop {
    match self.cmd_rx.recv().await {
        Some(FetchCmd::Subscribe(channel)) => {
            channels.insert(channel.clone());
            stream = redis_con().await?.subscribe(channel).await?;
            break;
        }
        Some(_) => (),
        None => return Ok(()),
    }
}

loop {
    let cmd: Option<PubSubCmd>;

    loop {
        select! {
            value = stream.next() => {
                if let Some(msg) = value {
                    // Handle msg...
                } else {
                    // The Redis pub/sub connection was dropped. Exit.
                    cmd = None;
                    break;
                }
            }
            value = self.cmd_rx.recv() => {
                // If value is Some: Break out of the inner loop, update the connection, then listen again.
                // If value is None: The cmd_tx owned by FetchPub was dropped. Exit.
                cmd = value;
                break;
            }
        }
    }

    match cmd {
        Some(FetchCmd::Subscribe(channel)) => {
            if channels.insert(channel.clone()) {
                stream.subscribe(channel).await?;
            }
        }
        Some(FetchCmd::Unsubscribe(channel)) => {
            if channels.remove(&channel) {
                stream.unsubscribe(channel).await?;
            }
        }
        None => return Ok(()),
    };
}

I guess I agree that this isn't bad, but it does feel like this interface could be quite a bit nicer if the library allowed you to split the PubSubStream. I was imagining something like:

let mut stream = redis_con().await?.subscribe("channel").await?;
let (sink, mut split_stream) = stream.split();

// Retain the sink somewhere else and allow calling subscribe() or unsubscribe() directly on it from another async task.

// We can then occupy this task listening for messages, and not worry about subscribing/unsubscribing.
while let Some(Ok(msg)) = split_stream.next().await {
    // Handle message.
}

The split() method is part of the StreamExt trait. Here's an example of a split stream in tokio-tungstenite, where you have one half for sending websocket messages and another half for receiving them: https://github.com/snapview/tokio-tungstenite/blob/master/examples/client.rs#L32

The docs on split say "Splits this Stream + Sink object into separate Sink and Stream objects. This can be useful when you want to split ownership between tasks..." which is exactly what I'd like to do here, I think.

Btw I appreciate that you want to keep the library as thin as possible and also think that that's a good goal.

from rustis.

mcatanzariti avatar mcatanzariti commented on August 27, 2024

Hi @richardhenry,

Could you please check 395014c and let me know if all is ok for you ?

I finally added a Client::create_pub_sub() method.

from rustis.

richardhenry avatar richardhenry commented on August 27, 2024

Cool, thanks! I'll try migrating my code over to this and let you know how it goes.

from rustis.

mcatanzariti avatar mcatanzariti commented on August 27, 2024

ok I will wait for your feedback before publishing a new release

from rustis.

richardhenry avatar richardhenry commented on August 27, 2024

Just implemented these changes, this is a huge improvement. My code for managing this is about 1/5th the size now. This is the nicest Redis pub/sub API I've used in Rust!

from rustis.

mcatanzariti avatar mcatanzariti commented on August 27, 2024

Is it possible that you paste here a sample of your code to see the result, please?

from rustis.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.