dahomey-technologies / rustis
An asynchronous Redis client for Rust
Home Page: https://docs.rs/rustis
License: MIT License
Hi, I've been having some trouble using the Redis Stack and pool features provided by this package. Is there any example or sample code showing how to use those features?
Thank you.
Currently the field is of type Vec<(String, u16)>. However, using a slice (e.g., &[(String, u16)]) would give better performance. In our specific scenario, the address remains fixed in production (we use the service name in a Docker Swarm cluster), so the instance addresses can be constants in the binary:
const INSTANCES: [(&str, u16); 3] = [
    ("redis-sentinel-1", 26379),
    ("redis-sentinel-2", 26379),
    ("redis-sentinel-3", 26379),
];
It could even be &[(&str, u16)] by adding a trait IntoSentinelConfig, similar to IntoConfig. Is it feasible for me to implement this?
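A rough sketch of what such a trait could look like (the SentinelConfig shape and trait name here are illustrative, not rustis' actual API):

pub struct SentinelConfig {
    pub instances: Vec<(String, u16)>,
}

pub trait IntoSentinelConfig {
    fn into_sentinel_config(self) -> SentinelConfig;
}

// Accept any slice of (host, port) pairs, including &[(&str, u16)].
impl<H: AsRef<str>> IntoSentinelConfig for &[(H, u16)] {
    fn into_sentinel_config(self) -> SentinelConfig {
        SentinelConfig {
            instances: self
                .iter()
                .map(|(host, port)| (host.as_ref().to_string(), *port))
                .collect(),
        }
    }
}

With an impl like this, a const array such as INSTANCES above could be passed as &INSTANCES[..].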
When trying to connect to Upstash Redis, I get an error on Client::connect(upstash_url) which says Client("Disconnected by peer"). When running with RUST_BACKTRACE=1 I don't get any additional information. I have the tokio-tls feature flag enabled; however, using tokio-runtime returns the same error. I have made sure Redis is up by connecting to it through redis-cli.
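For anyone trying to reproduce this: Upstash requires TLS, so the URL scheme matters. A minimal connection sketch, assuming rustis' URL parsing accepts a rediss:// scheme for TLS connections (the URL below is a placeholder):

use rustis::{client::Client, commands::StringCommands};

#[tokio::main]
async fn main() -> rustis::Result<()> {
    // rediss:// (note the extra "s") requests TLS, which Upstash requires;
    // a TLS feature such as tokio-tls must be enabled.
    let client = Client::connect("rediss://default:password@host.upstash.io:6379").await?;
    client.set("key", "value").await?;
    Ok(())
}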
I'm getting a "Cannot send command XREADGROUP with mismatched key slots" error when using the xreadgroup function with a count provided by XReadGroupOptions against redis cluster. This is odd because I am only using a single key with this command.
My caller looks like this:
let read_result: Result<Vec<(String, Vec<StreamEntry<BulkString>>)>, _> = rc
    .xreadgroup(
        &self.group.name,
        &self.name,
        XReadGroupOptions::default().count(1),
        [&stream],
        [&cursor],
    )
    .await;
When I turn on debug logging for rustis, I see the following in my logs:
[...] Analyzing command Command { name: "XREADGROUP", args: CommandArgs { args: ["GROUP", "[group name]", "[consumer name]", "COUNT", "1", "STREAMS", "[stream name]", "0"] } }
[...] keys: ["COUNT", "1"], slots: [1092, 9842]
[...] Error while writing batch: Client error: [...] Cannot send command XREADGROUP with mismatched key slots
It looks like the keys are erroneously being extracted as ["COUNT", "1"] instead of the stream name.
Hi,
First of all I just wanted to say thanks for all the work that has been put into this library, from what I've seen it seems great.
My question is related to maturity and production readiness. I'm starting a new project and I'm studying the possibility of doing it in Rust. I'll need reliable Redis streams support.
How would you compare it, in terms of maturity, to the most popular option out there, redis-rs?
Would you mind sharing the motivations or plans for this library?
Thanks!
Redis Version: 7.0.12
I use the connection with a bb8 pool in cluster mode; the following is some code I've written:
static mut REDIS_POOL: Option<Pool<PooledClientManager>> = None;

/// this function will be invoked on app start
pub async fn init() -> AppResult<()> {
    let config = rustis::client::Config {
        // cluster config
    };
    let manager = PooledClientManager::new(config)?;
    unsafe {
        REDIS_POOL = Some(
            rustis::bb8::Pool::builder()
                .idle_timeout(Some(redis_settings.get_idle_timeout()))
                .max_size(redis_settings.pool_size)
                .build(manager)
                .await?,
        )
    }
    Ok(())
}

#[inline]
pub async fn get_pooled_client<'a>(
) -> Result<PooledConnection<'a, PooledClientManager>, RunError<rustis::Error>> {
    let pool = unsafe { REDIS_POOL.as_ref().expect("Can't read RedisPool") };
    pool.get().await
}

pub async fn blpop_key(key: &str) -> AppResult<Option<(String, BulkString)>> {
    let client = get_pooled_client().await?;
    Ok(client.blpop(key, 25.0).await?)
}

pub async fn rpush(key: &str, value: &[u8]) -> AppResult<usize> {
    let client = get_pooled_client().await?;
    Ok(client.rpush(key, value).await?)
}
pub async fn subscribe() {
    let channels = vec![];
    //
    let client = match get_pooled_client().await {
        Ok(client) => client,
        Err(err) => {
            log::debug!(
                "Error on get redis connection on subscribe default, {}",
                err.to_string()
            );
            return;
        }
    };
    match client.subscribe(channels).await {
        Ok(mut pub_sub_stream) => {
            match pub_sub_stream.next().await {
                // ...
            }
        }
        Err(err) => {
            log::debug!("Error on subscribe, {}", err.to_string());
        }
    }
}
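(An aside on the snippet above, unrelated to the CLOSE_WAIT problem itself: on recent Rust, the static mut + unsafe global can be replaced with std::sync::OnceLock. A minimal sketch, with made-up pool settings:)

use std::sync::OnceLock;
use rustis::bb8::{Pool, PooledConnection, RunError};
use rustis::client::PooledClientManager;

static REDIS_POOL: OnceLock<Pool<PooledClientManager>> = OnceLock::new();

pub async fn init(config: rustis::client::Config) -> Result<(), rustis::Error> {
    let manager = PooledClientManager::new(config)?;
    let pool = Pool::builder().max_size(16).build(manager).await?;
    let _ = REDIS_POOL.set(pool); // ignore if already initialized
    Ok(())
}

pub async fn get_pooled_client<'a>(
) -> Result<PooledConnection<'a, PooledClientManager>, RunError<rustis::Error>> {
    REDIS_POOL.get().expect("pool not initialized").get().await
}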
Time 1: all the connections are established to the Redis server. The following socket fd status is output by the lsof -p command:
xnebula-h 30179 root 11u IPv4 221212 0t0 TCP TEST-EAP-APP-3:57752->192.168.3.72:6383 (ESTABLISHED)
xnebula-h 30179 root 12u IPv4 221215 0t0 TCP TEST-EAP-APP-3:57758->192.168.3.72:6383 (ESTABLISHED)
xnebula-h 30179 root 13u IPv4 221213 0t0 TCP TEST-EAP-APP-3:43418->192.168.3.71:6381 (ESTABLISHED)
xnebula-h 30179 root 14u IPv4 221214 0t0 TCP TEST-EAP-APP-3:47798->192.168.3.70:redis (ESTABLISHED)
xnebula-h 30179 root 15u IPv4 221216 0t0 TCP TEST-EAP-APP-3:43424->192.168.3.71:6381 (ESTABLISHED)
xnebula-h 30179 root 16u IPv4 221217 0t0 TCP TEST-EAP-APP-3:47804->192.168.3.70:redis (ESTABLISHED)
xnebula-h 30179 root 17u IPv4 208636 0t0 TCP TEST-EAP-APP-3:57766->192.168.3.72:6383 (ESTABLISHED)
xnebula-h 30179 root 18u IPv4 208637 0t0 TCP TEST-EAP-APP-3:43432->192.168.3.71:6381 (ESTABLISHED)
xnebula-h 30179 root 19u IPv4 208638 0t0 TCP TEST-EAP-APP-3:47812->192.168.3.70:redis (ESTABLISHED)
Time 2: after some seconds, the socket fd status again; some sockets are now in CLOSE_WAIT status:
xnebula-h 30179 root 11u IPv4 221212 0t0 TCP TEST-EAP-APP-3:57752->192.168.3.72:6383 (CLOSE_WAIT)
xnebula-h 30179 root 12u IPv4 220540 0t0 TCP TEST-EAP-APP-3:57780->192.168.3.72:6383 (ESTABLISHED)
xnebula-h 30179 root 13u IPv4 221213 0t0 TCP TEST-EAP-APP-3:43418->192.168.3.71:6381 (CLOSE_WAIT)
xnebula-h 30179 root 14u IPv4 221214 0t0 TCP TEST-EAP-APP-3:47798->192.168.3.70:redis (ESTABLISHED)
xnebula-h 30179 root 15u IPv4 220541 0t0 TCP TEST-EAP-APP-3:43446->192.168.3.71:6381 (ESTABLISHED)
xnebula-h 30179 root 16u IPv4 220542 0t0 TCP TEST-EAP-APP-3:47826->192.168.3.70:redis (ESTABLISHED)
After Time 2, CPU usage becomes high.
The Redis server is configured with a timeout of 60 seconds.
I then used flamegraph to trace the CPU; the following comes from the CPU usage SVG captured by flamegraph.
According to the SVG, about 96% of the CPU usage is in:
rustis::network::network_handler::NetworkHandler::connect::_{{closure}}::_{{closure}}
rustis::network::network_handler::NetworkHandler::network_loop::_{{closure}}::_{{closure}}::_{{closure}}
This issue can be reproduced in cluster mode; when running in standalone (single-node) mode, the issue disappears.
Please help ~~~
Thank you ~
Hey, first, thanks for the lib, much appreciated!
I connect to Redis like this:
let redis_cli = Client::connect("127.0.0.1:6379").await;
which works just fine if the Redis server is running.
But if I kill Redis (while still connected), I get error spam in the console:
2024-04-02T00:05:33.200878Z ERROR rustis::network::network_handler: [127.0.0.1:6379] Failed to reconnect: IO("[connection refused] Connection refused (os error 111)")
2024-04-02T00:05:33.200896Z ERROR rustis::network::network_handler: [127.0.0.1:6379] Failed to reconnect: IO("[connection refused] Connection refused (os error 111)")
2024-04-02T00:05:33.200914Z ERROR rustis::network::network_handler: [127.0.0.1:6379] Failed to reconnect: IO("[connection refused] Connection refused (os error 111)")
Is there some way around this?
I'm not sure what the reconnect logic does in that case, but it seems like a busy loop?
Hello there, impressive work!
I have a problem launching the pool example through bb8, with dependencies like this:
[dependencies]
rustis = { version = "0.11.0", features = ["pool", "redis-bloom", "tokio-runtime"] }
tokio = { version = "1.27.0", features = ["full"] }
and this code:
use rustis::Result;
#[cfg(feature = "pool")]
use rustis::{client::PooledClientManager, commands::StringCommands};

#[tokio::main]
async fn main() -> Result<()> {
    #[cfg(feature = "pool")]
    {
        let manager = PooledClientManager::new("127.0.0.1:6379")?;
        let pool = rustis::bb8::Pool::builder()
            .max_size(10)
            .build(manager)
            .await?;

        let client1 = pool.get().await.unwrap();
        client1.set("key1", "value1").await?;
        let value: String = client1.get("key1").await?;
        println!("value: {value:?}");

        let client2 = pool.get().await.unwrap();
        client2.set("key2", "value2").await?;
        let value: String = client2.get("key2").await?;
        println!("value: {value:?}");
    }
    Ok(())
}
rust-analyzer tells me the code is inactive since the "pool" feature is disabled, which is strange since I added it to the dependencies.
Currently ts_mrange is unusable, since I need to get raw ungrouped values, but the API forces a grouping.
Description
When the master node of redis-server crashes, reconnecting does not obtain the address of the new master node, and generates an error:
Err(Client("Disconnected from server"))
To Reproduce
According to the Redis documentation:
+----+
| M1 |
| S1 |
+----+
|
+----+ | +----+
| R2 |----+----| R3 |
| S2 | | S3 |
+----+ +----+
Masters are called M1, M2, M3, ..., Mn.
Replicas are called R1, R2, R3, ..., Rn (R stands for replica).
Sentinels are called S1, S2, S3, ..., Sn.
Additional context:
- I looked at SentinelConnection::reconnect() and found that reconnecting using StandaloneConnection::reconnect() does not re-obtain the host and port of the updated master node.
- Compare ClusterConnection::connect_to_cluster(), used in ClusterConnection::reconnect() to re-obtain the host and port of the node.
- Commands such as blpop, brpush, xread (with BLOCK), and so on, are affected.
thank you ~
Is there a seamless way of deserializing a GraphResultSet into JSON?
The client application, which uses rustis to connect to a Redis cluster, sometimes doesn't receive a response to a command, or the command execution timeout is triggered.
Actual behavior: the last commands do not receive a response, or time out.
Expected behavior: all commands run to completion without timeout.
The problem feels like some Future is not properly polled, so computation only proceeds as long as new commands are being sent to Redis.
I've done some debugging, and I think I've found the problem, but I can't figure out an easy fix.
I believe the problem is in this network loop:
rustis/src/network/network_handler.rs, lines 136 to 145 in a53ac5d
self.msg_receiver.next() is safe to drop because it does not incur any computations (AFAIK), while the future returned by self.connection.read() is not, in the case of ClusterConnection (rustis/src/network/cluster_connection.rs, lines 426 to 552 in a53ac5d). ClusterConnection::read might be interrupted in the middle of execution: between the first await (rustis/src/network/cluster_connection.rs, line 431 in a53ac5d) and the returns that exit the method, there are multiple suspension points (awaits). On the next iteration of NetworkHandler::network_loop, the select! will create two new futures for MsgReceiver::next and ClusterConnection::read, so the new future produced by ClusterConnection::read will first wait for new bytes from the socket to Redis, while it should "resume" with handling the response from the previous, cancelled future.
As a workaround, I've tried locally to move the loop (rustis/src/network/cluster_connection.rs, lines 451 to 459 in a53ac5d) inside ClusterConnection::read, and it seems to make ClusterConnection::read behave like a "resumable" operation. But I'm not sure the change I've made is correct at all, even though it seems to fix the app I've tested it with.
The open question is what to do next. I see two options:
- Connection seems like a kind of Stream, and it could be combined with MsgReceiver using select to produce a combined stream, which could later be handled with a while loop with a match inside; or
- Connection::read on all types of connections must be cancel-safe, so new calls to read must "resume" a previously cancelled read (the question here is how to make this robust to further changes, i.e., how to prevent a cancel-safe future from becoming cancel-unsafe again without notice).
Thanks for open sourcing this library!
My application has these requirements for Pub/Sub (which I don't think are particularly unusual):
I have a hacky solution built on top of Rustis right now that looks kind of like this:
enum PubSubCmd {
    Subscribe(String),
    Unsubscribe(String),
}

async fn listen() -> Result<(), Error> {
    // We don't yet have any channels to subscribe to, so subscribe to a
    // "void" channel just so that we can get a PubSubStream.
    let mut stream = redis_con().await?.subscribe("void").await?;
    // Retain cmd_tx somewhere and use it to send subscribe/unsubscribe commands.
    let (cmd_tx, mut cmd_rx) = mpsc::channel::<PubSubCmd>(32);
    loop {
        let cmd: Option<PubSubCmd>;
        loop {
            select! {
                value = stream.next() => {
                    if let Some(msg) = value {
                        // I'm handling a message here...
                    } else {
                        // The Redis pub/sub connection was dropped and we can exit.
                        cmd = None;
                        break;
                    }
                }
                value = cmd_rx.recv() => {
                    // If value is Some: break out of the inner loop, update the
                    // connection, then listen again.
                    // If value is None: the cmd_tx was dropped and we can exit.
                    cmd = value;
                    break;
                }
            }
        }
        match cmd {
            Some(PubSubCmd::Subscribe(channel)) => stream.subscribe(channel).await?,
            Some(PubSubCmd::Unsubscribe(channel)) => stream.unsubscribe(channel).await?,
            None => return Ok(()), // Exit
        };
    }
}
This seems a lot more complicated than necessary. Looking through the library code, it seems like ideally I could do something like this instead:
- A pub_sub() method that returns a PubSubStream without subscribing to anything immediately.
- PubSubStream already uses an mpsc channel internally, and it seems like it could fairly easily support a split() method to allow listening in one async task while sending subscribe/unsubscribe commands from another (see the sketch below).
Does something like this make sense? Let me know what you think. Thanks!
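For concreteness, a usage sketch of that shape of API. The create_pub_sub() call mirrors what a later issue on this page uses; the method names on the sink half are assumptions, not rustis' confirmed API:

use futures_util::StreamExt;
use rustis::client::Client;

async fn run(client: Client) -> rustis::Result<()> {
    // Hypothetical shape: obtain a (sink, stream) pair with no initial
    // subscription, then listen and (un)subscribe from separate tasks.
    let (mut sink, mut stream) = client.create_pub_sub();
    tokio::spawn(async move {
        while let Some(msg) = stream.next().await {
            let _ = msg; // handle message
        }
    });
    sink.subscribe("news").await?;
    sink.unsubscribe("news").await?;
    Ok(())
}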
I see rustis lacks an unsubscribe method. Yes, there is close, but that is not remotely the same as unsubscribing a pub/sub stream from a specific channel.
So I went ahead and integrated it myself for now. I thought I'd help you out and share the code.
I see that my code is not correctly highlighted; I'm not doing something right there, but this is what I could come up with.
pub_sub_stream.rs
pub async fn unsubscribe<C, CC>(&mut self, channels: CC) -> Result<()>
where
    C: SingleArg + Send,
    CC: SingleArgCollection<C>,
{
    let channels = CommandArgs::default().arg(channels).build();
    self.client
        .unsubscribe_from_pub_sub_sender(&channels, &self.sender)
        .await?;
    let mut existing_channels = CommandArgs::default();
    std::mem::swap(&mut existing_channels, &mut self.channels);
    self.channels = existing_channels.arg(channels).build();
    Ok(())
}
client.rs
pub(crate) async fn unsubscribe_from_pub_sub_sender(
    &self,
    channels: &CommandArgs,
    pub_sub_sender: &PubSubSender,
) -> Result<()> {
    let (result_sender, result_receiver): (ResultSender, ResultReceiver) = oneshot::channel();
    let pub_sub_senders = channels
        .into_iter()
        .map(|c| (c.to_vec(), pub_sub_sender.clone()))
        .collect::<Vec<_>>();
    let message = Message::pub_sub(
        cmd("UNSUBSCRIBE").arg(channels.clone()),
        result_sender,
        pub_sub_senders,
    );
    self.send_message(message)?;
    result_receiver.await??.to::<()>()
}
If you call subscribe() on PubSubStream multiple times with the same channel, the following assertion error occurs when close() is called on the PubSubStream (or when it is dropped):
thread 'rocket-worker-thread' panicked at .../src/index.crates.io-6f17d22bba15001f/rustis-0.12.8/src/network/network_handler.rs:545:17:
[localhost:6379] Received unexpected message: Ok(Push([BulkString("unsubscribe"), BulkString("[channel name]"), Integer(0)]))
This is fixed by maintaining a hash set of channels in my application code and checking whether the channel is already subscribed before calling subscribe, which is a good thing to do anyway, but it would probably be good to fix this in Rustis too.
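A minimal sketch of that application-side guard (the names here are mine, not rustis API):

use std::collections::HashSet;

// Track subscribed channels so SUBSCRIBE is only sent once per channel.
struct Subscriptions {
    channels: HashSet<String>,
}

impl Subscriptions {
    // Returns true if the channel was not yet subscribed, i.e. a
    // SUBSCRIBE call should be issued.
    fn try_add(&mut self, channel: &str) -> bool {
        self.channels.insert(channel.to_string())
    }
}

// Usage: if subs.try_add("news") { stream.subscribe("news").await?; }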
When I run the xgroup create command against Redis Cluster with the mkstream option:
con.xgroup_create(..., XGroupCreateOptions::default().mk_stream()).await?
I am getting the following error:
Error::Redis(RedisError { kind: Moved { hash_slot: 7183, address: ("10.224.2.49", 6379) }, description: "" })
For some reason this redirect isn't being followed correctly.
Hey, very nice job here. I see this supports the sentinel, json, and search modules, which is perfect for my use case. I was searching through the tests and examples for how the API can be used most efficiently, and found one thing I'm not sure about: how to map an FtSearchResult to a Rust struct. Suppose I have a JSON list of fruits and I want to filter them by color, size, etc. So I create an index on the desired fields, and suppose I know I'll be able to narrow the result down to an Option<Fruit>. What's the best way (is it possible, with some helpers?) to transform FtSearchResult.results into an exact Rust struct? I suppose I could map values, which is a Vec<(String, String)>, for my needs? Is this vector some kind of key->value map? Is there a better way to map this, maybe by putting some directive on the Fruit struct?
Update:
A quick check of the Redis docs (https://redis.io/docs/interact/search-and-query/indexing/) suggests this mirrors the raw Redis response; in my case, the second string of the vec will be the stringified Fruit JSON, given that I specified I'm looking for the whole document ($). I suppose I should unmarshal the string into Fruit by hand?
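A minimal sketch of that by-hand step, assuming each result row's values contains the ("$", "<json>") pair described above (the Fruit fields here are made up):

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Fruit {
    name: String,
    color: String,
}

// Find the "$" entry and deserialize its JSON payload into the struct.
fn fruit_from_values(values: &[(String, String)]) -> Option<Fruit> {
    values
        .iter()
        .find(|(key, _)| key == "$")
        .and_then(|(_, json)| serde_json::from_str(json).ok())
}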
PS: Can you also support the redis-sentinel protocol in the connection string? It seems to panic when it's provided.
Thanks in advance.
Trying to wrap a pipeline in a function, but can't get it to work:
async fn test<T: serde::de::DeserializeOwned>(client: &rustis::client::Client) -> rustis::Result<T> {
    let mut pipe = client.create_pipeline();
    pipe.get::<_, ()>("key1").queue();
    pipe.get::<_, ()>("key2").queue();
    pipe.execute().await.unwrap()
}
Calling code: let r : (String, String) = test(client).await.unwrap();
I'm getting the following error:
the trait bound `rustis::Error: Deserialize<'_>` is not satisfied
the following other types implement trait `Deserialize<'de>`:
bool
char
isize
i8
i16
i32
i64
i128
and 193 others
required for `Result<T, rustis::Error>` to implement `for<'de> Deserialize<'de>`
required for `Result<T, rustis::Error>` to implement `DeserializeOwned`
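If I read the error right, the trailing .unwrap() forces the pipeline's output type to be inferred as Result<T, rustis::Error> (the last expression must have the function's return type), which is what the compiler then demands Deserialize for. A possible fix, untested, is to return the execute() result directly so T itself is the deserialization target:

async fn test<T: serde::de::DeserializeOwned>(
    client: &rustis::client::Client,
) -> rustis::Result<T> {
    let mut pipe = client.create_pipeline();
    pipe.get::<_, ()>("key1").queue();
    pipe.get::<_, ()>("key2").queue();
    // No .unwrap(): execute() now deserializes into T, not Result<T, _>.
    pipe.execute().await
}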
I'm using a bb8 connection pool in my application, which was created like this:
let manager = PooledClientManager::new(REDIS_URL.to_string())
    .expect("failed to create redis client manager");
rustis::bb8::Pool::builder()
    .max_size(256)
    .build_unchecked(manager)
Then a tokio task in my application needs to listen to pub/sub for a while, so it retrieves a connection from the pool:
let (sink, stream) = pool.get().await?.create_pub_sub()
When the task completes, the connection is returned to the pool. At the same time, the bb8 connection pool is being used by other parts of my application for non-pub/sub-related connections.
After my application has been running for a few moments and connections have been recycled in/out of the pool, another part of my code retrieves a connection from the pool and uses it to send a command (e.g. xadd). This will cause a panic in rustis internals. I have pasted the panic at the bottom of this issue.
If I don't use a connection pool for the pub/sub connections this issue goes away (so that is what I'm doing for now).
The same error also occurs if I use the previous version of rustis and call subscribe instead of create_pub_sub, so I do not think this issue is caused by the latest changes on main.
The error and backtrace:
thread 'rocket-worker-thread' panicked at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17:
[127.0.0.1:6381] Received unexpected message: Ok(SimpleString("PONG"))
stack backtrace:
0: rust_begin_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
1: core::panicking::panic_fmt
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
2: rustis::network::network_handler::NetworkHandler::receive_result
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:545:17
3: rustis::network::network_handler::NetworkHandler::handle_result::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:410:25
4: rustis::network::network_handler::NetworkHandler::network_loop::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:153:47
5: rustis::network::network_handler::NetworkHandler::connect::{{closure}}::{{closure}}
at /Users/richard/.cargo/git/checkouts/rustis-30c4e1a4cdea814e/e59ced7/src/network/network_handler.rs:138:60
6: <core::pin::Pin<P> as core::future::future::Future>::poll
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/future/future.rs:125:9
7: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:328:17
8: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/loom/std/unsafe_cell.rs:16:9
9: tokio::runtime::task::core::Core<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:317:13
10: tokio::runtime::task::harness::poll_future::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:485:19
11: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
12: std::panicking::try::do_call
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
13: ___rust_try
14: std::panicking::try
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
15: std::panic::catch_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
16: tokio::runtime::task::harness::poll_future
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:473:18
17: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:208:27
18: tokio::runtime::task::harness::Harness<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:153:15
19: tokio::runtime::task::raw::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:271:5
20: tokio::runtime::task::raw::RawTask::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:201:18
21: tokio::runtime::task::LocalNotified<S>::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/mod.rs:416:9
22: tokio::runtime::scheduler::multi_thread::worker::Context::run_task::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:576:13
23: tokio::runtime::coop::with_budget
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/coop.rs:107:5
24: tokio::runtime::coop::budget
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/coop.rs:73:5
25: tokio::runtime::scheduler::multi_thread::worker::Context::run_task
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:575:9
26: tokio::runtime::scheduler::multi_thread::worker::Context::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:526:24
27: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:491:21
28: tokio::runtime::context::scoped::Scoped<T>::set
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context/scoped.rs:40:9
29: tokio::runtime::context::set_scheduler::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context.rs:176:26
30: std::thread::local::LocalKey<T>::try_with
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:270:16
31: std::thread::local::LocalKey<T>::with
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/thread/local.rs:246:9
32: tokio::runtime::context::set_scheduler
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context.rs:176:9
33: tokio::runtime::scheduler::multi_thread::worker::run::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:486:9
34: tokio::runtime::context::runtime::enter_runtime
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/context/runtime.rs:65:16
35: tokio::runtime::scheduler::multi_thread::worker::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:478:5
36: tokio::runtime::scheduler::multi_thread::worker::Launch::launch::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/scheduler/multi_thread/worker.rs:447:45
37: <tokio::runtime::blocking::task::BlockingTask<T> as core::future::future::Future>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/task.rs:42:21
38: tokio::runtime::task::core::Core<T,S>::poll::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:328:17
39: tokio::loom::std::unsafe_cell::UnsafeCell<T>::with_mut
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/loom/std/unsafe_cell.rs:16:9
40: tokio::runtime::task::core::Core<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/core.rs:317:13
41: tokio::runtime::task::harness::poll_future::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:485:19
42: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panic/unwind_safe.rs:272:9
43: std::panicking::try::do_call
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:552:40
44: ___rust_try
45: std::panicking::try
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:516:19
46: std::panic::catch_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panic.rs:142:14
47: tokio::runtime::task::harness::poll_future
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:473:18
48: tokio::runtime::task::harness::Harness<T,S>::poll_inner
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:208:27
49: tokio::runtime::task::harness::Harness<T,S>::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/harness.rs:153:15
50: tokio::runtime::task::raw::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:271:5
51: tokio::runtime::task::raw::RawTask::poll
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/raw.rs:201:18
52: tokio::runtime::task::UnownedTask<S>::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/task/mod.rs:453:9
53: tokio::runtime::blocking::pool::Task::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:159:9
54: tokio::runtime::blocking::pool::Inner::run
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:513:17
55: tokio::runtime::blocking::pool::Spawner::spawn_thread::{{closure}}
at /Users/richard/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.35.1/src/runtime/blocking/pool.rs:471:13
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
3|tracker_ | thread 'tokio-runtime-worker' panicked at /root/.cargo/registry/src/index.crates.io-6f17d22bba15001f/rustis-0.12.9/src/network/network_handler.rs:545:17:
3|tracker_ | [private:6379] Received unexpected message: Ok(Push([BulkString("unsubscribe"), BulkString("tracker:346240269"), Integer(0)]))
3|tracker_ | note: run with RUST_BACKTRACE=1 environment variable to display a backtrace
The info is somewhat limited, but it seems to be coming from the rustis library itself? It's on 0.12.8.
If I call the xadd command without a return type, I get an Error::Client("Expected nil") runtime error:
con.xadd(...).await?;
This is fixed by specifying a return type:
let _: String = con.xadd(...).await?;
It's pretty easy to make this mistake, and it doesn't get caught until runtime, which isn't great. I'm assuming this is also an issue with other commands. If the caller expects (), maybe this shouldn't be an error at all?
Hi! In your sentinel connection flow you're overwriting the config username/password with sentinel ones, here:
rustis/src/network/sentinel_connection.rs
Lines 73 to 74 in 29b2b5f
Then, in the loop section, you're properly using the creds to AUTH in the HELLO command in sentinel mode, and all is good while obtaining the master node address. But when connecting to the master node here:
rustis/src/network/sentinel_connection.rs
Line 112 in 29b2b5f
It all falls apart because the credentials are still the sentinel ones. If someone uses the same creds for the sentinel and standard nodes, all is fine, but in my case I have a valid configuration where sentinel_username and sentinel_password differ from the standard username/password. When connecting to the master node, the sentinel creds won't work; the standard user/pass combination should be used there. I see two solutions to this problem:
[Solution 1]
Add optional sentinel_username and sentinel_password fields to the config object to differentiate them from the standard credentials. In Redis they are separate things; you specify sentinel_password as a query param in the URL. This way, the sentinel creds are used during the sentinel phase and the standard creds afterwards.
[Solution 2]
Store the standard config.username and config.password somewhere else and, after the sentinel phase is done, overwrite the creds back to the standard ones.
IMHO the cleaner option is solution 1, as it is common practice to differentiate these options. E.g., see https://github.com/redis/rueidis/blob/main/sentinel.go#L24, where they differentiate between mOpts (master opts) and sOpts (sentinel opts), I believe.
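A sketch of the Solution 1 shape (field names are illustrative, not rustis' actual Config):

// Separate sentinel credentials from standard node credentials.
struct ConfigSketch {
    // Used when connecting to the resolved master/replica nodes.
    username: Option<String>,
    password: Option<String>,
    // Used only to authenticate against the sentinel nodes themselves.
    sentinel_username: Option<String>,
    sentinel_password: Option<String>,
}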
Please look into this urgently.
code:
match REDIS_CONNECT.get().await {
    Some(client) => {
        let len = client
            .json_arrlen::<String, String, Vec<Option<usize>>>(STRATEGY_KEY.to_string(), ".".to_string())
            .await
            .unwrap();
        println!("===handler_add_strategy==len={:?}", len);
    }
    _ => {}
}
error: called Result::unwrap() on an Err value: Client("Cannot parse to sequence a RESP value starting with :")
This library doesn't require all the dependencies pulled in by futures, except for these 2 sub-modules.