I'm an experienced software developer.
- Contact me
["aikorsky", "gmail.com"].join("@")
Asynchronous Rust MySQL driver based on Tokio.
License: Apache License 2.0
I have a wrapper around a Pool object (using the new async update) which has a disconnect method:
pub async fn disconnect(&self) -> Result<()> {
self.pool.clone().disconnect().await?;
Ok(())
}
When I want to exit my program, I run this on my tokio runtime with ::block_on. However, it never returns.
cc @jonhoo
I need these to be able to do:
extern crate mysql_async as my;
use my::{QueryResult, Protocol, ConnectionLike};
use my::prelude::FromRow;
use tokio_async_await::compat::forward::IntoAwaitable;
pub async fn get_all_results<TupleType, T, P>(result : QueryResult<T, P>) -> Result<Vec<TupleType>, my::error::Error>
where TupleType: FromRow + Send + 'static,
P: Protocol + Send + 'static,
T: ConnectionLike + Sized + Send + 'static, {
let mut fut = result.collect();
let (_, data) :(_, Vec<TupleType>) = match await!(fut.into_awaitable()) {
Ok(r) => r,
Err(e) => {return Err(e);}
};
Ok(data)
}
pub async fn get_single_result<TupleType, T, P>(result : QueryResult<T, P>) -> Result<TupleType, my::error::Error>
where TupleType: FromRow + Send + 'static,
P: Protocol + Send + 'static,
T: ConnectionLike + Sized + Send + 'static, {
let mut data : Vec<TupleType> = await!(get_all_results(result))?;
if data.len() != 1 {
Err(my::error::Error::Io(std::io::Error::from(std::io::ErrorKind::InvalidData)))
}else {
Ok(data.remove(0))
}
}
Right now I need to use a modified version of the crate in which they are publicly exported.
I'm trying to understand some of the concepts in mysql_async and what the best way for me to use it is.
The first thing is what happens when I prepare a faulty statement leading to an error? Is there a possibility of having the connection object open and available for other queries, or do I always need to reconnect in case of an error?
What is the best pooling strategy? How are the connections returned to the pool after using them? Are the connections even up for multiple queries, or will they just be dropped and the pool will reconnect when asking another connection?
I've found some great resources to help with my plans, such as:
But eventually I'm worried that when our users use MySQL as their database, the performance in cases of faulty queries might be quite bad. What I'm after here is a common JDBC/Slick level abstraction over different databases and serving a common interface for the user. I'm still having a bit of trouble bending mysql_async into this model.
While trying to minimize a test case for #22, I discovered that the issue really originated further "up" in my code. Specifically, if you use any of the drop_* methods to execute multiple ;-separated statements, only the first actually gets dropped. Consider the following code:
let mut core = tokio_core::reactor::Core::new().unwrap();
let mut opts = my::OptsBuilder::new();
opts.ip_or_hostname("localhost")
.user(Some("root"))
.pass(Some("password"))
.tcp_nodelay(true)
.db_name(Some("mysql"));
let c = my::Pool::new(opts, &core.handle());
let c = c.get_conn()
.and_then(|c| {
c.drop_query(
"UPDATE time_zone SET Time_zone_id = 1 WHERE Time_zone_id = 1; \
UPDATE time_zone SET Time_zone_id = 1 WHERE Time_zone_id = 1;",
)
})
.and_then(|c| c.prep_exec("SELECT 1", ()));
core.run(c).unwrap();
This code hangs since the SELECT is waiting for the next resultset, but is instead met with the response to the UPDATE. To see a crash, put the update in a transaction:
let c = c.get_conn()
.and_then(|c| {
c.start_transaction(my::TransactionOptions::new())
.and_then(|t| {
t.drop_query(
"UPDATE time_zone SET Time_zone_id = 1 WHERE Time_zone_id = 1; \
UPDATE time_zone SET Time_zone_id = 1 WHERE Time_zone_id = 1;",
)
})
.and_then(|t| t.commit())
})
.and_then(|c| c.prep_exec("SELECT 1", ()));
core.run(c).unwrap();
Crashes with:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Error(Io(Custom { kind: UnexpectedEof, error: StringError("failed to fill whole buffer") }), State { next_error: None, backtrace: Some(stack backtrace:
0: 0x563b61cd96ce - backtrace::backtrace::libunwind::trace::h8654a9d0555d5a01
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.5/src/backtrace/libunwind.rs:53
- backtrace::backtrace::trace::h97b4908a240a2f61
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.5/src/backtrace/mod.rs:42
1: 0x563b61cd8035 - backtrace::capture::Backtrace::new_unresolved::hec1ca9ceaee1db45
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.5/src/capture.rs:88
- backtrace::capture::Backtrace::new::h7fa080eeed827e9d
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.5/src/capture.rs:63
2: 0x563b61cd7a7a - error_chain::make_backtrace::hffb33833ee042156
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/error-chain-0.11.0/src/lib.rs:616
3: 0x563b61cd7b18 - <error_chain::State as core::default::Default>::default::h1a16302fda4394fd
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/error-chain-0.11.0/src/lib.rs:710
4: 0x563b61be4b10 - mysql_async::errors::Error::from_kind::ha3dc499d9ed61c6a
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.14.0/<impl_error_chain_processed macros>:53
- <mysql_async::errors::Error as core::convert::From<std::io::error::Error>>::from::hc00dcfd143ecae64
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.14.0/src/errors.rs:16
- mysql_async::queryable::stmt::InnerStmt::new::ha7a178ed950633da
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.14.0/src/queryable/stmt.rs:50
5: 0x563b61b64a42 - mysql_async::connection_like::ConnectionLike::prepare_stmt::{{closure}}::hb5b23a2d9383914c
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.14.0/src/connection_like/mod.rs:280
- <futures::future::and_then::AndThen<A, B, F> as futures::future::Future>::poll::{{closure}}::{{closure}}::hcd418c8f3cc4d1a8
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.19/src/future/and_then.rs:34
- <core::result::Result<T, E>>::map::h73212ff31b4c4af9
at /checkout/src/libcore/result.rs:468
- <futures::future::and_then::AndThen<A, B, F> as futures::future::Future>::poll::{{closure}}::h52ec85bfee8271dc
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.19/src/future/and_then.rs:33
- <futures::future::chain::Chain<A, B, C>>::poll::h189cc1106a92156f
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.19/src/future/chain.rs:39
The test 'conn::test::should_reset_the_connection' fails on mariadb 10.1.19 since it's not implemented yet and I was wondering if I could still use mysql_async with mariadb.
I want to use SSL for my connection but without the PKCS12 Client authentication. What would need to happen in order to make the client certificate optional so I can just use password authentication but over an SSL connection?
Hi, I'm wondering how to rollback when an error happens mid-transaction. I couldn't find a way to rollback already executed statements because the connection handle cannot be accessed when a future errors.
Here's an example:
db
.start_transaction(Default::default())
.map_err(|_| String::from("transaction error"))
.and_then(|tx|
// First insertion. This succeeds.
tx.drop_exec("INSERT ...", ())
.map_err(|_| String::from("insert1 error"))
)
.and_then(|tx|
// Second insertion. This fails.
tx.drop_exec("INSERT ...", ())
.map_err(|_| String::from("insert2 error")) // Couldn't rollback prior insertion
)
In this case, the first insertion succeeds and the second one fails. There is no way to roll back the first insertion.
Furthermore, I couldn't find much information about what happens to the connection after such a situation. When that same connection is used to carry out another operation, will there be a conflict because the connection has an open transaction which has not been finished?
Am I missing something?
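One way to picture a fix, sketched with stand-in types rather than the mysql_async API: each step hands the transaction handle back together with the error, so the caller still holds something to roll back. `Tx`, `exec`, `rollback`, and `run` below are all hypothetical names made up for this illustration.

```rust
// Hypothetical stand-ins; none of these are mysql_async types.
#[derive(Debug, Default)]
struct Tx {
    applied: Vec<String>, // statements executed inside the open transaction
}

impl Tx {
    /// Run one statement. On failure the handle travels with the error
    /// instead of being dropped, so the caller can still roll back.
    fn exec(mut self, stmt: &str, fail: bool) -> Result<Tx, (String, Tx)> {
        if fail {
            Err((format!("{} failed", stmt), self))
        } else {
            self.applied.push(stmt.to_owned());
            Ok(self)
        }
    }

    /// Discard the transaction and report how many statements were undone.
    fn rollback(self) -> usize {
        self.applied.len()
    }
}

/// Two inserts, the second of which may fail.
/// Returns Ok(statements committed) or Err((message, statements rolled back)).
fn run(second_fails: bool) -> Result<usize, (String, usize)> {
    let result = Tx::default()
        .exec("insert1", false)
        .and_then(|tx| tx.exec("insert2", second_fails));
    match result {
        Ok(tx) => Ok(tx.applied.len()),
        // The handle came back with the error, so rollback is still possible.
        Err((msg, tx)) => Err((msg, tx.rollback())),
    }
}
```

The point of the sketch is only the error type: `(String, Tx)` instead of a bare `String` keeps the handle reachable after a failed step.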
Does this crate have support for SSL encryption and X.509 authentication?
There is a comment for OptsBuilder that implies that there is SSL support, but it doesn't look like this is actually implemented in this crate. (I'm guessing that this example was perhaps accidentally copied over from rust-mysql-simple?)
If that was in fact an error, are there plans to add SSL support to this crate? I'm working on a project that requires X.509 authentication. I'd be sending SQL queries from within a Future so I'd prefer to use an async MySQL driver. I suspect that adding support would just involve adapting the appropriate pieces of SSL code from rust-mysql-simple. If there are no immediate plans, I'm happy to give this a shot. Thanks!
In issue #28 the GetConn type was supposedly published, but as of version 0.19.0 it is not published.
Besides the GetConn type, there's also a DisconnectPool future, which similarly deserves to be published.
Hello. I'm using your crate with futures 0.3 and a custom threadpool. When updating my dependencies, I saw you pushed a commit which uses tokio::spawn: 5dd7d45.
Am I correct in thinking this will panic since I haven't set up a tokio executor?
Are there any other places that are hardcoded to use tokio's threadpool? (Couldn't find any.)
This would cripple the flexibility of the crate by requiring a futures 0.1 tokio executor to be used.
With the change for OptsBuilder to take PoolConstraints I am unable to set the pool min and max since the struct is not exposed in lib.rs and the opts module is not public.
Not sure if u8 is too conservative for seq_id or if I really goofed something.
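For context, the packet sequence id in the MySQL wire protocol is a single byte that wraps back to 0 after 255, so u8 is the right width; what tends to go wrong is incrementing it with plain `+`, which panics on overflow in debug builds. A minimal sketch of a wrapping increment follows. The helper names `next_seq_id` and `seq_id_after` are made up for illustration and are not taken from the crate.

```rust
/// Advance a one-byte packet sequence id, wrapping from 255 back to 0.
/// Plain `seq_id + 1` would panic on overflow in a debug build.
fn next_seq_id(seq_id: u8) -> u8 {
    seq_id.wrapping_add(1)
}

/// Sequence id reached after `packets` packets, starting from 0.
fn seq_id_after(packets: usize) -> u8 {
    (0..packets).fold(0u8, |id, _| next_seq_id(id))
}
```

With this shape, a result set that spans more than 255 packets simply cycles through the id space instead of overflowing.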
Sorta steps to reproduce:
select field1, field2 from table1;
pub fn fetch_choices(handle: &Handle, pool: &Pool, query: &str)
-> Receiver<Result<Vec<(String, String)>>> {
let (tx, rx) = oneshot::channel::<Result<Vec<_>>>();
let pool = pool.clone();
let query = query.to_owned();
let future = pool.get_conn()
.and_then(|conn| conn.query(query))
.and_then(|result| result.map(|row| mysql_async::from_row::<(_, _)>(row)))
.map(|(rows, _ /* conn */)| rows)
.map_err(Error::from)
.then(|result| {
tx.send(result)
.map_err(|e| {
error!("fetch_choices: tx channel error: {:?}", e);
e
})
});
handle.spawn(future.then(|_| Ok(())));
rx
}
poll in that backtrace; not sure if that is relevant.
If it's not obvious what I screwed up here, I can try to whittle my code down to a simple example you should be able to run to reproduce.
Cheers
MySqlError(Server(ServerError { code: 1064, message: "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near \'SELECT * FROM B\' at line 1", state: "42000" }))
My SQL is:
SELECT * FROM A;
SELECT * FROM B;
If I understand the API correctly, I should be able to call collect twice on QueryResult to get the rows from A on the first call and the rows from B on the second call.
Did I misunderstand it, or is it a bug?
I am using mysql_async to insert rows. One of the columns has an auto-increment id. How do I obtain it?
The crate should not expect a tokio executor to be present.
Perhaps the Pool can contain an (optional) reference to an Executor, similar to how Hyper does it. This gives users the freedom to provide their own executor, or use the default one.
As far as I could see, this is the only place where a future is spawned. Without an executor present, the cleanup is never run.
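A rough sketch of the "optional executor" idea, using only the standard library. Everything here is an assumption for illustration: `Job`, `Spawner`, and this `Pool` are stand-ins, not the crate's types, and a real version would spawn futures rather than closures.

```rust
use std::sync::Arc;

/// A unit of background work, e.g. cleaning up a returned connection.
type Job = Box<dyn FnOnce() + Send + 'static>;
/// Anything that can run a job; a stand-in for a user-supplied executor handle.
type Spawner = Arc<dyn Fn(Job) + Send + Sync>;

/// Hypothetical pool that does not assume a global executor exists.
struct Pool {
    spawner: Option<Spawner>,
}

impl Pool {
    /// Pool without a custom executor.
    fn new() -> Self {
        Pool { spawner: None }
    }

    /// Pool that hands background work to the caller's executor.
    fn with_spawner(spawner: Spawner) -> Self {
        Pool { spawner: Some(spawner) }
    }

    /// Use the caller's spawner when one was provided; otherwise fall back
    /// to a default (here: run inline) so the cleanup is never skipped.
    fn spawn_cleanup(&self, job: Job) {
        match &self.spawner {
            Some(spawn) => spawn(job),
            None => job(),
        }
    }
}
```

The design choice being illustrated is the fallback arm: with no executor configured the work still runs, rather than being silently dropped or panicking.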
It took me more than a few hours to track this bug down, and even then I'm not sure I found the cause. This issue is mostly to document/discuss how I troubleshot this panic, and whether we can patch the code to improve the errors, or provide a notice in the docs/comments.
I'm running this bit of code against a query with a large result set (approx 109,000 rows)
pool.get_conn()
.and_then(move |conn| conn.query(&sql))
.and_then(|result| {
result.map(|row|{
let (id, text) : (String, String) = mysql_async::from_row(row); // panic here
Data::new(id, text)
})
})
//...
The table consists of a primary integer(11) field, id, and a varchar(255) field, text.
The backtrace for this error did little to help me identify the cause, and I wasted time trying to figure out why src/libcore/option.rs was panicking.
thread '<unnamed>' panicked at 'Could not convert row to (T1,T2)', /checkout/src/libcore/option.rs:823
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
stack backtrace:
0: std::sys::imp::backtrace::tracing::imp::unwind_backtrace
at /checkout/src/libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
1: std::sys_common::backtrace::_print
at /checkout/src/libstd/sys_common/backtrace.rs:71
2: std::panicking::default_hook::{{closure}}
at /checkout/src/libstd/sys_common/backtrace.rs:60
at /checkout/src/libstd/panicking.rs:355
3: std::panicking::default_hook
at /checkout/src/libstd/panicking.rs:371
4: std::panicking::rust_panic_with_hook
at /checkout/src/libstd/panicking.rs:549
5: std::panicking::begin_panic
at /checkout/src/libstd/panicking.rs:511
6: std::panicking::begin_panic_fmt
at /checkout/src/libstd/panicking.rs:495
7: rust_begin_unwind
at /checkout/src/libstd/panicking.rs:471
8: core::panicking::panic_fmt
at /checkout/src/libcore/panicking.rs:69
9: core::option::expect_failed
at /checkout/src/libcore/option.rs:823
10: <core::option::Option<T>>::expect
at /checkout/src/libcore/option.rs:302
11: <(T1, T2) as mysql_async::value::FromRow>::from_row
at /home/charetjc/files/development/rust/crates/mysql_async/src/value.rs:380
12: mysql_async::value::from_row
at /home/charetjc/files/development/rust/crates/mysql_async/src/value.rs:280
13: fuzzyd::database::fetch_choices::{{closure}}::{{closure}}
at src/bin/fuzzyd/database.rs:101
----snipped----
Eventually I found Option is panicking because of code in mysql_async/src/value.rs like line 380:
FromRow::from_row_opt(row).ok().expect("Could not convert row to (T1,T2)")
While from_row_opt returns a Result, .ok() converts the Result into an Option that discards the Err, so that .expect() doesn't print the Err??
I can understand this somewhat from a security perspective, since the Err contains potentially sensitive data that shouldn't find its way into logs. Also, the Err is pretty complex and will have a duplicate stack trace (if RUST_BACKTRACE=1), which can be confusing due to the poor formatting of Debug output.
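The difference between the two forms can be reproduced with the standard library alone. In the sketch below, `convert` is a made-up stand-in for a row conversion, and `panic_message` is a small helper that captures the panic text so the two variants can be compared; neither is part of mysql_async.

```rust
use std::panic;

/// Stand-in for a row conversion that fails with a descriptive error.
fn convert(raw: &str) -> Result<u32, String> {
    raw.parse::<u32>()
        .map_err(|e| format!("bad value {:?}: {}", raw, e))
}

/// Run `f` and return the panic message if it panicked, `None` otherwise.
fn panic_message<F: FnOnce() + panic::UnwindSafe>(f: F) -> Option<String> {
    panic::catch_unwind(f).err().map(|payload| {
        payload
            .downcast_ref::<String>()
            .cloned()
            .or_else(|| payload.downcast_ref::<&str>().map(|s| s.to_string()))
            .unwrap_or_default()
    })
}

/// `.ok()` throws the error away before `expect` gets a chance to report it.
fn lossy(raw: &'static str) -> Option<String> {
    panic_message(move || {
        convert(raw).ok().expect("could not convert");
    })
}

/// `expect` called directly on the `Result` appends the error's `Debug` output.
fn detailed(raw: &'static str) -> Option<String> {
    panic_message(move || {
        convert(raw).expect("could not convert");
    })
}
```

Calling `lossy("abc")` yields only the fixed message, while `detailed("abc")` yields the fixed message followed by the underlying error, which is the information that was missing from the original panic.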
That said, I forked v0.9.3 and dropped the call to .ok(), found...
thread '<unnamed>' panicked at 'Could not convert row to (T1,T2):
Error
( FromRow
( Row
{ values: [ Some( Bytes( [ 85, 80, 87, 45, 53, 55, 45, 49, 48, 48, 48, 45, 50 ] ) )
, Some( NULL )
]
, columns:
[ Column
{ payload: [REDACTED]
, schema: (5, 14)
, table: (20, 9)
, org_table: (30, 9)
, name: (40, 2)
, org_name: (43, 3)
, default_values: None
, column_length: 96
, character_set: 45
, flags: NOT_NULL_FLAG | MULTIPLE_KEY_FLAG | PART_KEY_FLAG
, column_type: MYSQL_TYPE_VAR_STRING
, decimals: 0
}
, Column
{ payload: [REDACTED]
, schema: (5, 14)
, table: (20, 9)
, org_table: (30, 9)
, name: (40, 8)
, org_name: (49, 17)
, default_values: None
, column_length: 1020
, character_set: 45
, flags:
, column_type: MYSQL_TYPE_VAR_STRING
, decimals: 0
}
]
}
)
, State
{ next_error: None
, backtrace: None
}
)
', /checkout/src/libcore/result.rs:859
...and with this, I can see that one of my two values is NULL, which is a problem when my code expects a String.
From here, I'm not certain where to go. I will try using let (id, text) : (String, Option<String>) = from_row() and see if that works, but I didn't see anything in the docs that indicated this will work or how to handle NULL values in a SQL field.
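The general idea of mapping a nullable column to an Option can be sketched with stand-in types. `Cell`, `required_string`, `optional_string`, and `decode_row` are hypothetical names for this illustration and do not reflect how the crate implements its conversions.

```rust
/// Stand-in for a column value; not the crate's `Value` type.
#[derive(Debug, Clone, PartialEq)]
enum Cell {
    Null,
    Bytes(Vec<u8>),
}

/// Convert a cell to a required string; NULL is an error here.
fn required_string(cell: &Cell) -> Result<String, String> {
    match cell {
        Cell::Null => Err("unexpected NULL".to_owned()),
        Cell::Bytes(b) => String::from_utf8(b.clone()).map_err(|e| e.to_string()),
    }
}

/// Convert a cell to an optional string; NULL becomes `None`.
fn optional_string(cell: &Cell) -> Result<Option<String>, String> {
    match cell {
        Cell::Null => Ok(None),
        other => required_string(other).map(Some),
    }
}

/// Decode an (id, text) row in which only `text` may be NULL.
fn decode_row(row: &[Cell; 2]) -> Result<(String, Option<String>), String> {
    Ok((required_string(&row[0])?, optional_string(&row[1])?))
}
```

Under this model, a row like the one in the panic output (a byte string followed by NULL) decodes to `(id, None)` instead of failing.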
So, with that said, what to do? I present options!
.ok() from src/value.rs?
NULL values, if so where?
I'm happy to take a stab at PRs for these. Also, I know v0.10.0 is out, and I'm fairly certain I would have the same issue there. src/value.rs didn't change much. I hope to update to v0.10.0 soon...ish, but I lost a bit of time on my project trying to offload the database routines into a separate thread, and racing to catch up.
Cheers!
Hey,
So I've been taking the new pool code from #66 for a spin, and I encountered the following panic.
It's worth noting that the pool is completely idle at this point after an initial startup. I'm to my knowledge not sending any traffic through it.
CC: @jonhoo
thread 'tokio-runtime-worker-4' panicked at 'assertion failed: `(left == right)`
left: `1`,
right: `0`', C:\Users\udoprog\.cargo\git\checkouts\mysql_async-a79c69708a1aa355\e614691\src\conn\pool\mod.rs:161:13
stack backtrace:
0: backtrace::backtrace::trace_unsynchronized
at C:\Program Files\Rust\.cargo\registry\src\github.com-1ecc6299db9ec823\backtrace-0.3.29\src\backtrace\mod.rs:66
1: std::sys_common::backtrace::_print
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\/src\libstd\sys_common\backtrace.rs:47
2: std::sys_common::backtrace::print
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\/src\libstd\sys_common\backtrace.rs:36
3: std::panicking::default_hook::{{closure}}
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\/src\libstd\panicking.rs:200
4: std::panicking::default_hook
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\/src\libstd\panicking.rs:214
5: std::panicking::rust_panic_with_hook
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\/src\libstd\panicking.rs:477
6: std::panicking::continue_panic_fmt
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\/src\libstd\panicking.rs:384
7: std::panicking::begin_panic_fmt
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\/src\libstd\panicking.rs:339
8: mysql_async::conn::pool::{{impl}}::poll
at C:\Users\udoprog\.cargo\git\checkouts\mysql_async-a79c69708a1aa355\e614691\src\conn\pool\mod.rs:161
9: futures::future::{{impl}}::poll<Future>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-0.1.28\src\future\mod.rs:113
10: futures::task_impl::{{impl}}::poll_future_notify::{{closure}}<alloc::boxed::Box<Future>,alloc::sync::Arc<tokio_threadpool::notifier::Notifier>>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-0.1.28\src\task_impl\mod.rs:329
11: futures::task_impl::{{impl}}::enter::{{closure}}<alloc::boxed::Box<Future>,closure,core::result::Result<futures::poll::Async<()>, ()>>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-0.1.28\src\task_impl\mod.rs:399
12: futures::task_impl::std::set<closure,core::result::Result<futures::poll::Async<()>, ()>>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-0.1.28\src\task_impl\std\mod.rs:83
13: futures::task_impl::Spawn<alloc::boxed::Box<Future>>::enter<alloc::boxed::Box<Future>,closure,core::result::Result<futures::poll::Async<()>, ()>>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-0.1.28\src\task_impl\mod.rs:399
14: futures::task_impl::Spawn<alloc::boxed::Box<Future>>::poll_fn_notify<alloc::boxed::Box<Future>,alloc::sync::Arc<tokio_threadpool::notifier::Notifier>,closure,core::result::Result<futures::poll::Async<()>, ()>>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-0.1.28\src\task_impl\mod.rs:291
15: futures::task_impl::Spawn<alloc::boxed::Box<Future>>::poll_future_notify<alloc::boxed::Box<Future>,alloc::sync::Arc<tokio_threadpool::notifier::Notifier>>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-0.1.28\src\task_impl\mod.rs:329
16: tokio_threadpool::task::{{impl}}::run::{{closure}}
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\task\mod.rs:145
17: core::ops::function::FnOnce::call_once<closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libcore\ops\function.rs:231
18: std::panic::{{impl}}::call_once<core::result::Result<futures::poll::Async<()>, ()>,closure>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\panic.rs:315
19: std::panicking::try::do_call<std::panic::AssertUnwindSafe<closure>,core::result::Result<futures::poll::Async<()>, ()>>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\panicking.rs:296
20: panic_unwind::__rust_maybe_catch_panic
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\/src\libpanic_unwind\lib.rs:82
21: std::panicking::try<core::result::Result<futures::poll::Async<()>, ()>,std::panic::AssertUnwindSafe<closure>>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\panicking.rs:275
22: std::panic::catch_unwind<std::panic::AssertUnwindSafe<closure>,core::result::Result<futures::poll::Async<()>, ()>>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\panic.rs:394
23: tokio_threadpool::task::Task::run
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\task\mod.rs:130
24: tokio_threadpool::worker::Worker::run_task2
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\worker\mod.rs:567
25: tokio_threadpool::worker::Worker::run_task
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\worker\mod.rs:459
26: tokio_threadpool::worker::Worker::try_steal_task
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\worker\mod.rs:416
27: tokio_threadpool::worker::Worker::try_run_task
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\worker\mod.rs:301
28: tokio_threadpool::worker::Worker::run
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\worker\mod.rs:241
29: tokio::runtime::threadpool::builder::{{impl}}::build::{{closure}}::{{closure}}::{{closure}}::{{closure}}
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-0.1.22\src\runtime\threadpool\builder.rs:390
30: tokio_timer::timer::handle::with_default::{{closure}}<closure,()>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-timer-0.2.11\src\timer\handle.rs:101
31: std::thread::local::LocalKey<core::cell::RefCell<core::option::Option<tokio_timer::timer::handle::HandlePriv>>>::try_with<core::cell::RefCell<core::option::Option<tokio_timer::timer::handle::HandlePriv>>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:257
32: std::thread::local::LocalKey<core::cell::RefCell<core::option::Option<tokio_timer::timer::handle::HandlePriv>>>::with<core::cell::RefCell<core::option::Option<tokio_timer::timer::handle::HandlePriv>>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:234
33: tokio_timer::timer::handle::with_default<closure,()>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-timer-0.2.11\src\timer\handle.rs:84
34: tokio::runtime::threadpool::builder::{{impl}}::build::{{closure}}::{{closure}}::{{closure}}
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-0.1.22\src\runtime\threadpool\builder.rs:382
35: tokio_timer::clock::clock::with_default::{{closure}}<closure,()>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-timer-0.2.11\src\clock\clock.rs:137
36: std::thread::local::LocalKey<core::cell::Cell<core::option::Option<const tokio_timer::clock::clock::Clock*>>>::try_with<core::cell::Cell<core::option::Option<const tokio_timer::clock::clock::Clock*>>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:257
37: std::thread::local::LocalKey<core::cell::Cell<core::option::Option<const tokio_timer::clock::clock::Clock*>>>::with<core::cell::Cell<core::option::Option<const tokio_timer::clock::clock::Clock*>>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:234
38: tokio_timer::clock::clock::with_default<closure,()>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-timer-0.2.11\src\clock\clock.rs:117
39: tokio::runtime::threadpool::builder::{{impl}}::build::{{closure}}::{{closure}}
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-0.1.22\src\runtime\threadpool\builder.rs:381
40: tokio_reactor::with_default::{{closure}}<closure,()>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-reactor-0.1.9\src\lib.rs:237
41: std::thread::local::LocalKey<core::cell::RefCell<core::option::Option<tokio_reactor::HandlePriv>>>::try_with<core::cell::RefCell<core::option::Option<tokio_reactor::HandlePriv>>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:257
42: std::thread::local::LocalKey<core::cell::RefCell<core::option::Option<tokio_reactor::HandlePriv>>>::with<core::cell::RefCell<core::option::Option<tokio_reactor::HandlePriv>>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:234
43: tokio_reactor::with_default<closure,()>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-reactor-0.1.9\src\lib.rs:217
44: tokio::runtime::threadpool::builder::{{impl}}::build::{{closure}}
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-0.1.22\src\runtime\threadpool\builder.rs:380
45: tokio_threadpool::callback::Callback::call
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\callback.rs:22
46: tokio_threadpool::worker::{{impl}}::do_run::{{closure}}::{{closure}}
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\worker\mod.rs:127
47: tokio_executor::global::with_default::{{closure}}<tokio_threadpool::sender::Sender,closure,()>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-executor-0.1.8\src\global.rs:209
48: std::thread::local::LocalKey<core::cell::Cell<tokio_executor::global::State>>::try_with<core::cell::Cell<tokio_executor::global::State>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:257
49: std::thread::local::LocalKey<core::cell::Cell<tokio_executor::global::State>>::with<core::cell::Cell<tokio_executor::global::State>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:234
50: tokio_executor::global::with_default<tokio_threadpool::sender::Sender,closure,()>
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-executor-0.1.8\src\global.rs:178
51: tokio_threadpool::worker::{{impl}}::do_run::{{closure}}
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\worker\mod.rs:125
52: std::thread::local::LocalKey<core::cell::Cell<const tokio_threadpool::worker::Worker*>>::try_with<core::cell::Cell<const tokio_threadpool::worker::Worker*>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:257
53: std::thread::local::LocalKey<core::cell::Cell<const tokio_threadpool::worker::Worker*>>::with<core::cell::Cell<const tokio_threadpool::worker::Worker*>,closure,()>
at /rustc/78ca1bda3522b14bc0336bc01dd1d49fdba2cda7\src\libstd\thread\local.rs:234
54: tokio_threadpool::worker::Worker::do_run
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\worker\mod.rs:116
55: tokio_threadpool::pool::{{impl}}::spawn_thread::{{closure}}
at C:\Users\udoprog\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-threadpool-0.1.15\src\pool\mod.rs:344
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
I am working with version 14.1 and gave an incorrect database URL. The return type should have been Result instead of Pool.
Looking here:
mysql_async/src/queryable/query_result/mod.rs
Line 232 in 2c261cf
Compare to the standard Iterator::collect method: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.collect
a collection of Result<T, E> can be thought of as single Result<Collection, E>. See the examples below for more.
The collect method will, however, always panic if one of the rows fails to convert. This makes the trait unusable in any application where you're listening for incoming connections.
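For comparison, this is the standard-library behaviour the quoted documentation describes: collecting an iterator of `Result<T, E>` into a `Result<Vec<T>, E>` stops at the first error and returns it instead of panicking. `parse_row` and `collect_rows` are made-up helpers standing in for a per-row conversion.

```rust
/// Stand-in for a per-row conversion that can fail.
fn parse_row(raw: &str) -> Result<u32, String> {
    raw.trim()
        .parse::<u32>()
        .map_err(|e| format!("row {:?}: {}", raw, e))
}

/// Collect all rows, returning the first conversion error instead of panicking.
fn collect_rows(raw_rows: &[&str]) -> Result<Vec<u32>, String> {
    raw_rows.iter().map(|r| parse_row(r)).collect()
}
```

A caller can then decide per request whether a bad row is fatal, which is what a long-running server needs.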
Pool::get_conn returns a GetConn, but that type is not public, and so it's impossible to write a function that takes whatever Pool::get_conn returns.
Hello, I came across a bug in the pool implementation in clickhouse-rs. After it was patched, the author mentioned that he drew from the mysql_async pool implementation and that you might have the same issue.
So, I'm linking the issue (which also has a link to the patch). I'm pretty sure I diagnosed and patched the problem correctly, since the errors disappeared, but it would be helpful if you could look it over as well.
As a very general description for here, it looks like checking the new connections for readiness and then moving them to the ready/idle queue needs to hold the lock longer to avoid a race condition.
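As a rough illustration of what "holding the lock longer" could mean, here is a sketch with made-up names. It is not the clickhouse-rs patch or the mysql_async code, and the integer ids stand in for real connections; the only point is that the readiness check and the move between queues happen under a single lock acquisition.

```rust
use std::sync::Mutex;

/// Hypothetical pool state: connections still being set up, and connections
/// ready to be handed out.
#[derive(Default)]
struct Queues {
    pending: Vec<u32>,
    idle: Vec<u32>,
}

struct Pool {
    queues: Mutex<Queues>,
}

impl Pool {
    /// Move every ready connection from `pending` to `idle` while holding the
    /// guard the whole time, so no other thread can observe a connection that
    /// is in neither queue or act on a stale readiness check.
    fn promote_ready(&self, is_ready: impl Fn(u32) -> bool) -> usize {
        let mut q = self.queues.lock().unwrap();
        let (ready, still_pending): (Vec<u32>, Vec<u32>) =
            q.pending.drain(..).partition(|&id| is_ready(id));
        q.pending = still_pending;
        let moved = ready.len();
        q.idle.extend(ready);
        moved
    }
}
```

The racy variant would lock once to inspect `pending`, release, and lock again to push onto `idle`, leaving a window in between.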
I can see that when I print values read from MySQL in Rust, string values are enclosed in single quotes. Is this due to some MySQL configuration, or is it something this library introduces?
For example, the name column has the value suraj, but in Rust it is printed as 'suraj'.
Am I missing something here?
If a connection is put in the connection pool cleanup queue, and that connection errors, the error is thrown away and the handle_futures method just returns Ok(()).
src/conn/pool/mod.rs:254-266
// Handle dirty connections.
handle!(queue {
Ok(Ready(conn)) => {
if inner.closed {
crate::conn::disconnect(conn);
} else {
returned_conns.push(conn);
}
handled = true;
Ok(())
},
Err(_) => { Ok(()) },
});
Since the connection is not passed through return_conn, while the connection is removed from the queue, the ongoing count is not decremented.
The result is that:
Once I corrected the add overflow in #6, I subsequently ran into a stack overflow with the same code unless I limited my query to 341 rows.
select field1, field2 from table1 limit 342; # doesn't work
select field1, field2 from table1 limit 341; # works
The backtrace from #6 indicates the problem lies with src/conn/futures/query_result/futures/map.rs where it calls self.poll(). I've patched this code in my fork to use loop {} instead of recursion, and this has resolved the stack overflow.
Perhaps I made a mistake in my code, but it's derived from the example in the documentation. If so, I can submit a PR to improve the docs instead.
Otherwise, before I submit the patch I have, shall I convert all the self.poll() calls to use loop {}?
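The shape of that change can be shown on a toy state machine. `MapRows` below is a stand-in invented for this sketch, not the crate's map future, and it returns the finished vector directly rather than a poll result.

```rust
/// Toy "map over rows" state machine; a stand-in, not the crate's map future.
struct MapRows {
    rows: std::vec::IntoIter<u32>,
    out: Vec<u32>,
}

impl MapRows {
    fn new(rows: Vec<u32>) -> Self {
        MapRows { rows: rows.into_iter(), out: Vec::new() }
    }

    /// Recursive shape: one nested call per row, so stack use can grow with
    /// the number of rows.
    fn poll_recursive(&mut self) -> Vec<u32> {
        match self.rows.next() {
            Some(row) => {
                self.out.push(row * 2);
                self.poll_recursive()
            }
            None => std::mem::take(&mut self.out),
        }
    }

    /// Loop shape: the same transitions with constant stack depth.
    fn poll_loop(&mut self) -> Vec<u32> {
        loop {
            match self.rows.next() {
                Some(row) => self.out.push(row * 2),
                None => return std::mem::take(&mut self.out),
            }
        }
    }
}
```

Both methods produce the same output for small inputs; only the loop version is safe to rely on for result sets of arbitrary size.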
The existing example of using LocalInfileHandler doesn't actually read a file. Below, I've included an example that does.
mysql_async/src/local_infile_handler/mod.rs
Lines 45 to 51 in 54dbffa
struct InfileHandler;
impl LocalInfileHandler for InfileHandler {
fn handle(
&self,
file_name: &[u8],
) -> Box<Future<Item = Box<AsyncRead + Send>, Error = my::error::Error> + Send> {
let path = std::str::from_utf8(file_name)
.unwrap() // TODO: not panic
.to_owned();
let future = tokio::fs::File::open(path)
.map(|file| Box::new(file) as Box<_>)
.map_err(|e| e.into());
Box::new(future)
}
}
// let mut opts = my::OptsBuilder::from_opts(&*database_url);
// opts.local_infile_handler(Some(InfileHandler {}));
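For the "TODO: not panic" above, one option is to turn invalid UTF-8 into an io::Error, which could then flow through the same error conversion as the error from File::open. The helper name `infile_path` is made up for this sketch, and whether InvalidInput is the right error kind is a judgment call.

```rust
use std::io;
use std::path::PathBuf;

/// Turn the raw file name sent by the server into a path, reporting invalid
/// UTF-8 as an `io::Error` instead of panicking.
fn infile_path(file_name: &[u8]) -> io::Result<PathBuf> {
    std::str::from_utf8(file_name)
        .map(|s| PathBuf::from(s))
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))
}
```

Inside the handler, the `.unwrap()` would then become a match on this result, returning an already-failed future in the error case.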
I'm running some queries over the database created by the lobste.rs Rails app.
In particular, I'm trying to run the following:
pool.get_conn()
.and_then(|c| {
c.query("SELECT * FROM stories LIMIT 26")
})
.and_then(|stories| {
assert!(!stories.is_empty());
stories.reduce(
HashSet::new(),
|mut stories, story| {
println!("found story {:?}", story);
stories.insert(story.get::<u32, _>("id").unwrap());
stories
},
).and_then(|(q, s)| {
assert!(q.is_empty());
q.drop_result().map(move |c| (c, s))
}).map(|(c, s)| {
unimplemented!();
})
})
Running the query directly against my database, I get 26 rows (as expected). However, when I run it with mysql_async as above, "found story" is never printed, and the HashSet is empty. I know the future is driven correctly as it does crash on the unimplemented!().
If I replace c.query(..) with c.prep_exec(.., ()), mysql_async crashes with:
Error(Io(Custom { kind: UnexpectedEof, error: StringError("failed to fill whole buffer") }), State { next_error: None, backtrace: Some(stack backtrace:
0: 0x55e1b7fabdee - backtrace::backtrace::libunwind::trace::h8654a9d0555d5a01
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.5/src/backtrace/libunwind.rs:53
- backtrace::backtrace::trace::h97b4908a240a2f61
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.5/src/backtrace/mod.rs:42
1: 0x55e1b7faa755 - backtrace::capture::Backtrace::new_unresolved::hec1ca9ceaee1db45
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.5/src/capture.rs:88
- backtrace::capture::Backtrace::new::h7fa080eeed827e9d
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.5/src/capture.rs:63
2: 0x55e1b7faa19a - error_chain::make_backtrace::hffb33833ee042156
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/error-chain-0.11.0/src/lib.rs:616
3: 0x55e1b7faa238 - <error_chain::State as core::default::Default>::default::h1a16302fda4394fd
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/error-chain-0.11.0/src/lib.rs:710
4: 0x55e1b7e978cf - mysql_async::errors::Error::from_kind::ha3dc499d9ed61c6a
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.14.0/<impl_error_chain_processed macros>:53
- <mysql_async::errors::Error as core::convert::From<std::io::error::Error>>::from::hc00dcfd143ecae64
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.14.0/src/errors.rs:16
- mysql_async::queryable::stmt::InnerStmt::new::ha7a178ed950633da
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.14.0/src/queryable/stmt.rs:48
5: 0x55e1b7d35ad2 - mysql_async::connection_like::ConnectionLike::prepare_stmt::{{closure}}::hef881f6c93fc1fee
at /home/jon/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.14.0/src/connection_like/mod.rs:280
...
The MySQL (well, MariaDB) version is:
mysql Ver 15.1 Distrib 10.1.31-MariaDB, for Linux (x86_64) using readline 5.1
If it's useful, the layout of stories is:
+------------------------+------------------+------+-----+--------------+----------------+
| Field                  | Type             | Null | Key | Default      | Extra          |
+------------------------+------------------+------+-----+--------------+----------------+
| id                     | int(10) unsigned | NO   | PRI | NULL         | auto_increment |
| created_at             | datetime         | YES  | MUL | NULL         |                |
| user_id                | int(10) unsigned | YES  | MUL | NULL         |                |
| url                    | varchar(250)     | YES  | MUL |              |                |
| title                  | varchar(150)     | NO   | MUL |              |                |
| description            | mediumtext       | YES  | MUL | NULL         |                |
| short_id               | varchar(6)       | NO   | UNI |              |                |
| is_expired             | tinyint(1)       | NO   | MUL | 0            |                |
| upvotes                | int(10) unsigned | NO   |     | 0            |                |
| downvotes              | int(10) unsigned | NO   |     | 0            |                |
| is_moderated           | tinyint(1)       | NO   | MUL | 0            |                |
| hotness                | decimal(20,10)   | NO   | MUL | 0.0000000000 |                |
| markeddown_description | mediumtext       | YES  |     | NULL         |                |
| story_cache            | mediumtext       | YES  | MUL | NULL         |                |
| comments_count         | int(11)          | NO   |     | 0            |                |
| merged_story_id        | int(11)          | YES  | MUL | NULL         |                |
| unavailable_at         | datetime         | YES  |     | NULL         |                |
| twitter_id             | varchar(20)      | YES  | MUL | NULL         |                |
| user_is_author         | tinyint(1)       | YES  |     | 0            |                |
+------------------------+------------------+------+-----+--------------+----------------+
As demonstrated in blackbeam/rust-mysql-simple#134, it's pretty important to be able to set TCP_NODELAY for the TCP sockets used for MySQL connections when queries and responses are small. It'd be great if an OptsBuilder::tcp_nodelay was available like in the synchronous mysql crate.
The conversion from AsRef<str> + Sized to Opts panics if from_url returns None. But, according to the documentation of the From trait, From conversions should not fail.
Since this conversion can fail, when the TryFrom trait becomes stable (rust-lang/rust#33417), it should be used instead. That way, the caller can handle the error instead of experiencing a panic.
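To make the proposal concrete, here is a minimal sketch of a fallible conversion using TryFrom (stable since Rust 1.34). `SimpleOpts` and `UrlError` are hypothetical stand-ins invented for this example, not the crate's types, and the parser only understands the `mysql://host[:port]` shape.

```rust
use std::convert::TryFrom;

/// Hypothetical, simplified stand-in for the crate's `Opts`.
#[derive(Debug, PartialEq)]
struct SimpleOpts {
    host: String,
    port: u16,
}

/// Hypothetical error type for a failed URL parse.
#[derive(Debug, PartialEq)]
struct UrlError(String);

impl TryFrom<&str> for SimpleOpts {
    type Error = UrlError;

    fn try_from(url: &str) -> Result<Self, Self::Error> {
        // Only the `mysql://host[:port]` shape is handled in this sketch.
        let rest = url
            .strip_prefix("mysql://")
            .ok_or_else(|| UrlError(format!("unsupported scheme in {:?}", url)))?;
        let (host, port) = match rest.split_once(':') {
            Some((h, p)) => {
                let port = p
                    .parse::<u16>()
                    .map_err(|_| UrlError(format!("invalid port {:?}", p)))?;
                (h, port)
            }
            // Assumption: fall back to MySQL's default port.
            None => (rest, 3306),
        };
        if host.is_empty() {
            return Err(UrlError("empty host".to_owned()));
        }
        Ok(SimpleOpts { host: host.to_owned(), port })
    }
}
```

A caller can then write `SimpleOpts::try_from(url)?` and decide what to do with a malformed URL, rather than having the conversion panic.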
std::future is now in 1.36.0 and async/await is going to (hopefully) be stabilized in 1.38.0. Opening up this issue to track progress on upgrading mysql_async to the new syntax.
Is anybody working on this already? We might be able to put some hours to the refactoring in Prisma later on.
I accidentally set PoolConstraints::min to 0 (and max to 1), and this causes the library to continuously drop and start connections as new queries are issued. That seems counterintuitive. Should this be asserted against in PoolConstraints::new?
Looking at the commit that removes it, this doesn't look like an intentional breaking change to the API. I believe this should warrant a major version bump.
I have a query that needs to pass a Vec to the query as a parameter in order to use IN. I haven't found a way to express this. How can this be done or does there need to be an enhancement made?
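One possible workaround, sketched here as an assumption rather than a documented feature of the crate, is to generate one positional placeholder per element and then pass the elements as ordinary positional parameters. `in_placeholders` and `select_by_ids` are hypothetical helpers, and the `stories` table is only borrowed as an example name.

```rust
/// Hypothetical helper: builds "?, ?, ?" for `n` values.
/// Returns `None` for `n == 0`, because an empty `IN ()` list is not valid in MySQL.
fn in_placeholders(n: usize) -> Option<String> {
    if n == 0 {
        return None;
    }
    Some(vec!["?"; n].join(", "))
}

/// Builds the statement text for a lookup by a list of ids.
/// The ids themselves are not interpolated; they would be bound as parameters.
fn select_by_ids(ids: &[u32]) -> Option<String> {
    let placeholders = in_placeholders(ids.len())?;
    Some(format!("SELECT * FROM stories WHERE id IN ({})", placeholders))
}
```

The resulting string would be prepared as usual, with the Vec's elements supplied as the positional parameters in the same order.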
Currently, it is impossible to establish a Conn, or even just construct an Opts, in one thread, and then pass that to another, because Opts is not Send. This kind of setup is fairly common in applications using worker-pools, but is impossible with mysql_async today. In particular, it looks like Opts is not Send due to its Option<LocalInfileHandlerObject>. LocalInfileHandlerObject is not Send because the wrapped LocalInfileHandler is not Sync (Arc<T> is only Send if T: Sync + Send). I think it should be sufficient to have LocalInfileHandler require Self: Sync + Send, though I don't know if that restriction is okay to require?
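A minimal sketch of the proposed bound, using only the standard library. `Handler`, `FileHandler`, and `HandlerObject` are hypothetical stand-ins for the crate's handler trait and its Arc wrapper; the point is only that adding `Send + Sync` as supertraits makes the Arc-wrapped trait object Send.

```rust
use std::sync::Arc;

/// Hypothetical handler trait with the proposed `Send + Sync` supertraits.
trait Handler: Send + Sync {
    fn name(&self) -> &'static str;
}

struct FileHandler;

impl Handler for FileHandler {
    fn name(&self) -> &'static str {
        "file"
    }
}

/// Hypothetical stand-in for an Arc-based handler wrapper.
#[derive(Clone)]
struct HandlerObject(Arc<dyn Handler>);

/// Compile-time check: only callable for `Send` types.
fn assert_send<T: Send>(_: &T) {}

/// Moves the wrapped handler to another thread and uses it there.
fn use_on_other_thread(obj: HandlerObject) -> &'static str {
    assert_send(&obj);
    std::thread::spawn(move || obj.0.name()).join().unwrap()
}
```

Without the supertraits on `Handler`, the `assert_send` call and the `thread::spawn` would both fail to compile, which mirrors the situation with Opts described above.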
Pool::get_conn (or, more precisely, the GetConn future) forwards onto Pool::poll. Pool::poll in turn calls Pool::with_inner, which takes this std::sync::Mutex. This is hugely problematic in asynchronous code, as blocking on taking that lock will hold up the I/O reactor and the processing of any other futures scheduled on the same worker thread. This manifests especially at very high load, as this flamegraph shows.
There are many possible ways to fix this, but the best one would probably be to find a way to avoid the lock altogether and switch to something like a crossbeam queue of "available" connections and an AtomicBool for the closed flag. An alternative would be to use a futures-aware Mutex like tokio_sync::Lock, though we'd then have to figure out how to make that work nicely with the synchronous interface on Pool (such as take_conn). Speaking of which, why do we even have those methods? In an asynchronous context, synchronous methods should probably not exist.
http://blackbeam.org/mysql_async/mysql_async/ currently isn't working.
Sorry if this is not the correct channel to use for something like this. If it is not, what is a better way to get answers?
On your crates.io page you say that I should put mysql_async in the dependencies, but in your documentation page on https://docs.rs/mysql_async/0.12.2/mysql_async/ you say I should put mysql in the dependencies. I don't really understand which one I should use. Also, you ask for extern crate futures and tokio_core. Do I add those to my dependencies as well?
I'm using this library with async-await, and I found that the futures created by my database implementation are gigantic. Is it normal that .drop_exec creates a 1kB future? The entirety of my manual tcp communication is less than half of that.
Hello,
Would it be ok to get a reference to columns in QueryResult, like https://docs.rs/mysql/14.2.0/mysql/struct.QueryResult.html#method.columns_ref ? This would be very useful to me, to be able to inspect the column types.
Disclaimer: This issue is supposed to be a discussion.
I think, with the upcoming async/await, other runtimes will appear. There are already 3 "large" ones: tokio, futures and async-std. Therefore, in my opinion, it's a bad thing to have a hard dependency on tokio for task spawning. I believe its runtime is used for some background cleanup.
What do you think about this?
mysql_pool : &Pool;
const SQL : &str = "INSERT INTO cards (owner, ismastercard, cardtemplate, upgradecount, \
    upgradelevel, istradeable, isstarter, idcardpool, idlimitedpool, \
    chargeCard1, chargeCard2, chargeCard3) VALUES \
    (:owner, :ismastercard, :cardtemplate, 0, 0, 0, :isstarter, \
    :idcardpool, :idlimitedpool, 0, 0, 0)";
for d in stater_decks {
    let len = d.cards.len();
    let is_tutorial = (d.card_pool == CardPool::Tutorial as u8) as u8;
    let card_pool = d.card_pool;
    let limited_pool = d.limited_pool;
    let c : Vec<(u32, u8, u32, u8, u8, u8)> = d.cards.iter().map(|c| (guid, 0, c.card_id, is_tutorial, card_pool, limited_pool)).collect();
    let con = await!(mysql_pool.get_conn().into_awaitable())?;
    trace!("{:?}", c);
    let result = await!(con.batch_exec(SQL, c).into_awaitable())?;
    if result.affected_rows() != len as u64 {
        error!("can not create starter deck cards, affected rows:{:?} expected:{}", result.affected_rows(), len);
logs:
TRACE [(55, 0, 253, 1, 2, 2), (55, 0, 287, 1, 2, 2), (55, 0, 288, 1, 2, 2), (55, 0, 354, 1, 2, 2), (55, 0, 379, 1, 2, 2), (55, 0, 673, 1, 2, 2), (55, 0, 700, 1, 2, 2)]
ERROR can not create starter deck cards, affected rows:1 expected:7
Can you please point out what I am doing wrong? Or is this a bug?
Is there a better place to ask questions specific to this crate, so that I don't create issues here when the problem may be on my side?
If any connection is in the queue (dropping, rollback, new, or disconnecting) the pool will not return any connections when polled, even if there are idle connections available. If any of the connections in the queue take a long time to resolve, the pool will stop serving new connections for that entire duration.
My server seems to occasionally get stuck on dropping the results of a connection, leaving a single connection stuck in the dropping queue. I haven't found the cause of that and it might be an issue on my end. But because of this issue the entire server locks up until that one stuck connection gets resolved.
Here's the server not serving any new connections for 5 minutes while 1 connection is stuck in dropping, with plenty of idle connections available:
My suggested change would be to allow Pool::take_conn to return a connection from the idle pool even if there are connections in the queue, but to restrict Pool::poll from creating a new connection if there are connections in the queue.
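The suggested rule can be sketched as a small decision function. This is only an illustration of the proposal under my own assumptions: `PoolState`, `Decision`, and `decide` are hypothetical names, and the real pool tracks more state than these four counters.

```rust
/// What a hypothetical pool does when asked for a connection.
#[derive(Debug, PartialEq)]
enum Decision {
    /// Hand out an idle connection.
    TakeIdle,
    /// Open a new connection.
    CreateNew,
    /// Nothing available yet; the caller is polled again later.
    Wait,
}

/// Hypothetical snapshot of pool state; field names are illustrative.
struct PoolState {
    /// Idle connections ready to be handed out.
    idle: usize,
    /// Connections in the dropping/rollback/new/disconnecting queue.
    queued: usize,
    /// All connections currently owned by the pool.
    total: usize,
    /// Upper bound on connections.
    max: usize,
}

fn decide(s: &PoolState) -> Decision {
    if s.idle > 0 {
        // Serve from the idle set regardless of what is in the queue.
        Decision::TakeIdle
    } else if s.queued == 0 && s.total < s.max {
        // Only grow the pool when nothing is pending in the queue.
        Decision::CreateNew
    } else {
        Decision::Wait
    }
}
```

Under this rule a single connection stuck in dropping only delays pool growth; it no longer blocks callers while idle connections exist.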
thread 'tokio-runtime-worker-1' panicked at 'Couldn't convert Row { member_id: Int(24478), member_group_id: Int(2), mgroup_others: Bytes(""), members_pass_hash: Null, temp_ban: Int(0), warn_count: Int(0), validating: Int(0) } to type (T1, .., T7). (see FromRow documentation)', /home/kubik/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_common-0.17.0/src/row/convert.rs:368:39
note: Run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
thread 'tokio-runtime-worker-2' panicked at 'internal error: entered unreachable code', /home/kubik/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.19.0/src/queryable/query_result/mod.rs:167:26
I think it is because I expected members_pass_hash to be String, not Option<String>, but I would expect an error to be returned instead of a panic. After the panic it was no longer possible to run any select; probably it was not even able to get a connection.
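To illustrate the behaviour I would expect, here is a minimal sketch of a conversion that reports NULL as an error instead of panicking. `Cell`, `ConvertError`, `to_string`, and `to_opt_string` are hypothetical and much simpler than the crate's value and row types.

```rust
/// Hypothetical, simplified column value; not the crate's value type.
#[derive(Debug, Clone, PartialEq)]
enum Cell {
    Null,
    Bytes(Vec<u8>),
}

/// Hypothetical error returned instead of panicking.
#[derive(Debug, PartialEq)]
struct ConvertError(&'static str);

/// Strict conversion: NULL is reported as an error, not a panic.
fn to_string(cell: &Cell) -> Result<String, ConvertError> {
    match cell {
        Cell::Null => Err(ConvertError("unexpected NULL")),
        Cell::Bytes(b) => {
            String::from_utf8(b.clone()).map_err(|_| ConvertError("invalid UTF-8"))
        }
    }
}

/// Nullable conversion: NULL maps to `None`.
fn to_opt_string(cell: &Cell) -> Result<Option<String>, ConvertError> {
    match cell {
        Cell::Null => Ok(None),
        other => to_string(other).map(Some),
    }
}
```

With this shape, a column declared as String that turns out to be NULL surfaces as an Err the caller can log, and the worker thread keeps running.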
While connecting to the same server referenced in the rust-mysql-simple issue blackbeam/rust-mysql-simple#160, I get an error about unexpected packet order using mysql_async. My first thought is that perhaps the feature that was added in rust-mysql-simple just hasn't been implemented in mysql_async. I can get any extra details you need. This is using SSL without a server public certificate and with domain verification off.
The following (new) test case occasionally hangs for me:
#[test]
fn can_handle_the_pressure() {
    let mut runtime = tokio::runtime::Runtime::new().unwrap();
    let pool = Pool::new(&**DATABASE_URL);
    for _ in 0..100 {
        use futures::{Sink, Stream};
        let (tx, rx) = futures::sync::mpsc::unbounded();
        for i in 0..10_000 {
            let pool = pool.clone();
            let tx = tx.clone();
            runtime.spawn(futures::future::lazy(move || {
                pool.get_conn()
                    .map_err(|e| unreachable!("{:?}", e))
                    .and_then(move |_| {
                        tx.send(i).map_err(|e| unreachable!("{:?}", e))
                    }).map(|_| ())
            }));
        }
        drop(tx);
        runtime.block_on(rx.fold(0, |_, _i| {
            Ok(0)
        })).unwrap();
    }
    drop(pool);
    runtime.shutdown_on_idle().wait().unwrap();
}
It specifically tries to set up contention over the Pool (notice that it doesn't actually issue any queries, it just repeatedly "gets" connections and then returns them immediately), and it seems like occasionally it hits a deadlock. This may be related to #64, but I'm not sure.
error[E0432]: unresolved import `tokio::net::tcp`
  --> /home/suraj.prakash/.cargo/registry/src/github.com-1ecc6299db9ec823/mysql_async-0.17.1/src/io/futures/connecting_stream.rs:16:18
   |
16 | use tokio::net::{tcp::ConnectFuture, TcpStream};
   |                  ^^^ could not find `tcp` in `net`
error: aborting due to previous error
It has been deprecated: announcement.
Is there any functionality you would require from Serde or another library before this would be possible?
QueryResult::affected_rows() always returns 1.
I explored the code, but I couldn't find out what's wrong. It seems OK:
Lines 589 to 590 in 410653d
Any idea?
I want to listen on the receiver of a channel and, for each element, fire my query at the database. Here is my code:
fn sql_conn(handle : &tokio_core::reactor::Handle, rx:futures::sync::mpsc::Receiver<ChannelHolder>){
    let database_url = "mysql://suraj:[email protected]:9106/yuka";
    let pool = my::Pool::new(database_url, handle);
    println!("Amount is 123");
    let f2 = rx.for_each(move|res| {
        let future = pool.clone().get_conn().and_then(|conn| {
            // Load payments from database.
            // conn.prep_exec("SELECT txnid, amount FROM txn_update where txnid=6959177461", ())
            conn.prep_exec("SELECT `key`, `value` FROM `config` where isactive=1", ())
        }).and_then(|result| {
            // Collect payments
            result.map_and_drop(|row| {
                let (txnid, value):(String, Option<String>) = my::from_row(row);
                match value.clone(){
                    Some(x) => {
                        println!("key:value is {}: {}", txnid, x);
                        GLOBAL_CONFIG_CACHE.write().unwrap().insert(txnid.clone(), value.clone());
                    },
                    None => println!("null value"),
                }
            })
        }).and_then(|(_ /* conn */, value1)| {
            // The destructor of a connection will return it to the pool,
            // but pool should be disconnected explicitly because it's
            // an asynchronous procedure.
            pool.disconnect().map(|_| value1)
        });
        handle.spawn(future.then(|_| Ok(())));
        Ok(())
    });
}
I am getting the error below:
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
   --> src/main.rs:559:17
    |
540 |     let pool = my::Pool::new(database_url, handle);
    |         ---- captured outer variable
...
559 |     }).and_then(|(_ /* conn */, value1)| {
    |                 ^^^^^^^^^^^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
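For anyone hitting the same E0507, here is a minimal standard-library sketch of the usual way around it: clone the handle once per invocation of the outer closure and move the clone into the inner closure. `FakePool`, `for_each`, and `run` are hypothetical stand-ins invented for illustration and are not mysql_async or futures APIs.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a cheaply cloneable pool handle.
#[derive(Clone)]
struct FakePool(Arc<String>);

impl FakePool {
    /// Consumes the handle, like a method that takes `self` by value.
    fn disconnect(self) -> String {
        format!("disconnected {}", self.0)
    }
}

/// Calls `f` once per item, the way a stream combinator calls its `FnMut`.
fn for_each<F: FnMut(u32) -> String>(items: &[u32], mut f: F) -> Vec<String> {
    items.iter().map(|&i| f(i)).collect()
}

fn run(pool: FakePool) -> Vec<String> {
    for_each(&[1, 2], move |i| {
        // Clone per invocation. Moving `pool` itself into the inner closure
        // would trigger E0507, because the outer closure is `FnMut` and may
        // be called again after the move.
        let pool = pool.clone();
        let finish = move || pool.disconnect();
        format!("{}: {}", i, finish())
    })
}
```

Whether disconnecting a pool once per received message is what you actually want is a separate question; the sketch only addresses the ownership error.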
I've got a bit of code that uses a mysql_async::Pool in its own thread. A futures::sync::mpsc is used to transfer queries in, and data extracted from QueryResults out.
After updating mysql_async from 0.11.1 to 0.12.2, I noticed random parsing errors in my logs:
EOF while reading length-encoded string
Invalid length-encoded integer value
After either of these errors occurs, my "query queue" freezes on the next query while waiting for a new connection from the Pool. I'm not entirely clear on what's going on here, because I'm finding high CPU usage and a backtrace on my threads shows one thread is polling mysql_async for data on a TcpStream. I also seem to be stuck at one connection despite having an Opts::pool_max set to 8.
(gdb) bt
#0 0x00007f85c9509dda in recv () from /usr/lib/libpthread.so.0
#1 0x000055b42e09b2b4 in std::sys::imp::net::Socket::recv_with_flags () at /checkout/src/libstd/sys/unix/net.rs:231
#2 std::sys::imp::net::Socket::read () at /checkout/src/libstd/sys/unix/net.rs:240
#3 std::sys_common::net::TcpStream::read () at /checkout/src/libstd/sys_common/net.rs:248
#4 std::net::tcp::{{impl}}::read () at /checkout/src/libstd/net/tcp.rs:563
#5 0x000055b42e07f187 in <mio::net::tcp::TcpStream as std::io::Read>::read ()
#6 0x000055b42e014022 in <tokio_core::net::tcp::TcpStream as std::io::Read>::read ()
#7 0x000055b42dff76cd in <mysql_async::io::Stream as futures::stream::Stream>::poll ()
#8 0x000055b42dfe154c in <futures::stream::future::StreamFuture<S> as futures::future::Future>::poll ()
#9 0x000055b42dff33af in <mysql_async::connection_like::read_packet::ReadPacket<T> as futures::future::Future>::poll ()
#10 0x000055b42dfdafc2 in <futures::future::map::Map<A, F> as futures::future::Future>::poll ()
#11 0x000055b42dfc2c66 in <futures::future::and_then::AndThen<A, B, F> as futures::future::Future>::poll ()
#12 0x000055b42dfef203 in mysql_async::conn::pool::Pool::handle_futures ()
#13 0x000055b42dfefd77 in mysql_async::conn::pool::Pool::poll ()
#14 0x000055b42dfed459 in <mysql_async::conn::pool::futures::get_conn::GetConn as futures::future::Future>::poll ()
#15 0x000055b42df08623 in <futures::future::then::Then<A, B, F> as futures::future::Future>::poll ()
#16 0x000055b42debd595 in <futures::task_impl::Spawn<T>>::enter::{{closure}} ()
#17 0x000055b42deb9203 in std::sys_common::backtrace::__rust_begin_short_backtrace ()
#18 0x000055b42debca1c in std::panicking::try::do_call ()
#19 0x000055b42e0af85f in panic_unwind::__rust_maybe_catch_panic () at /checkout/src/libpanic_unwind/lib.rs:101
#20 0x000055b42dee89d7 in <F as alloc::boxed::FnBox<A>>::call_box ()
#21 0x000055b42e0a5c6c in alloc::boxed::{{impl}}::call_once<(),()> () at /checkout/src/liballoc/boxed.rs:772
#22 std::sys_common::thread::start_thread () at /checkout/src/libstd/sys_common/thread.rs:24
#23 std::sys::imp::thread::{{impl}}::new::thread_start () at /checkout/src/libstd/sys/unix/thread.rs:90
#24 0x00007f85c950008a in start_thread () from /usr/lib/libpthread.so.0
#25 0x00007f85c902124f in clone () from /usr/lib/libc.so.6
If the future errored, I expect the future's state machine to move on and stop polling the connection. I've got a bit more work ahead to sort this out, and it's a bit over my head.
That work aside, I'm trying to determine what I'm expected to do when an error occurs, because my Pool is losing Connections and the mysql_async::errors I get don't deliver ownership of the connection held in the [somewhat?] active query, so I can't disconnect or reset it, or even establish a new connection to replace it in the pool.
enum Msg {
    SelectCategories {
        query: String,
        tx_chan: futures::sync::mpsc::Sender<Result<Vec<Category>>>,
    },
}
fn runner(db_url: String, rx: futures::sync::mpsc::Receiver<Msg>) {
    let mut core = Core::new().expect("Unable to create event loop.");
    let handle = core.handle();
    let mut opts = OptsBuilder::from_opts(&db_url);
    opts.pool_min(Some(1_usize));
    opts.pool_max(Some(8_usize));
    let pool = Pool::new(opts, &handle);
    let msg_loop = rx.and_then(|msg| match msg {
        Msg::SelectCategories { query, tx_chan } => {
            let future = pool.get_conn()
                .and_then(move |conn| conn.query(query.as_ref()))
                .and_then(|qres: QueryResult<_, _>| {
                    qres.map_and_drop(|row| {
                        let (id, name): (String, Option<String>) = mysql::from_row(row);
                        // errors appear to trigger in here.
                        Category::new(id, name)
                    })
                    .map(|(conn, rows)| rows)
                })
                .then(|result| {
                    tx_chan.send(result);
                    // Debug statements here are reached, suggesting the current task has abandoned the qres.map_and_drop above.
                    // What happens to the connection that triggered the error now?
                    Ok(())
                });
            Box::new(future) as Box<Future<Item = (), Error = _>>
        }
    });
    core.run(msg_loop.for_each(|_| Ok(()))).unwrap();
}
Additional notes:
(I'm using Rust for the first time, so apologies if this is known or intentional or a useless bug. Don't be afraid to tell me if I'm just doing it wrong.)
I'm implementing a web app with hyper and my responses are a Stream, i.e. Body::wrap_stream(GenerateStreamForResponse::new());
One thing my GenerateStreamForResponse does is use mysql_async, and along the way that can generate an error. Body::wrap_stream, however, requires that errors generated by the stream passed to it implement the Sync trait. I see from the docs that mysql_async errors do not impl Sync (https://docs.rs/mysql_async/0.15.1/mysql_async/errors/struct.Error.html). Looking at the impl, it looks like error_chain! is used, and looking at error_chain's issue tracker, it looks like this is known, and maybe intentional, but causing problems for users (rust-lang-deprecated/error-chain#240).
Perhaps I should just be translating these errors anyway, but it also looks like some other people are moving away from error_chain. Should mysql_async move away from error_chain, should mysql_async add its voice to the request that error_chain support Sync, or should I just translate my error?
One thing I can say for certain, is this is one of a million paper cuts I've hit as a new rust user.