actix / actix Goto Github PK
View Code? Open in Web Editor NEWActor framework for Rust.
License: Apache License 2.0
Actor framework for Rust.
License: Apache License 2.0
Address::call
is not usable in sync actor, because it returns futures
SyncContext
should provide wait
method
wrap actor.start() in thread? is there same config about that?
extern crate actix;
use actix::prelude::*;
use std::time::Duration;
use std::thread::sleep;
use std::thread;
#[derive(Debug)]
struct Event(i32);
impl Message for Event {
type Result = ();
}
struct ActorDemo {}
impl Actor for ActorDemo {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("started");
let address: Addr<Syn, _> = ctx.address();
thread::spawn(move || loop {
println!("loop");
address.send(Event(10));
thread::sleep(Duration::from_secs(3));
});
()
}
}
impl Handler<Event> for ActorDemo {
type Result = ();
fn handle(&mut self, event: Event, ctx: &mut Context<Self>) {
println!("recv event {:?}", event);
()
}
}
fn main() {
let system = System::new("test");
let actor = ActorDemo {};
let address: SyncAddress<_> = actor.start();
println!("start over");
system.run();
}
I'm pretty sure this is a result of budziq/rust-skeptic#25. It may be fixable by kvark/froggy#26 (comment), though that seems like a bit of a hack.
Hi,
I'm wondering if there is any way that I have missed for accessing the actor's own address, as it would prevent ugly hacks like https://github.com/kingoflolz/routing-actor/blob/66890955029b4e16e8b7aedaa6eedb001f019321/src/node.rs#L39-L53
Thanks in advance
Sync actors should be able to restart itself
If you upgrade an Address
to a SyncAddress
, then drop the SyncAddress
, then upgrade the address again, the SyncAddress
returned during the 2nd upgrade does not work.
I was able to create a minimal example showing this bug
https://github.com/fuchsnj/actix_bug/blob/master/src/main.rs
The example program upgrades an address twice, each time sending a message that should be printed. It should print "Handling message!" twice, but it's only printing once.
I believe this is because when the last reference to SyncAddress
is dropped, the underlying sender is closed. If you upgrade an address after this, it re-uses the tx/rx from the first one that was already closed.
In my example code, you can uncomment one of the lines which prevents the SyncAddress
from being dropped, and the bug goes away.
Also, if you comment out the line in actix that closes this sender, the bug also goes away.
https://github.com/actix/actix/blob/master/src/queue/sync.rs#L763
HI.
I encountered a actix
internal exception which shouldn't happen (I think). Basically I store SpawnHandles
that represent (kind of) timeouts in a map and try to cancel them upon certain events. The actix
version used is 0.5.5
along with rustc 1.25.0 (84203cac6 2018-03-25)
on HighSierra. The crash happens on every run.
I stripped down my case to:
extern crate actix;
use actix::prelude::*;
use std::time::Duration;
#[derive(Default)]
pub struct A;
struct M;
impl Message for M {
type Result = ();
}
impl Actor for A {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
let t1 = ctx.run_later(Duration::new(2, 0), |_, _| { println!("t1") });
ctx.run_later(Duration::new(1, 0), move |_, ctx| {
println!("cancelling t1: {}", ctx.cancel_future(t1));
});
ctx.notify_later(M {}, Duration::new(5, 0));
}
}
impl Handler<M> for A {
type Result = ();
fn handle(&mut self, _: M, ctx: &mut Self::Context) -> Self::Result {
let t2 = ctx.run_later(Duration::new(2, 0), |_, _| { println!("t2") });
ctx.run_later(Duration::new(1, 0), move |_, ctx| {
println!("cancelling t2 in handler: {}", ctx.cancel_future(t2));
});
}
}
fn main() {
let sys = actix::System::new("");
let _ : Addr<Syn, _> = A::default().start();
let _ = sys.run();
}
This code results in
# RUST_BACKTRACE=1 cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/actix-cancel`
canceling t1: true
thread 'main' panicked at 'index out of bounds: the len is 1 but the index is 1', /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/smallvec-0.6.0/lib.rs:881:18
stack backtrace:
0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
1: std::sys_common::backtrace::print
at libstd/sys_common/backtrace.rs:71
at libstd/sys_common/backtrace.rs:59
2: std::panicking::default_hook::{{closure}}
at libstd/panicking.rs:207
3: std::panicking::default_hook
at libstd/panicking.rs:223
4: std::panicking::begin_panic
at libstd/panicking.rs:402
5: std::panicking::try::do_call
at libstd/panicking.rs:349
6: std::panicking::try::do_call
at libstd/panicking.rs:325
7: core::ptr::drop_in_place
at libcore/panicking.rs:72
8: core::ptr::drop_in_place
at libcore/panicking.rs:58
9: <smallvec::SmallVec<A> as core::ops::index::Index<usize>>::index
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/smallvec-0.6.0/lib.rs:881
10: <actix::contextimpl::ContextImpl<A>>::poll
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.5.5/src/contextimpl.rs:284
11: <actix::context::Context<A> as futures::future::Future>::poll
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.5.5/src/context.rs:125
12: <futures::future::map::Map<A, F> as futures::future::Future>::poll
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/future/map.rs:30
13: <futures::future::map_err::MapErr<A, F> as futures::future::Future>::poll
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/future/map_err.rs:30
14: <alloc::boxed::Box<F> as futures::future::Future>::poll
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/future/mod.rs:113
15: <futures::task_impl::Spawn<T>>::poll_future_notify::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/mod.rs:289
16: <futures::task_impl::Spawn<T>>::enter::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/mod.rs:363
17: futures::task_impl::std::set
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/std/mod.rs:78
18: <futures::task_impl::Spawn<T>>::enter
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/mod.rs:363
19: <futures::task_impl::Spawn<T>>::poll_future_notify
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.21/src/task_impl/mod.rs:289
20: <tokio::executor::current_thread::scheduler::Scheduled<'a, U>>::tick
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/scheduler.rs:343
21: <tokio::executor::current_thread::scheduler::Scheduler<U>>::tick::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/scheduler.rs:323
22: <tokio::executor::current_thread::Borrow<'a, U>>::enter::{{closure}}::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/mod.rs:657
23: tokio::executor::current_thread::CurrentRunner::set_spawn
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/mod.rs:689
24: <tokio::executor::current_thread::Borrow<'a, U>>::enter::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/mod.rs:656
25: <std::thread::local::LocalKey<T>>::try_with
at /Users/travis/build/rust-lang/rust/src/libstd/thread/local.rs:290
26: <std::thread::local::LocalKey<T>>::with
at /Users/travis/build/rust-lang/rust/src/libstd/thread/local.rs:244
27: <tokio::executor::current_thread::Borrow<'a, U>>::enter
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/mod.rs:655
28: <tokio::executor::current_thread::scheduler::Scheduler<U>>::tick
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/scheduler.rs:323
29: <tokio::executor::current_thread::Entered<'a, P>>::tick
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/mod.rs:541
30: <tokio::executor::current_thread::Entered<'a, P>>::turn
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.5/src/executor/current_thread/mod.rs:483
31: tokio_core::reactor::Core::poll::{{closure}}::{{closure}}::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.16/src/reactor/mod.rs:326
32: <scoped_tls::ScopedKey<T>>::set
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.1/src/lib.rs:155
33: tokio_core::reactor::Core::poll::{{closure}}::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.16/src/reactor/mod.rs:325
34: tokio_executor::global::with_default::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.2/src/global.rs:176
35: <std::thread::local::LocalKey<T>>::try_with
at /Users/travis/build/rust-lang/rust/src/libstd/thread/local.rs:290
36: <std::thread::local::LocalKey<T>>::with
at /Users/travis/build/rust-lang/rust/src/libstd/thread/local.rs:244
37: tokio_executor::global::with_default
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.1.2/src/global.rs:150
38: tokio_core::reactor::Core::poll::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.16/src/reactor/mod.rs:289
39: tokio_reactor::with_default::{{closure}}
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-reactor-0.1.1/src/lib.rs:207
40: <std::thread::local::LocalKey<T>>::try_with
at /Users/travis/build/rust-lang/rust/src/libstd/thread/local.rs:290
41: <std::thread::local::LocalKey<T>>::with
at /Users/travis/build/rust-lang/rust/src/libstd/thread/local.rs:244
42: tokio_reactor::with_default
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-reactor-0.1.1/src/lib.rs:199
43: futures::task_impl::Notify::clone_id
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.16/src/reactor/mod.rs:288
44: tokio_core::reactor::Core::run
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.16/src/reactor/mod.rs:264
45: <actix::context::Context<A> as futures::future::Future>::poll
at /Users/felix/.cargo/registry/src/github.com-1ecc6299db9ec823/actix-0.5.5/src/system.rs:100
46: actix_cancel::main
at src/main.rs:41
47: std::rt::lang_start::{{closure}}
at /Users/travis/build/rust-lang/rust/src/libstd/rt.rs:74
48: std::panicking::try::do_call
at libstd/rt.rs:59
at libstd/panicking.rs:306
49: panic_unwind::dwarf::eh::read_encoded_pointer
at libpanic_unwind/lib.rs:102
50: <std::io::Write::write_fmt::Adaptor<'a, T> as core::fmt::Write>::write_str
at libstd/panicking.rs:285
at libstd/panic.rs:361
at libstd/rt.rs:58
51: std::rt::lang_start
at /Users/travis/build/rust-lang/rust/src/libstd/rt.rs:74
52: <actix_cancel::A as core::default::Default>::default
Am I missing something?
Hi,
I have an actor whose address I got when starting it:
let addr: Addr<Unsync, _> = Actor(
...
).start();
I am trying to pass the address to a different service that receives messages via gRPC.
The service can accept data/references through a struct:
struct MyServiceImpl {
address: Addr<Unsync, _>
}
Firstly, should I be passing the actor's address around? If so, what should I replace the _
with on MyServiceImpl
?
I've been going through the documentation trying to figure out what to replace the _
with on MyServiceImpl
, as I can't use the type placeholder.
Thanks.
Any plans to support futures 0.2.0? There were some breaking changes that don't work with actix 0.5.6.
While studying Kabuki and Akka recently, I read mostly about their Actor model and its terms, but this list of concurrency models shows that it is only one family, among even Process Algebra, "a diverse family of related approaches for formally modelling concurrent systems". Many of them have non-descriptive names, so perhaps it would make the most sense to simply list which possible features of various Models for Programming Concurrency this project aims to implement.
Also, based on noxisacat's reply to Actix 0.1 release announcement advising to switch away from Actor Model terms, since this project differs from its model, I picked out (possibly more) general and familiar concurrency terms, oriented toward teamwork and logistics, for a starting point in this:
Actor
~> Worker
: takes a job for processing
Item
~> Work
: piece of work (job)
Cell
~> Schedule
: ordering of a set of jobs (similar to a spool of thread)
Supervisor
~> Delegate
: sends jobs to workers' schedules
Arbiter
~> Broker
: sets how to use a field and which processes take place in it
SyncArbiter
~> Dealer
: handles users by spreading out supply
System
~> Field
: an environment for performing activities
Context
~> Track
: a laid out, measurable path with activities to follow; eg. on track and fast track
State
~> Progress
: measure of movement towards or away from defined start and end points; eg. work-in-progress and progress report
I think message subscriber Subscriber<M>
should implement futures::Sink
trait
Hi,
I'm looking at the different actor systems that currently exist for rust and actix seems to be the most developed from what I can tell. To get a feeling for the system(s) I started to implement the ring benchmark and compare them to Erlang as a baseline [1].
I noticed that actix by default only uses a single core/scheduler and was wondering what the most straightforward way to spread out work over all the available cores. I've looked around and the only thing I could find so far is "run an actor on a different thread" but that seems like it has to be hard-coded on a per actor level.
Cheers,
and many thanks in advance!
It looks like sending SystemExit to the system arbiter causes it to send StopArbiter to all arbiters including itself, and a warning is logged whenever system arbiter receives StopArbiter.
System arbiter received StopArbiter
message.
To shutdown system, SystemExit
message should be
send to Addr<Syn, System>
Can system either not send StopArbiter to itself, or not log this warning when it is the process of shutting down after receiving SystemExit?
Do you foresee adding a Lua or Python scripting environment? For example, some of the higher level patterns mentioned in rust-lang/rfcs#613 and #16 which rely on this low level core might be easier to write using a simper syntax and glue code. This may also overlap with other libraries that provide Rust application environments with simpler, powerful interfaces, but Actix may also have help to add for those and other libraries implemented at a scripting level too. As one example, check out Rote.
I think actix should provide supervision. But I am not sure what it means within actix context :)
does anyone have ideas?
For my use case it would be very useful to be able to run a task every x seconds,
so just like run_later
for AsyncContext
but instead of running once, it would repeat
every time the duration has passed.
Thank you for this useful framework!
I'm getting errors when trying to build under Windows, all related to src/actors/signal.rs
unresolved import `tokio_signal::unix`
cannot find value `SIGHUP` in module `libc`
cannot find value `SIGTERM` in module `libc`
cannot find value `SIGQUIT` in module `libc`
cannot find value `SIGCHLD` in module `libc`
Should the use of unix-specific code be cfg-gated?
~/personal_projects/rust/vendor/actix-web (master)$ cargo run --example juniper
Updating registry `https://github.com/rust-lang/crates.io-index`
error: no matching package named `actix-web` found
location searched: file:///Users/romanfrolow/personal_projects/rust/vendor/actix-web
required by package `basics v0.1.0 (file:///Users/romanfrolow/personal_projects/rust/vendor/actix-web/examples/basics)`
~/personal_projects/rust/vendor/actix-web (master)$ cd examples/juniper/
~/personal_projects/rust/vendor/actix-web/examples/juniper (master)$ cargo run
Updating registry `https://github.com/rust-lang/crates.io-index`
error: no matching package named `actix-web` found
location searched: file:///Users/romanfrolow/personal_projects/rust/vendor/actix-web
required by package `basics v0.1.0 (file:///Users/romanfrolow/personal_projects/rust/vendor/actix-web/examples/basics)`
~/personal_projects/rust/vendor/actix-web/examples/juniper (master)$
I have the following transport flow:
Server receives incoming stream (or client makes outgoing connection).
Framed
is used with InboundHandshakeCodec
(or OutboundHandshakeCodec
) to perform a protocol handshake.
A new Framed
is created with a general post-handshake Codec
via:
let (parts, handshake_codec) = handshake_framed.into_parts_and_codec();
Framed::from_parts(parts, Codec::from(handshake_codec))
This is necessary because Codec
needs to grab encryption setup information from *HandshakeCodec
.
Looking at the code, FramedActor::framed(stream, InboundHandshakeCodec)
passes its arguments through to FramedContext::new()
, which calls stream.framed(codec)
and stores the result internally. It would be great to have a FramedContext
method that kills it and returns the underlying framed
, and a corresponding FramedActor::framed()
equivalent that takes a Framed
instead of stream, codec
.
I think there should be per Arbiter
registry, some kind of actor singleton system.
maybe even with auto actor spawn. so Arbiter
can provide per thread registry and System
can provide global registry.
interface could be very simple, something like this
trait Registry<A: Actor> {
fn get<A: Default>() -> Address<A>;
fn query() -> Option<Address<A>>;
fn register(addr: Address<A>);
}
Currently the documentation states:
This method is similar to
add_future
but works with streams.Information to consider. Actor wont receive next item from a stream until
Response
future resolves to a result.Self::reply
resolves immediately.This method is similar to
add_stream
but it skips result error.
However there is no add_future
method that search can find and it is unclear what Response
future it refers to. (I'm trying to look for method of temporarily stopping the processing of stream without stopping context - it looks like what I'm looking for but I cannot figure what the documentation intends to convey).
It would be nice to be able to have Actors running on completely different servers or different processes using something like tarpc so you could delegate work out horizontally.
Use case would be that you have certain actors that are running on special hardware, such as GPUs etc..
Hello folks, I have a WriteHalf<TcpStream>
but I'm not sure how to attach it to an Actor
. I wrote a small prototype with an actor that reads from a ReadHalf<TcpStream>
and writes to a WriteHalf<TcpStream>
, basically an echo server. The problem is that it's not writing/flushing immediately:
The desired output would be:
> hello\r\n
< hello\r\n
> world\r\n
< world\r\n
But the server is echoing the first line only when I send a second one.
Thanks!
While attempting to fire up the very core demo project from the actix.rs website, I'm getting a compile error:
error[E0432]: unresolved import `std::sync::atomic::spin_loop_hint`
--> /Users/jimwharton/.cargo/registry/src/github.com-1ecc6299db9ec823/parking_lot_core-0.2.14/src/spinwait.rs:14:5
|
14 | use std::sync::atomic::spin_loop_hint;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ no `spin_loop_hint` in `sync::atomic`
Compiling tempdir v0.3.7
error: aborting due to previous error
error: Could not compile `parking_lot_core`.
warning: build failed, waiting for other jobs to finish...
error: build failed
I've done a bit of looking around, but I'm not seeing any real leads. I'm using stable. The only dep in my Cargo.toml
is actix-web = "0.6"
I'm wondering if you will be interested in adding support for serializing the state of an entire actor system. How I see it could be implemented is first waiting for the completion of processing of all currently executing Handlers, preventing new ones from starting, then write out the state of each arbiter and the state of each actor within it, including their inboxes.
This will be pretty difficult to implement, but I will be interested in doing so.
I'd like to split actix into two packages
I got this:
thread 'main' panicked at 'Use Self::Context::notify() instead of direct use of address',
when I try to send a message:
struct FragmentLoader {
model_info_merger: Addr<Unsync, ModelInfoMergeController>
}
impl Actor for FragmentLoader {
type Context = actix::Context<Self>;
}
impl Handler<LoadFragment> for FragmentLoader {
type Result = ();
fn handle(&mut self, mut msg: LoadFragment, ctx: &mut actix::Context<FragmentLoader>) -> Self::Result {
// ...
self.model_info_merger.do_send(MergeModelInfo { model_info: msg.model_info });
}
}
How to send a message to another actor, not the Context < Self >?
Application panics after cancelling future created with actix::AsyncContext::add_stream
Code sample:
extern crate actix;
extern crate tokio_core;
extern crate futures;
use std::time::Duration;
use futures::Stream;
use tokio_core::reactor::Interval;
use actix::prelude::*;
use actix::{Actor, Context, StreamHandler, SpawnHandle, Arbiter};
struct ClosableActor {
source: SpawnHandle,
}
impl Actor for ClosableActor {
type Context = Context<Self>;
}
struct ClosablePacket;
impl<K> StreamHandler<ClosablePacket, K> for ClosableActor {
fn handle(&mut self, _: ClosablePacket, ctx: &mut Context<Self>) {
ctx.cancel_future(self.source);
}
}
fn main() {
let sys = actix::System::new("closable");
let _: () = ClosableActor::create(|ctx| {
ClosableActor {
source: ctx.add_stream(
Interval::new(
Duration::from_millis(1),
Arbiter::handle()
).unwrap().map(|_| ClosablePacket)
),
}
});
std::process::exit(sys.run());
}
Actix
's default behavior is to call StreamHandler::finished
whenever an stream finished. And also by default, it will stop the Actor
. But what if an Actor
is handling multiple streams? Is this by-design or a bug?
Reproduce steps:
0.0.0.0:12345
.extern crate futures;
extern crate tokio_io;
extern crate tokio_core;
extern crate actix;
extern crate bytes;
use std::net::SocketAddr;
use std::str::FromStr;
use std::io;
use tokio_io::AsyncRead;
use tokio_io::codec::{Encoder, Decoder, FramedRead};
use tokio_core::net::{TcpListener, TcpStream};
use actix::prelude::*;
use futures::Stream;
use bytes::{BufMut, BytesMut};
#[derive(Message)]
struct Message(u8);
struct MessageCodec {}
impl Encoder for MessageCodec {
type Item = Message;
type Error = io::Error;
fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
dst.put_u8(item.0);
Ok(())
}
}
impl Decoder for MessageCodec {
type Item = Message;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() > 1 {
let data = src.as_ref()[0];
src.advance(1);
Ok(Some(Message(data)))
} else {
Ok(None)
}
}
}
pub struct IOServer {}
impl IOServer {
pub fn new_io_server(listener: TcpListener) {
let _: Addr<Syn, _> = IOServer::create(move |ctx| {
// Handle incoming connection
ctx.add_message_stream(listener.incoming()
.map_err(|_| ()).map(|(st, addr)| TcpConnect(st, addr)));
IOServer {}
});
}
}
impl Actor for IOServer {
type Context = Context<Self>;
}
#[derive(Message)]
pub struct TcpConnect(pub TcpStream, pub SocketAddr);
impl Handler<TcpConnect> for IOServer {
type Result = ();
fn handle(&mut self, msg: TcpConnect, ctx: &mut Context<Self>) {
println!("IOServer receive connection: {:?}", msg.1);
let (r, _) = msg.0.split();
IOServer::add_stream(FramedRead::new(r, MessageCodec {}), ctx);
}
}
impl actix::io::WriteHandler<io::Error> for IOServer {}
impl StreamHandler<Message, io::Error> for IOServer {
fn handle(&mut self, item: Message, _ctx: &mut Self::Context) {
println!("IOServer received msg: {}", item.0);
}
}
fn main() {
let sys = System::new("ActorServer");
let addr = SocketAddr::from_str("0.0.0.0:12345").unwrap();
let listener = TcpListener::bind(&addr, Arbiter::handle()).unwrap();
println!("Server started on: {}", addr);
IOServer::new_io_server(listener);
println!("Running IOServer on {:?}", addr);
sys.run();
}
Trying the following example, copied verbatim from the README:
extern crate actix;
extern crate futures;
use futures::{future, Future};
use actix::*;
// this is our Message
struct Sum(usize, usize);
// we have to define the response type for `Sum` message
impl Message for Sum {
type Result = usize;
}
// Actor definition
struct Summator;
impl Actor for Summator {
type Context = Context<Self>;
}
// now we need to define `MessageHandler` for the `Sum` message.
impl Handler<Sum> for Summator {
type Result = usize; // <- Message response type
fn handle(&mut self, msg: Sum, ctx: &mut Context<Self>) -> Self::Result {
msg.0 + msg.1
}
}
fn main() {
let system = System::new("test");
let addr: Addr<Unsync, _> = Summator.start();
let res = addr.send(Sum(10, 5)); // <- send message and get future for result
system.handle().spawn(res.then(|res| {
match res {
Ok(result) => println!("SUM: {}", result),
_ => println!("Something wrong"),
}
Arbiter::system().do_send(msgs::SystemExit(0));
future::result(Ok(()))
}));
system.run();
}
Adding to Cargo.toml:
[dependencies]
actix = "0.5.6"
futures = "0.2.1"
Attempting to compile then fails with:
warning: unused import: `Future`
--> src/main.rs:3:23
|
3 | use futures::{future, Future};
| ^^^^^^
|
= note: #[warn(unused_imports)] on by default
error[E0599]: no method named `then` found for type `actix::dev::Request<actix::Unsync, Summator, Sum>` in the current scope
--> src/main.rs:36:31
|
36 | system.handle().spawn(res.then(|res| {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following trait is implemented but not in scope, perhaps add a `use` for it:
|
3 | use futures::future::Future;
|
error: aborting due to previous error
error: Could not compile `actix-test`.
To learn more, run the command again with --verbose.
What am I missing? Should this example work out-of-the-box?
do_send
on Addr
is documented as:
Send message unconditionally
This method ignores actor's mailbox capacity, it silently fails if mailbox is closed.
However do_send
on Recipient
is:
Send message
Deliver message even if recipient's mailbox is full
It seems strange to have 2 similarly named methods with subtly difference in semantics which cannot be easily explained by nature of calls. I had expected that addr.do_send(...)
and addr.recipient().do_send(...)
are equivalend.
can support other language as client?
Today you need to return data from Sync Actor back until another Async Actor is reached and then resume the flow. Depending on your original flow you will need a lot of refactoring.
This test case below fail today with thread '' panicked at 'System is not running', src/arbiter.rs:151:21
extern crate actix;
extern crate futures;
use actix::prelude::*;
use actix::SyncArbiter;
use futures::Future;
struct TestMessage;
impl Message for TestMessage {
type Result = Result<(), ()>;
}
#[test]
fn test_async_sync_async() {
struct AsyncActor {
pub addr: Addr<Syn, SyncActor>,
};
struct AsyncActor2;
impl Actor for AsyncActor {
type Context = Context<Self>;
}
impl Actor for AsyncActor2 {
type Context = Context<Self>;
}
impl Handler<TestMessage> for AsyncActor {
type Result = Result<(), ()>;
fn handle(&mut self, _: TestMessage, ctx: &mut Self::Context) -> Self::Result {
println!("got message inside AsyncActor");
let _ = ctx.spawn(self.addr.send(TestMessage{})
.map_err(|_| ())
.and_then(|_| {
println!("got response from SyncActor");
Arbiter::system().do_send(actix::msgs::SystemExit(0));
Ok(())
})
.into_actor(self)
);
Ok(())
}
}
impl Handler<TestMessage> for AsyncActor2 {
type Result = Result<(), ()>;
fn handle(&mut self, _: TestMessage, _ctx: &mut Self::Context) -> Self::Result {
println!("got message inside AsyncActor2");
Ok(())
}
}
struct SyncActor {
pub addr: Addr<Syn, AsyncActor2>,
};
impl Actor for SyncActor {
type Context = SyncContext<Self>;
}
impl Handler<TestMessage> for SyncActor {
type Result = Result<(), ()>;
fn handle(&mut self, _: TestMessage, _ctx: &mut Self::Context) -> Self::Result {
println!("got message inside SyncActor");
let _ = self.addr.send(TestMessage{})
.wait()
.map_err(|_| ())
.and_then(|_| {
println!("got response from AyncActor2");
Arbiter::system().do_send(actix::msgs::SystemExit(0));
Ok(())
});
Ok(())
}
}
let sys = System::new("test");
let async2_addr : Addr<Syn, _> = AsyncActor2::create(move |_ctx| {
AsyncActor2{}
});
let async2c = async2_addr.clone();
let sync_addr = SyncArbiter::start(2, move || {
SyncActor{addr: async2c.clone()}
});
let async_addr : Addr<Syn, _> = AsyncActor::create(move |_ctx| {
AsyncActor{addr: sync_addr}
});
async_addr.do_send(TestMessage{});
sys.run();
assert!(true);
}
I've noticed that I have a lot of repetition of impl ResponseType<...
since a new one is required for each signal. To DRY my code, I've defined the following macro:
macro_rules! response_type {
($Actor:ty, $Msg:ty, $item:ty, $err:ty) => (
impl ResponseType<$Msg> for $Actor {
type Item = $item;
type Error = $err;
}
);
($Actor:ty, $Msg:ty, $item:ty) => (
impl ResponseType<$Msg> for $Actor {
type Item = $item;
type Error = ();
}
);
($Actor:ty, $Msg:ty) => (
impl ResponseType<$Msg> for $Actor {
type Item = ();
type Error = ();
}
);
}
This seems to work, though there may be a more elegant approach. In a perfect world, it would be great to use an attribute as in the following example:
#[response_type(MyStatusEnum)]
impl Handler<PrivMsg> for Engine {
fn handle(&mut self, msg: PrivMsg, ctx: &mut Context<Self>) -> Response<Self, PrivMsg> {
println!("Source: {}, Target: {}, Text: {}", &msg.source, &msg.target, &msg.text);
MyStatusEnum::Ok
}
}
Is there a technique I missed that would make all of the above superfluous? Anyone else running into this repetition?
Are there any performance benchmarks for single node message latency/throughput (with varying number of actors) compared to Akka (or other actor frameworks)?
I bumped my actix dependency to 0.3.5, and cargo returned:
error: cyclic package dependency: package `actix v0.3.5` depends on itself
Hi fafhrd91,
Thanks for your work on actix, it is shaping up to be one of the most ergonomic and fleshed out actor libraries on Rust.
I'm working on an actor based routing algorithm simulator and would like to simulate large numbers of nodes, making use of multithreading to distribute the load across multiple threads.
I saw on the documentation that "Actors could run in multiple threads with support of Arbiter", and I was wondering if you could write a small example, showing how to spin up multiple arbiters on different cores, create new actors on different arbiters and how to send messages from an actor on one arbiter into an actor on another arbiter.
Thanks in advance
Hi,
I think I found a bug in the SystemRegistry, it does not actually write to the hashmap when it creates a service, thus every time a service is looked up, a new one is created.
I have attached a patched version of this function below:
pub fn get<A: SystemService + Actor<Context=Context<A>>>(&self) -> SyncAddress<A> {
if let Ok(hm) = self.registry.lock() {
if let Some(addr) = hm.borrow().get(&TypeId::of::<A>()) {
match addr.downcast_ref::<SyncAddress<A>>() {
Some(addr) =>{
return addr.clone()},
None =>
error!("Got unknown value: {:?}", addr),
}
}
let addr = Supervisor::start_in(Arbiter::system_arbiter(), false, |ctx| {
let mut act = A::default();
act.service_started(ctx);
act
});
hm.borrow_mut().insert(TypeId::of::<A>(), Box::new(addr.clone().unwrap()));
return addr.expect("System is dead")
}
panic!("System registry lock is poisoned");
}
Pretty similar code (or rather problem) to this:
https://www.reddit.com/r/rust/comments/7wkirc/using_actixactors_together_with_irccrate_over/du35k31/
call_fut
doesn't seem to exist anymore. Any hints how to do something like that now?
In this simple example an actor sends a message to itself repeatedly.
I would assume a constant memory usage here. However it keeps growing.
Maybe I missed some important detail from the docs or it's leaking somewhere.
extern crate actix;
use actix::*;
struct CounterActor;
struct Count {
pub i: u32,
pub sender: Address<CounterActor>
}
impl ResponseType for Count {
type Item = u32;
type Error = ();
}
impl Actor for CounterActor {
type Context = Context<Self>;
}
impl Handler<Count> for CounterActor {
fn handle(
&mut self,
msg: Count,
ctx: &mut Self::Context,
) -> Response<Self, Count> {
msg.sender.send(Count { i: msg.i + 1, sender: msg.sender.clone() });
println!("{}", msg.i);
Self::reply(msg.i)
}
}
fn main() {
let system = actix::System::new("test");
let addr: Address<_> = CounterActor.start();
addr.send(Count { i: 0, sender: addr.clone() });
system.run();
}
It is not possible to re-connect with FramedContext
Hi. Does it make sense to send messages over bounded channels and create blocking send
and send_timeout
in actor model? If so, is there a way to do it using this crate?
Continuing the conversation from #9, my last stumbling block was how to get the Address
and SyncAddress
of the sender in a handle(...)
. I believe it should be accessible from the ctx
, but is there an example of this?
https://actix.github.io/actix/guide/qs_04.html - now empty
The ml crate appears to provide this, eg for its own ML -Modeling Language- svg diagram.
If you are unaware, the tokio project just pushed tokio 0.1 which is a major reform of the scope of tokio. In relation to this project, this significantly changes actor scheduling and dispatching.
The first immediate concern is that actix can no longer use the tokio event loop as a executor; however I see this a wonderful opportunity to rewrite the entire actor execution stack to provide more varied methods of running an actor system.
One of the bigger paper-cuts of this library is the inflexibility of the current System
/Arbiter
methodology. You HAVE to run the System
before you can use Arbiter
's to manage other threads. Although you can run the system in another thread to begin with, it requires you to perform that thread management by yourself, which might be too much for a simple project.
I would love to see the ability to request actors be run on specific executors that can provide guarantees that suit the actors workload. Things like pinned thread pools, fork-join pools and single-threaded executors. This would broaden the set of applications Actix can work for and give more flexibility to developers to get the best performance out of their system. We can then also build test harnesses for actor systems and even start providing different mailbox types and supervised actor routing.
In either case, we are going to have to evolve because of this change. But it is a great diving board from which to make these changes.
example
impl Handler<IrcMessage> for System {
type Result = ();
fn handle(&mut self, msg: IrcMessage, ctx: &mut Context<Self>) -> Self::Result {
println!("Printing IRC Message")
}
}
full example https://www.reddit.com/r/rust/comments/7wkirc/using_actixactors_together_with_irccrate_over/
This is fairly self-explanatory. It would be nice to have Option implement MessageResponse, so that Option could be the result of a Handler.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.