
rustis's People

Contributors

laptou, mcatanzariti, mstyura, orion-tran, rakshith-ravi, richardhenry, shedrachokonofua, yyaroshevich


rustis's Issues

Using Redis Stack and Pool Feature

Hi, I was facing some trouble using Redis Stack and the pool features provided by this package. Is there any example or sample code where I can learn how to use those features of this package?

Thank you.

Shall we use slice type for `instances` field for `SentinelConfig`?

Currently the field is of type Vec<(String, u16)>. However, accepting a slice (e.g., &[(String, u16)]) would perform better. In our specific scenario, the addresses remain fixed in production (we use the service name in a Docker Swarm cluster), so the instance addresses can be constants in the binary:

const INSTANCES: [(&str, u16); 3] = [
    ("redis-sentinel-1", 26379),
    ("redis-sentinel-2", 26379),
    ("redis-sentinel-3", 26379),
];

It could even be &[(&str, u16)] by adding a trait IntoSentinelConfig similar to IntoConfig (a rough sketch follows below). Would it be feasible for me to implement this?
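
A rough sketch of what such a conversion could look like (the trait name and shape are purely illustrative, not existing rustis API):

// Purely illustrative: a conversion trait in the spirit of IntoConfig that
// would let SentinelConfig accept borrowed, constant instance lists.
pub trait IntoSentinelInstances {
    fn into_instances(self) -> Vec<(String, u16)>;
}

impl IntoSentinelInstances for Vec<(String, u16)> {
    fn into_instances(self) -> Vec<(String, u16)> {
        self
    }
}

impl<'a> IntoSentinelInstances for &'a [(&'a str, u16)] {
    fn into_instances(self) -> Vec<(String, u16)> {
        self.iter()
            .map(|(host, port)| (host.to_string(), *port))
            .collect()
    }
}

With such an impl in place, the constant INSTANCES above could be passed as INSTANCES.as_slice().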

Client("Disconnected by peer")

When trying to connect to Upstash Redis I get an error on Client::connect(upstash_url) which says Client("Disconnected by peer"). When running with RUST_BACKTRACE=1 I don't get any additional information. I have tokio-tls enabled as a feature flag; however, using tokio-runtime returns the same error. I have made sure redis is up by connecting to it through redis-cli.
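
One thing worth checking: Upstash instances typically require TLS, so a plain-text connection can be dropped immediately by the peer, which looks exactly like "Disconnected by peer". A minimal sketch, assuming rustis accepts the standard rediss:// scheme for TLS when a TLS feature such as tokio-tls is enabled (the exact scheme and feature names should be verified against the Config documentation; the URL below is a placeholder):

use rustis::{client::Client, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // `rediss://` (note the extra `s`) requests a TLS connection;
    // host, port and password are placeholders.
    let client = Client::connect("rediss://default:password@my-db.upstash.io:6379").await?;
    let _ = client;
    Ok(())
}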

[bug] XREADGROUP command erroneous "mismatched key slots" error with cluster

I'm getting a "Cannot send command XREADGROUP with mismatched key slots" error when using the xreadgroup function with a count provided via XReadGroupOptions against a Redis cluster. This is odd because I am only using a single key with this command.

My caller looks like this:

let read_result: Result<Vec<(String, Vec<StreamEntry<BulkString>>)>, _> = rc
    .xreadgroup(
        &self.group.name,
        &self.name,
        XReadGroupOptions::default().count(1),
        [&stream],
        [&cursor],
    )
    .await;

When I turn on debug logging for rustis, I see the following in my logs:

[...] Analyzing command Command { name: "XREADGROUP", args: CommandArgs { args: ["GROUP", "[group name]", "[consumer name]", "COUNT", "1", "STREAMS", "[stream name]", "0"] } }
[...] keys: ["COUNT", "1"], slots: [1092, 9842]
[...] Error while writing batch: Client error: [...] Cannot send command XREADGROUP with mismatched key slots

It looks like the keys are erroneously being extracted as ["COUNT", "1"] instead of the stream name; a sketch of the expected extraction is shown below.
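
For reference, a small illustrative sketch (not rustis code) of where the keys actually live in an XREADGROUP call: per the Redis command spec, the tokens after STREAMS split evenly into keys followed by IDs, so only the first half of that tail should be hashed to slots.

// Illustrative only: extract the stream keys from a flat XREADGROUP
// argument list. Everything after the STREAMS token is keys followed by an
// equal number of IDs, so the keys are the first half of that tail.
fn xreadgroup_keys<'a>(args: &[&'a str]) -> Vec<&'a str> {
    match args.iter().position(|a| a.eq_ignore_ascii_case("STREAMS")) {
        Some(pos) => {
            let tail = &args[pos + 1..];
            tail[..tail.len() / 2].to_vec()
        }
        None => Vec::new(),
    }
}

#[test]
fn extracts_stream_key_not_count() {
    let args = ["GROUP", "g", "c", "COUNT", "1", "STREAMS", "mystream", "0"];
    assert_eq!(xreadgroup_keys(&args), vec!["mystream"]);
}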

Should I use this library in production?

Hi,

First of all I just wanted to say thanks for all the work that has been put into this library, from what I've seen it seems great.

My question is related to maturity and production readiness. I'm starting a new project and I'm studying the possibility of doing it in Rust. I'll need reliable redis streams support.

How would you compare it, in terms of maturity, to the most popular option out there, redis-rs?
Would you mind sharing the motivations or plans for this library?

Thanks!

Help~, I have a high CPU usage problem when using cluster mode

Redis Version: 7.0.12

I use connections from a bb8 pool in cluster mode; the following is some of the code I've written:

static mut REDIS_POOL: Option<Pool<PooledClientManager>> = None;

/// this function will be invoked on app start
pub async fn init() -> AppResult<()> {
    let config = rustis::client::Config {
        // cluster config
        ..Default::default()
    };
    let manager = PooledClientManager::new(config)?;
    unsafe {
        REDIS_POOL = Some(
            rustis::bb8::Pool::builder()
                .idle_timeout(Some(redis_settings.get_idle_timeout()))
                .max_size(redis_settings.pool_size)
                .build(manager)
                .await?,
        );
    }
    Ok(())
}

#[inline]
pub async fn get_pooled_client<'a>(
) -> Result<PooledConnection<'a, PooledClientManager>, RunError<rustis::Error>> {
    let pool = unsafe { REDIS_POOL.as_ref().expect("Can't read RedisPool") };
    pool.get().await
}

pub async fn blpop_key(key: &str) -> AppResult<Option<(String, BulkString)>> {
    let client = get_pooled_client().await?;
    Ok(client.blpop(key, 25.0).await?)
}

pub async fn rpush(key: &str, value: &[u8]) -> AppResult<usize> {
    let client = get_pooled_client().await?;
    Ok(client.rpush(key, value).await?)
}

pub async fn subscribe() {
    let channels: Vec<String> = vec![/* channel names */];
    let client = match get_pooled_client().await {
        Ok(client) => client,
        Err(err) => {
            log::debug!(
                "Error on get redis connection on subscribe default, {}",
                err.to_string()
            );
            return;
        }
    };
    match client.subscribe(channels).await {
        Ok(mut pub_sub_stream) => {
            match pub_sub_stream.next().await {
                // ... handle messages ...
                _ => {}
            }
        }
        Err(err) => {
            log::debug!("Error on subscribe, {}", err.to_string());
        }
    }
}

After the process starts:

Time 1: all connections are established to the redis servers; the following socket fd status was output by the lsof -p command:

xnebula-h 30179 root   11u     IPv4             221212      0t0      TCP TEST-EAP-APP-3:57752->192.168.3.72:6383 (ESTABLISHED)
xnebula-h 30179 root   12u     IPv4             221215      0t0      TCP TEST-EAP-APP-3:57758->192.168.3.72:6383 (ESTABLISHED)
xnebula-h 30179 root   13u     IPv4             221213      0t0      TCP TEST-EAP-APP-3:43418->192.168.3.71:6381 (ESTABLISHED)
xnebula-h 30179 root   14u     IPv4             221214      0t0      TCP TEST-EAP-APP-3:47798->192.168.3.70:redis (ESTABLISHED)
xnebula-h 30179 root   15u     IPv4             221216      0t0      TCP TEST-EAP-APP-3:43424->192.168.3.71:6381 (ESTABLISHED)
xnebula-h 30179 root   16u     IPv4             221217      0t0      TCP TEST-EAP-APP-3:47804->192.168.3.70:redis (ESTABLISHED)
xnebula-h 30179 root   17u     IPv4             208636      0t0      TCP TEST-EAP-APP-3:57766->192.168.3.72:6383 (ESTABLISHED)
xnebula-h 30179 root   18u     IPv4             208637      0t0      TCP TEST-EAP-APP-3:43432->192.168.3.71:6381 (ESTABLISHED)
xnebula-h 30179 root   19u     IPv4             208638      0t0      TCP TEST-EAP-APP-3:47812->192.168.3.70:redis (ESTABLISHED)

Time 2: a few seconds later, some sockets are in CLOSE_WAIT status:

xnebula-h 30179 root   11u     IPv4             221212      0t0      TCP TEST-EAP-APP-3:57752->192.168.3.72:6383 (CLOSE_WAIT)
xnebula-h 30179 root   12u     IPv4             220540      0t0      TCP TEST-EAP-APP-3:57780->192.168.3.72:6383 (ESTABLISHED)
xnebula-h 30179 root   13u     IPv4             221213      0t0      TCP TEST-EAP-APP-3:43418->192.168.3.71:6381 (CLOSE_WAIT)
xnebula-h 30179 root   14u     IPv4             221214      0t0      TCP TEST-EAP-APP-3:47798->192.168.3.70:redis (ESTABLISHED)
xnebula-h 30179 root   15u     IPv4             220541      0t0      TCP TEST-EAP-APP-3:43446->192.168.3.71:6381 (ESTABLISHED)
xnebula-h 30179 root   16u     IPv4             220542      0t0      TCP TEST-EAP-APP-3:47826->192.168.3.70:redis (ESTABLISHED)

After Time 2, the CPU usage becomes high.
The redis server is configured with a timeout of 60 seconds.

I then used flamegraph to trace CPU usage. [flamegraph SVG omitted]

According to the SVG, about 96% of the CPU time is spent in:
rustis::network::network_handler::NetworkHandler::connect::_{{closure}}::_{{closure}}
rustis::network::network_handler::NetworkHandler::network_loop::_{{closure}}::_{{closure}}::_{{closure}}

This issue can be reproduced in cluster mode; when running in standalone (single-instance) mode, the issue disappears.

please help ~~~
thank you ~

Busy loop at disconnect

Hey, first, thanks for the lib, much appreciated!

I did a connect to redis like this:

let redis_cli = Client::connect("127.0.0.1:6379").await;

which works just fine if the redis server is running.
But if I kill redis (while the client is still connected), the console is spammed with errors:

2024-04-02T00:05:33.200878Z ERROR rustis::network::network_handler: [127.0.0.1:6379] Failed to reconnect: IO("[connection refused] Connection refused (os error 111)")    
2024-04-02T00:05:33.200896Z ERROR rustis::network::network_handler: [127.0.0.1:6379] Failed to reconnect: IO("[connection refused] Connection refused (os error 111)")    
2024-04-02T00:05:33.200914Z ERROR rustis::network::network_handler: [127.0.0.1:6379] Failed to reconnect: IO("[connection refused] Connection refused (os error 111)") 

Is there some way around this?
I'm not sure what the reconnect logic does in that case, but it looks like a busy loop?

feature "pool" is disabled

Hello there, impressive work!

I have a problem launching the example with a pool through bb8,

with dependencies like this

[dependencies]
rustis = { version = "0.11.0", features = ["pool", "redis-bloom", "tokio-runtime"] }
tokio = { version = "1.27.0", features = ["full"] }

and code

use rustis::Result;
#[cfg(feature = "pool")]
use rustis::{client::PooledClientManager, commands::StringCommands};

#[tokio::main]
async fn main() -> Result<()> {
    #[cfg(feature = "pool")]
    {
        let manager = PooledClientManager::new("127.0.0.1:6379")?;
        let pool = rustis::bb8::Pool::builder()
            .max_size(10)
            .build(manager)
            .await?;

        let client1 = pool.get().await.unwrap();
        client1.set("key1", "value1").await?;
        let value: String = client1.get("key1").await?;
        println!("value: {value:?}");

        let client2 = pool.get().await.unwrap();
        client2.set("key2", "value2").await?;
        let value: String = client2.get("key2").await?;
        println!("value: {value:?}");
    }

    Ok(())
}

rust-analyzer tells me that the code is inactive since feature "pool" is disabled, which is strange since I added it to the dependencies.
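
A likely cause worth checking: `#[cfg(feature = "pool")]` is evaluated against the features of the crate being compiled (the application), not against rustis's features. Either drop the cfg attributes, since the dependency always enables the feature here, or declare a matching feature in the application's own Cargo.toml, for example (illustrative only):

[features]
# Local `pool` feature that forwards to rustis's `pool` feature, so that
# `#[cfg(feature = "pool")]` in the application code can be enabled.
default = ["pool"]
pool = ["rustis/pool"]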

Error reconnect in the master-replica model of Redis-Server with Redis Sentinel

Description
When the master node of the redis-server crashes, reconnecting does not obtain the address of the newly promoted master node and generates an error:

Err(Client("Disconnected from server"))

To Reproduce
Following the Redis documentation:

  • Build the master-replica model of Redis-Server with Redis Sentinel
       +----+
       | M1 |
       | S1 |
       +----+
          |
+----+    |    +----+
| R2 |----+----| R3 |
| S2 |         | S3 |
+----+         +----+

Masters are called M1, M2, M3, ..., Mn.
Replicas are called R1, R2, R3, ..., Rn (R stands for replica).
Sentinels are called S1, S2, S3, ..., Sn.
  • Crash the master node of redis-server

Additional context

  • SentinelConnection
  • ClusterConnection

RedisGraph usage

Is there a seamless way of deserializing a GraphResultSet into json?

Occasional command timeouts on cluster connection

Problem description

The client application, which uses rustis to connect to a Redis cluster, sometimes doesn't receive a response to a command, or the command execution timeout is triggered.

Steps to reproduce

  1. Use rustis to connect to a redis cluster;
  2. Send a finite set of commands to the redis cluster.

Actual result

The last commands do not receive a response, or time out.

Expected result

All commands run to completion without timeout

More details

The problem feels like some Future is not being polled properly, and the computation only proceeds as long as new commands are sent to redis.

I've done some debugging and it seems like I found the problem, but I can't figure out an easy fix.
I believe there is a problem with this network loop:

loop {
    select! {
        msg = self.msg_receiver.next().fuse() => {
            if !self.handle_message(msg).await { break; }
        },
        value = self.connection.read().fuse() => {
            self.handle_result(value).await;
        }
    }
}

According to the tokio tutorial, select drops the futures that are not "selected" (completed). The future returned by self.msg_receiver.next() is safe to drop because it does not perform any computation (afaik), while the future returned by self.connection.read() is not, in the case of ClusterConnection:
pub async fn read(&mut self) -> Option<Result<RespBuf>> {
    let mut request_info: RequestInfo;

    loop {
        let read_futures = self.nodes.iter_mut().map(|n| n.connection.read().boxed());
        let (result, node_idx, _) = future::select_all(read_futures).await;

        if let Some(Ok(bytes)) = &result {
            if bytes.is_push_message() {
                return result;
            }
        }

        let node_id = &self.nodes[node_idx].id;

        if let Some(sub_request) = self.pending_requests.iter_mut().find_map(|r| {
            r.sub_requests
                .iter_mut()
                .find(|sr| sr.node_id == *node_id && sr.result.is_none())
        }) {
            sub_request.result = Some(result);
        } else {
            return Some(Err(Error::Client("Received unexpected message".to_owned())));
        };

        if let Some(ri) = self.pending_requests.front() {
            trace!("request_info: {ri:?}");
            if ri.sub_requests.iter().all(|sr| sr.result.is_some()) {
                if let Some(ri) = self.pending_requests.pop_front() {
                    request_info = ri;
                    break;
                }
            }
        }
    }

    let mut sub_results =
        Vec::<Result<RespBuf>>::with_capacity(request_info.sub_requests.len());
    let mut retry_reasons = SmallVec::<[RetryReason; 1]>::new();

    for sub_request in request_info.sub_requests.iter_mut() {
        let result = sub_request.result.take()?;
        if let Some(result) = result {
            match &result {
                Ok(resp_buf) if resp_buf.is_error() => match resp_buf.to::<()>() {
                    Err(Error::Redis(RedisError {
                        kind: RedisErrorKind::Ask { hash_slot, address },
                        description: _,
                    })) => retry_reasons.push(RetryReason::Ask {
                        hash_slot,
                        address: address.clone(),
                    }),
                    Err(Error::Redis(RedisError {
                        kind: RedisErrorKind::Moved { hash_slot, address },
                        description: _,
                    })) => retry_reasons.push(RetryReason::Moved {
                        hash_slot,
                        address: address.clone(),
                    }),
                    _ => sub_results.push(result),
                },
                _ => sub_results.push(result),
            }
        } else {
            return None;
        }
    }

    if !retry_reasons.is_empty() {
        debug!(
            "read failed and will be retried. reasons: {:?}",
            retry_reasons
        );
        return Some(Err(Error::Retry(retry_reasons)));
    }

    let command_name = &request_info.command_name;
    let command_info = self
        .command_info_manager
        .get_command_info_by_name(command_name);
    let command_info = if let Some(command_info) = command_info {
        command_info
    } else {
        return Some(Err(Error::Client(format!(
            "Unknown command {}",
            command_name
        ))));
    };

    let response_policy = command_info.command_tips.iter().find_map(|tip| {
        if let CommandTip::ResponsePolicy(response_policy) = tip {
            Some(response_policy)
        } else {
            None
        }
    });

    // The response_policy tip is set for commands that reply with scalar data types,
    // or when it's expected that clients implement a non-default aggregate.
    if let Some(response_policy) = response_policy {
        match response_policy {
            ResponsePolicy::OneSucceeded => {
                self.response_policy_one_succeeded(sub_results).await
            }
            ResponsePolicy::AllSucceeded => {
                self.response_policy_all_succeeded(sub_results).await
            }
            ResponsePolicy::AggLogicalAnd => {
                self.response_policy_agg(sub_results, |a, b| i64::from(a == 1 && b == 1))
            }
            ResponsePolicy::AggLogicalOr => {
                self.response_policy_agg(
                    sub_results,
                    |a, b| if a == 0 && b == 0 { 0 } else { 1 },
                )
            }
            ResponsePolicy::AggMin => self.response_policy_agg(sub_results, i64::min),
            ResponsePolicy::AggMax => self.response_policy_agg(sub_results, i64::max),
            ResponsePolicy::AggSum => self.response_policy_agg(sub_results, |a, b| a + b),
            ResponsePolicy::Special => self.response_policy_special(sub_results).await,
        }
    } else {
        self.no_response_policy(sub_results, &request_info).await
    }
}
read() is a non-trivial future composed of several other futures. When the network loop abandons the future produced by ClusterConnection::read, it can be interrupted in the middle of execution, somewhere between

let (result, node_idx, _) = future::select_all(read_futures).await;

and one of the returns; between that first await and the exits from the method there are multiple suspension points (awaits).
So, on the next iteration of NetworkHandler::network_loop, the select! will create two new futures for MsgReceiver::next and ClusterConnection::read, and the new future produced by ClusterConnection::read will first wait for new bytes from the redis socket, while it should instead "resume" handling the response that the previous, cancelled future had already received.

As a workaround, I tried locally to move

if let Some(ri) = self.pending_requests.front() {
    trace!("request_info: {ri:?}");
    if ri.sub_requests.iter().all(|sr| sr.result.is_some()) {
        if let Some(ri) = self.pending_requests.pop_front() {
            request_info = ri;
            break;
        }
    }
}

to the beginning of the loop inside ClusterConnection::read, and it seems to make ClusterConnection::read behave like a "resumable" operation. I'm not sure my change is correct at all, but it does seem to fix the app I tested with.

The open question is what to do next. I see two options:

  1. The network loop should not drop a future that has not yet completed, and should reuse the same future across iterations until it completes (a quick experiment showed that it may not be trivial to keep the borrow checker happy; see the sketch after this list). At a high level, Connection looks like a kind of Stream, and it could be combined with MsgReceiver using select to produce a combined stream which is then handled with a while loop with a match inside;
  2. Connection::read on all types of connections must be cancel safe, so that new calls to read "resume" a previously cancelled read (the question here is how to make this robust against further changes, i.e. how to prevent a cancel-safe future from becoming cancel-unsafe again without notice).
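
To make option 1 concrete, here is a minimal, self-contained sketch using stand-in async functions in place of the real MsgReceiver and ClusterConnection (the real types would also have to get past the borrow checker): the read future is created once, kept across iterations, and only replaced after it completes, so an unselected select! branch never cancels a half-finished read.

use tokio::select;

// Stand-ins for rustis internals; in the real code these would be
// self.msg_receiver.next() and self.connection.read().
async fn next_message() -> Option<u32> {
    None
}

async fn connection_read() -> Option<u32> {
    None
}

async fn network_loop_sketch() {
    // Box::pin so the same in-flight read future can live across iterations.
    let mut read_fut = Box::pin(connection_read());
    loop {
        select! {
            msg = next_message() => {
                if msg.is_none() {
                    break; // channel closed
                }
                // ... handle the outgoing message ...
            }
            value = &mut read_fut => {
                // ... handle the response in `value` ...
                let _ = value;
                // Only now start the next read; the previous one ran to
                // completion and was never cancelled mid-way.
                read_fut.set(connection_read());
            }
        }
    }
}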

PubSubStream subscribe/unsubscribe while iterating

Thanks for open sourcing this library!

My application has these requirements for Pub/Sub (which I don't think are particularly unusual):

  • I don't know which pub/sub channels I'm going to subscribe to ahead of time
  • I need to subscribe/unsubscribe to channels over time, while continuing to listen to messages on a connection

I have a hacky solution built on top of Rustis right now that looks kind of like this:

enum PubSubCmd {
    Subscribe(String),
    Unsubscribe(String),
}

async fn listen() -> Result<(), Error> {
    // We don't yet have any channels to subscribe to, so subscribe to a "void" channel just so that we can get a PubSubStream.
    let mut stream = redis_con().await?.subscribe("void").await?;

    // Retain cmd_tx somewhere and use it to send subscribe/unsubscribe commands.
    let (cmd_tx, mut cmd_rx) = mpsc::channel::<PubSubCmd>(32);

    loop {
        let cmd: Option<PubSubCmd>;

        loop {
            select! {
                value = stream.next() => {
                    if let Some(msg) = value {
                        // I'm handling a message here...
                    } else {
                        // The Redis pub/sub connection was dropped and we can exit.
                        cmd = None;
                        break;
                    }
                }
                value = cmd_rx.recv() => {
                    // If value is Some: Break out of the inner loop, update the connection, then listen again.
                    // If value is None: The cmd_tx was dropped and we can exit.
                    cmd = value;
                    break;
                }
            }
        }

        match cmd {
            Some(PubSubCmd::Subscribe(channel)) => stream.subscribe(channel).await?,
            Some(PubSubCmd::Unsubscribe(channel)) => stream.unsubscribe(channel).await?,
            None => return Ok(()), // Exit
        };
    }
}

This seems a lot more complicated than necessary. Looking through the library code, it seems like ideally I could do something like this instead:

  1. Connection could have a pub_sub() method that returns a PubSubStream without subscribing to anything immediately.
  2. PubSubStream already uses an mpsc channel internally, and seems like it could fairly easily support a split() method to allow listening in one async task while sending subscribe/unsubscribe commands from another.

Does something like this make sense? Let me know what you think. Thanks!

Unsubscribe method?

I see rustis lacks an unsubscribe method. Yes, there is close, but that is not remotely the same as unsubscribing a pub/sub stream from a specific channel.

So I went ahead and integrated it myself for now. I thought I'd help you out and share the code.

My code isn't correctly highlighted, and I may not be doing this the right way, but this is what I could come up with.

pub_sub_stream.rs

    pub async fn unsubscribe<C, CC>(&mut self, channels: CC) -> Result<()>
    where
        C: SingleArg + Send,
        CC: SingleArgCollection<C>,
    {
        let channels = CommandArgs::default().arg(channels).build();

        self.client
            .unsubscribe_from_pub_sub_sender(&channels, &self.sender)
            .await?;

        let mut existing_channels = CommandArgs::default();
        std::mem::swap(&mut existing_channels, &mut self.channels);
        self.channels = existing_channels.arg(channels).build();

        Ok(())
    }

client.rs

    pub(crate) async fn unsubscribe_from_pub_sub_sender(
        &self,
        channels: &CommandArgs,
        pub_sub_sender: &PubSubSender,
    ) -> Result<()> {
        let (result_sender, result_receiver): (ResultSender, ResultReceiver) = oneshot::channel();

        let pub_sub_senders = channels
            .into_iter()
            .map(|c| (c.to_vec(), pub_sub_sender.clone()))
            .collect::<Vec<_>>();

        let message = Message::pub_sub(
            cmd("UNSUBSCRIBE").arg(channels.clone()),
            result_sender,
            pub_sub_senders,
        );

        self.send_message(message)?;

        result_receiver.await??.to::<()>()
    }

Calling subscribe() multiple times on PubSubStream with the same channel results in an error on close()

If you call subscribe() on PubSubStream multiple times with the same channel the following assertion error will occur when close() is called on the PubSubStream (or when it is dropped):

thread 'rocket-worker-thread' panicked at .../src/index.crates.io-6f17d22bba15001f/rustis-0.12.8/src/network/network_handler.rs:545:17:
[localhost:6379] Received unexpected message: Ok(Push([BulkString("unsubscribe"), BulkString("[channel name]"), Integer(0)]))

This is fixed by maintaining a hash set of channels in my application code and checking if the channel is already subscribed before calling subscribe, which is a good thing to do anyway — but this would probably be good to fix in Rustis too.
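
A minimal sketch of the application-side guard described above (plain application code, no rustis API involved):

use std::collections::HashSet;

// Track which channels are already subscribed so subscribe() is only ever
// called once per channel.
#[derive(Default)]
struct SubscribedChannels {
    channels: HashSet<String>,
}

impl SubscribedChannels {
    // Returns true when the channel was not subscribed yet (insert() returns
    // false for duplicates), i.e. when subscribe() should actually be called.
    fn should_subscribe(&mut self, channel: &str) -> bool {
        self.channels.insert(channel.to_string())
    }
}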

[bug] xgroup_create mk_stream error with redis cluster

When I run the xgroup create command against Redis Cluster with the mkstream option:

con.xgroup_create(..., XGroupCreateOptions::default().mk_stream()).await?

I am getting the following error:

Error::Redis(RedisError { kind: Moved { hash_slot: 7183, address: ("10.224.2.49", 6379) }, description: "" })

For some reason this redirect isn't being followed correctly.

Redisearch object mapping - FtSearchResult to X

Hey, very nice job here. I see this supports the sentinel, json, and search modules, which is perfect for my use case. I was searching through the tests and examples for how the API can be used most efficiently, and found one thing I'm not sure about: how to map an FtSearchResult to a Rust struct. Suppose I have a JSON list of fruits and I want to filter them by color, size, etc. So I create an index on the desired fields, and suppose I know I'll be able to narrow the result down to an Option<Fruit>. What is the best way (is it possible with some helpers?) to transform FtSearchResult.results into an exact Rust struct? I suppose I could map values, which is a Vec<(String, String)>, for my needs? Is this vector some kind of key->value map? Is there a better way to map this, maybe by putting some attribute on the Fruit struct?

Update:
A quick check of the Redis docs (https://redis.io/docs/interact/search-and-query/indexing/) suggests that this corresponds to the raw Redis response, and in my case the second string of the pair will be the stringified Fruit JSON, given that I asked for the whole document ($). I suppose I should unmarshal the string into Fruit by hand?
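
A minimal sketch of that by-hand approach, assuming the query was made against a JSON index and each result row exposes its fields as (name, value) string pairs with the whole document under "$" (the exact shape of FtSearchResult should be checked against the rustis docs):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Fruit {
    name: String,
    color: String,
    size: u32,
}

// When the index is on JSON and "$" is returned, the value is the
// stringified document, which serde_json can turn into the target struct.
fn parse_fruit(values: &[(String, String)]) -> Option<Fruit> {
    let (_, json) = values.iter().find(|(field, _)| field == "$")?;
    serde_json::from_str(json).ok()
}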

PS: Can you also support the redis-sentinel protocol in the connection string? It seems to panic when it is provided.

Thanks in advance.

Question: How to wrap pipeline in function

Trying to wrap a pipeline in a function, but can't get it to work:

async fn test<T: serde::de::DeserializeOwned>(client: &rustis::client::Client) -> rustis::Result<T> {
    let mut pipe = client.create_pipeline();
    pipe.get::<_, ()>("key1").queue();
    pipe.get::<_, ()>("key2").queue();
    pipe.execute().await.unwrap()
}

Calling code: let r : (String, String) = test(client).await.unwrap();

I'm getting the following error:

the trait bound `rustis::Error: Deserialize<'_>` is not satisfied
the following other types implement trait `Deserialize<'de>`:
  bool
  char
  isize
  i8
  i16
  i32
  i64
  i128
and 193 others
required for `Result<T, rustis::Error>` to implement `for<'de> Deserialize<'de>`
required for `Result<T, rustis::Error>` to implement `DeserializeOwned`
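A sketch of the likely fix, given the error: with the trailing .unwrap(), the declared return type forces execute to deserialize into Result<T, rustis::Error> itself, which is what pulls in the rustis::Error: Deserialize<'_> bound. Returning the execute() result directly avoids that (assuming execute deserializes into the requested tuple, as in the pool example earlier on this page):

async fn test<T: serde::de::DeserializeOwned>(
    client: &rustis::client::Client,
) -> rustis::Result<T> {
    let mut pipe = client.create_pipeline();
    pipe.get::<_, ()>("key1").queue();
    pipe.get::<_, ()>("key2").queue();
    // No `.unwrap()`: let execute deserialize into `T` and return the Result.
    pipe.execute().await
}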

[bug] Panic when using pub/sub with bb8 connection pooling

I'm using a bb8 connection pool in my application, which was created like this:

let manager = PooledClientManager::new(REDIS_URL.to_string())
    .expect("failed to create redis client manager");
rustis::bb8::Pool::builder()
    .max_size(256)
    .build_unchecked(manager)

Then a tokio task in my application needs to listen to pub/sub for a while, so it retrieves a connection from the pool:

let (sink, stream) = pool.get().await?.create_pub_sub()

When the task completes, the connection is returned to the pool. At the same time, the bb8 connection pool is being used for other parts of my application for non pub/sub related connections.

After my application has been running for a few moments and connections have been recycled in/out of the pool, another part of my code retrieves a connection from the pool and uses it to send a command (e.g. xadd). This will cause a panic in rustis internals. I have pasted the panic at the bottom of this issue.

If I don't use a connection pool for the pub/sub connections this issue goes away (so that is what I'm doing for now).

The same error also occurs if I use the previous version of rustis and call subscribe instead of create_pub_sub, so I do not think this issue is caused by the latest changes on main.

The error and backtrace:

thread 'rocket-worker-thread' panicked at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17:
[127.0.0.1:6381] Received unexpected message: Ok(SimpleString("PONG"))
stack backtrace:
   0: rust_begin_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
   2: rustis::network::network_handler::NetworkHandler::receive_result
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17
   3: rustis::network::network_handler::NetworkHandler::handle_result::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:410:25
   4: rustis::network::network_handler::NetworkHandler::network_loop::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:153:47
   5: rustis::network::network_handler::NetworkHandler::connect::{{closure}}::{{closure}}
             at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:138:60
   6: <core::pin::Pin<P> as core::future::future::Future>::poll
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/future/future.rs:125:9
   7: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:328:17
   8: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/loom/std/unsafe_cell.rs:16:9
   9: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:317:13
  10: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:485:19
  11: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
  12: std::panicking::try::do_call
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
  13: ___rust_try
  14: std::panicking::try
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
  15: std::panic::catch_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
  16: tokio::runtime::task::harness::poll_future
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:473:18
  17: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:208:27
  18: tokio::runtime::task::harness::Harness<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:153:15
  19: tokio::runtime::task::raw::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:271:5
  20: tokio::runtime::task::raw::RawTask::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:201:18
  21: tokio::runtime::task::LocalNotified<S>::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/mod.rs:416:9
  22: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:576:13
  23: tokio::runtime::coop::with_budget
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/coop.rs:107:5
  24: tokio::runtime::coop::budget
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/coop.rs:73:5
  25: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:575:9
  26: tokio::runtime::scheduler::multi_thread::worker::Context::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:526:24
  27: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:491:21
  28: tokio::runtime::context::scoped::Scoped<T>::set
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context/scoped.rs:40:9
  29: tokio::runtime::context::set_scheduler::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context.rs:176:26
  30: std::thread::local::LocalKey<T>::try_with
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:270:16
  31: std::thread::local::LocalKey<T>::with
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:246:9
  32: tokio::runtime::context::set_scheduler
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context.rs:176:9
  33: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:486:9
  34: tokio::runtime::context::runtime::enter_runtime
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context/runtime.rs:65:16
  35: tokio::runtime::scheduler::multi_thread::worker::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:478:5
  36: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:447:45
  37: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/task.rs:42:21
  38: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:328:17
  39: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/loom/std/unsafe_cell.rs:16:9
  40: tokio::runtime::task::core::Core<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:317:13
  41: tokio::runtime::task::harness::poll_future::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:485:19
  42: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
  43: std::panicking::try::do_call
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
  44: ___rust_try
  45: std::panicking::try
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
  46: std::panic::catch_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
  47: tokio::runtime::task::harness::poll_future
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:473:18
  48: tokio::runtime::task::harness::Harness<T,S>::poll_inner
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:208:27
  49: tokio::runtime::task::harness::Harness<T,S>::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:153:15
  50: tokio::runtime::task::raw::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:271:5
  51: tokio::runtime::task::raw::RawTask::poll
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:201:18
  52: tokio::runtime::task::UnownedTask<S>::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/mod.rs:453:9
  53: tokio::runtime::blocking::pool::Task::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:159:9
  54: tokio::runtime::blocking::pool::Inner::run
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:513:17
  55: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
             at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:471:13
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

[Bug] Unsubscribe on drop implementation causes a panic when receiving malformed response?

3|tracker_ | thread 'tokio-runtime-worker' panicked at /root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rustis-0.12.9/src/network/network_handler.rs:545:17:
3|tracker_ | [private:6379] Received unexpected message: Ok(Push([BulkString("unsubscribe"), BulkString("tracker:346240269"), Integer(0)]))
3|tracker_ | note: run with RUST_BACKTRACE=1 environment variable to display a backtrace

The info is somewhat limited, but it seems to be coming from the rustis library itself? It's on 0.12.8.

Client("Expected nil") runtime errors when not specifying return type

If I call the xadd command without a return type I get an Error::Client("Expected nil") runtime error:

con.xadd(...).await?;

This is fixed by specifying a return type:

let _: String = con.xadd(...).await?;

It's pretty easy to make this mistake, and it doesn't get caught until runtime which isn't great. I'm assuming that this is also an issue with other commands. If the caller expects (), maybe this shouldn't be an error at all?

[Bug] Credentials overwrite during sentinel connection

Hi! In your sentinel connection flow you're overwriting the config username/password with sentinel ones, here:

config.username = sentinel_config.username.clone();
config.password = sentinel_config.password.clone();

Then, in the loop section, you properly use these creds to AUTH via the HELLO command in sentinel mode, and everything is fine while obtaining the master node address, but when connecting to the master node here:

StandaloneConnection::connect(&master_host, master_port, &config).await?;

It all falls apart because the credentials are still the sentinel ones. If someone has the same creds for the sentinels and the standard nodes, all is fine, but in my case I have a valid configuration where sentinel_username and sentinel_password are different from the standard username/password. When connecting to the master node, the sentinel creds won't work and the standard user/pass combination should be used. I see two solutions to this problem:

[Solution 1]
Add optional sentinel_username and sentinel_password to the config object to differentiate them from the standard credentials. In Redis they are separate things; you specify sentinel_password as a query param in the URL. This way you can use the sentinel creds during the sentinel phase and the standard creds later on.

[Solution 2]
Store the standard config.username and config.password somewhere else and, after the sentinel phase is done, overwrite the creds back to the standard ones.

IMHO the cleaner option is Solution 1, as it's common practice to differentiate these options. E.g. see this: https://github.com/redis/rueidis/blob/main/sentinel.go#L24 where they differentiate between mOpts (master opts) and sOpts (sentinel opts), I believe. A rough sketch of what this could look like follows below.
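
For illustration, the shape Solution 1 could take (field names below are hypothetical, not existing rustis API, apart from instances, which the earlier issue on this page references):

// Hypothetical sketch only: sentinel-specific credentials live next to the
// instance list, while the standard node credentials stay untouched on the
// top-level Config.
pub struct SentinelConfig {
    pub instances: Vec<(String, u16)>,
    /// Used only when authenticating against the sentinel nodes (HELLO/AUTH).
    pub sentinel_username: Option<String>,
    pub sentinel_password: Option<String>,
    // ... other existing fields ...
}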

Please look into this urgently.
