chan's People

Contributors

azdlowry, burntsushi, coder543, dflemstr, dsprenkels


chan's Issues

Broadcast to all consumers

I would like to send a message and have it be consumed by all consumers, not just the first available one. Is there a way to perform this with the current API? If not, this would be a very useful feature.
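For what it's worth, broadcast can be emulated with the current design by fanning out: keep one channel per consumer and send a clone of the message into each. A minimal sketch using std::sync::mpsc so it stands alone (the same shape would work with one chan channel per consumer; all names here are illustrative):

```rust
// Fan-out sketch: one channel per consumer, so every consumer sees every
// message instead of competing for the first one.
use std::sync::mpsc;
use std::thread;

fn main() {
    let mut senders = Vec::new();
    let mut handles = Vec::new();

    for id in 0..3 {
        let (tx, rx) = mpsc::channel::<String>();
        senders.push(tx);
        handles.push(thread::spawn(move || {
            // The iterator ends when the matching Sender is dropped.
            for msg in rx {
                println!("consumer {} got {}", id, msg);
            }
        }));
    }

    // "Broadcast" by sending a clone into every per-consumer channel.
    for tx in &senders {
        tx.send("hello".to_string()).unwrap();
    }
    drop(senders); // close all channels so the consumers exit

    for h in handles {
        h.join().unwrap();
    }
}
```

The cost is one clone per consumer; for large read-only payloads, sending an `Arc<T>` avoids the deep copies.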

[feature request] re-expose try_recv()

I'm looking at converting some code from std::sync::mpsc to chan, and the lack of try_recv kind of makes my code ugly. For example, consuming all current items on an asynchronous channel without blocking in std looks like the following:

while let Ok(item) = item.containing.chan.try_recv() {
    // handle item
}

In chan, this ends up looking like

let receiver = item.containing.chan;
loop {
    chan_select! {
        default => {
            break;
        },
        receiver.recv() -> item => {
             // handle item
        }
    }
}

If you're in a vaguely-OO context and the receiver is on self, this gets even uglier because there are gross contortions required to get a reference to the receiver whose lifetime doesn't spread through the entire method.

I appreciate that it's cleaner to just have select instead of also having the .try_* methods, but it seems that it might be more pragmatic to keep both interfaces, especially since it seems like upstream is never going to stabilize Select in std::sync::mpsc, so chan is going to be increasingly popular for people who want CSP semantics.
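For reference, here is the non-blocking drain pattern the issue describes, as a self-contained sketch against std (note std's try_recv returns a Result, not an Option):

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..3 {
        tx.send(i).unwrap();
    }

    // try_recv() returns Err(TryRecvError::Empty) instead of blocking,
    // so this loop stops as soon as the buffer is exhausted.
    let mut drained = Vec::new();
    while let Ok(item) = rx.try_recv() {
        drained.push(item);
    }
    assert_eq!(drained, vec![0, 1, 2]);
}
```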

crossbeam-channel – EOL chan

@BurntSushi I would suggest checking out crossbeam-channel and, if you feel it's a replacement for chan, pointing people to it in your README to defragment the mpmc situation a little bit. Thanks for all you do.

Select integration with std lib mpsc api

First off, thank you so much for this crate; it provides everything I'm missing from the std lib. I've recently run into some pain points, though. In particular, I'm interfacing with other crates that use the std lib's mpsc API. Are there suggested patterns for adapting? What I really want is a select API that's compatible with stable Rust.
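One adaptation pattern that works on stable Rust is a forwarding thread that bridges a std::sync::mpsc::Receiver into a second channel. A hedged sketch, using std channels on both ends so it stays self-contained (with chan, the inner pair would come from chan::async() so the returned receiver is usable in chan_select!):

```rust
// Bridge a std Receiver into another channel via a forwarding thread.
use std::sync::mpsc;
use std::thread;

fn bridge<T: Send + 'static>(std_rx: mpsc::Receiver<T>) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Forward until the upstream sender hangs up; dropping `tx`
        // afterwards closes the downstream channel too.
        for item in std_rx {
            if tx.send(item).is_err() {
                break;
            }
        }
    });
    rx
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let bridged = bridge(rx);
    tx.send(42).unwrap();
    drop(tx);
    assert_eq!(bridged.recv().unwrap(), 42);
    assert!(bridged.recv().is_err()); // upstream closed
}
```

The extra thread per bridged receiver is the price of adapting between the two APIs without upstream support.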

Relicense under dual MIT/Apache-2.0

Why?

The MIT license requires reproducing countless copies of the same copyright
header with different names in the copyright field, for every MIT library in
use. The Apache license does not have this drawback, and has protections from
patent trolls and an explicit contribution licensing clause. However, the
Apache license is incompatible with GPLv2. This is why Rust is dual-licensed as
MIT/Apache (the "primary" license being Apache, MIT only for GPLv2 compat), and
doing so would be wise for this project. This also makes this crate suitable
for inclusion in the Rust standard distribution and other projects using dual
MIT/Apache.

How?

To do this, get explicit approval from each contributor of copyrightable work
(as not all contributions qualify for copyright) and then add the following to
your README:

## License

Licensed under either of
 * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
 * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
at your option.

### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you shall be dual licensed as above, without any
additional terms or conditions.

and in your license headers, use the following boilerplate (based on that used in Rust):

// Copyright (c) 2015 t developers
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT
// license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.

And don't forget to update the license metadata in your Cargo.toml!

Contributor checkoff

Sender::send(t) -> Result<(), SendError>

Is there a performance reason that this functionality is not exposed? It'd give more power to the user to decide exactly how they want to handle it if the dropped receiver case were exposed.

I suppose I have a few reasons...

  • To become a drop-in replacement for std::sync::mpsc by more closely mimicking its design.
  • Working on a memory constrained system (~32MB total RAM), being able to leak threads is bad, as even their stack space alone is super expensive. If a server does contain a bug and a receiving thread panics, leaving a sender in limbo is a very expensive cost for something that would otherwise be recoverable.
  • I use rendezvous channels in pipe to back a Read/Write interface that sends Vec<u8> buffers across threads. Imagine a case where a Read consumer doesn't finish reading to EOF (an error is encountered when processing the data) and drops the Read/Receiver end. The writer would normally encounter a broken pipe error, but with chan it would deadlock instead, leaking resources from the writing thread:
let mut pipe_read = ..;
let mut some_file = try!(File::create("..."));
try!(::std::io::copy(&mut pipe_read, &mut some_file));

(also side note, the pipe thing is why I'm somewhat concerned with throughput. It's not a terribly huge issue though, especially when dealing with large enough buffers.)
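For reference, std::sync::mpsc already exposes the dropped-receiver case this issue asks for: send() returns Err(SendError(t)) instead of blocking forever, handing the unsent value back so the writer can recover. A self-contained demonstration:

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    drop(rx); // simulate the Read end dropping before EOF

    match tx.send(vec![1, 2, 3]) {
        Ok(()) => unreachable!("receiver is gone"),
        Err(mpsc::SendError(buf)) => {
            // The writer sees a recoverable "broken pipe" and keeps its buffer.
            assert_eq!(buf, vec![1, 2, 3]);
            eprintln!("broken pipe: {} bytes unsent", buf.len());
        }
    }
}
```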

doesn't seem to work with `fork()`

I'm trying to mix the chan crate together with libc::fork(), but somehow it doesn't work.
This is probably due to some difference between thread::spawn() and fork(), but I just don't understand why it should not work.

Here's the example from the README adapted to fork(). Interestingly, the INT signal works but TERM does not, even though sdone really is moved into run.

Using chan_select! requires values to be 'static

Using chan_select! forces values to be 'static for no apparent reason. At the time of writing, the relevant signature is:

pub fn recv<'r: 'c, T: 'static>(

This is very inconvenient while using a scoped thread pool where threads communicate with each other via pointers into a big buffer. All of the normal channel operations work fine, except chan_select!

remove hyper dev dependency

Is it possible to remove hyper from the dev dependencies?
Hyper pulls in openssl, which is a pain to build on Windows.

Thanks

// Cargo.toml
[dev-dependencies]
hyper = { version = "*" }

How to chan_select over multiple chans that will terminate?

Sorry, not sure where to put this so I'm opening up an issue... hopefully you're okay with that.

I'll just use this simple example and hopefully you will understand what I'm getting at...

#[macro_use]
extern crate chan;

use std::thread;

fn main() {
    let (s1, r1) = chan::async();
    let (s2, r2) = chan::async();

    thread::spawn(move || run(5, s1));
    thread::spawn(move || run(10, s2));

    let mut found1 = 0;
    let mut found2 = 0;

    // How do you get out of this loop safely?  I suppose you could check how many
    // start returning `None` and when all channels have then you can break but
    // your cpu is going to be pegged for all the channels that continuously
    // return None.  It seems the only reasonable way for this to happen is if
    // there was a mechanism to "un-subscribe" a channel once it has returned
    // None but I don't know how to do this.
    loop {
        chan_select! {
            r1.recv() -> n => {
                found1 += 1;
                println!("Got r1: {:?} ({})", n, found1);
            },
            r2.recv() -> n => {
                found2 += 1;
                println!("Got r2: {:?} ({})", n, found2);
            },
        }
    }

    // We'll obviously never get here
    println!("R1s: {} / R2s: {}", found1, found2);
}

fn run(n: u8, s: chan::Sender<u8>) {
    for i in 0..n {
        s.send(i * n);
    }
}
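One pattern that avoids the pegged CPU, sketched with std::sync::mpsc rather than chan (std's recv() reports disconnection as an Err): give every producer a clone of a single Sender and drop the original, so the consumer loop ends exactly when the last producer hangs up.

```rust
use std::sync::mpsc;
use std::thread;

fn run(n: u8, s: mpsc::Sender<u8>) {
    for i in 0..n {
        s.send(i * n).unwrap();
    }
    // `s` is dropped here; once all clones are gone the channel closes.
}

fn main() {
    let (s, r) = mpsc::channel();
    let s1 = s.clone();
    let s2 = s.clone();
    drop(s); // keep only the producers' clones alive

    thread::spawn(move || run(5, s1));
    thread::spawn(move || run(10, s2));

    let mut found = 0;
    // recv() blocks until a value arrives or every Sender is dropped,
    // so there is no busy loop over channels that keep returning None.
    while let Ok(n) = r.recv() {
        found += 1;
        println!("Got: {} ({})", n, found);
    }
    assert_eq!(found, 15);
    println!("total: {}", found);
}
```

This merges both streams into one channel, so it only fits when you don't need to know which producer sent each value; if you do, send a tagged enum instead.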

Possible memory leak in `tick`

Hello,

I might have discovered a memory leak in pub fn tick, but I'm not sure what exactly is leaking.

My call site code looks like this:

    pub fn spawn(mut self) -> JoinHandle<()> {
        let timer = chan::tick(Duration::from_secs(TICK_INTERVAL_SEC));

        thread::spawn(move || {
            info!("thread started.");

            let from_main_rx = self.from_main_rx;
            loop {
                chan_select! {
                    timer.recv() => {
                        trace!("I've been ticked. Current sample queue length is {:#?}", &self.queue.len());
                    },
                    from_main_rx.recv() -> msg => {
                        match msg {
                            Some(BosunRequest::Shutdown) => {
                                debug!("Received message to shut down.");
                                break
                            },
                            None => {
                                error!("Channel unexpectedly shut down.");
                            }
                        }
                    }
                }
            }

            info!("thread finished.");
        })
    }

Running this function from a binary compiled with

#![feature(alloc_system)]
extern crate alloc_system;

valgrind reports this:

> RUST_LOG=rs_collector=debug valgrind --leak-check=full ./target/debug/rs-collector -c examples/rs-collector.conf
...
==6839==
==6839== HEAP SUMMARY:
==6839==     in use at exit: 12,608 bytes in 54 blocks
==6839==   total heap usage: 427 allocs, 373 frees, 412,517 bytes allocated
==6839==
==6839== 288 bytes in 1 blocks are possibly lost in loss record 46 of 53
==6839==    at 0x4C2CC70: calloc (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==6839==    by 0x4012E14: allocate_dtv (dl-tls.c:296)
==6839==    by 0x4012E14: _dl_allocate_tls (dl-tls.c:460)
==6839==    by 0x5043D92: allocate_stack (allocatestack.c:589)
==6839==    by 0x5043D92: pthread_create@@GLIBC_2.2.5 (pthread_create.c:500)
==6839==    by 0x495473: std::sys::thread::Thread::new::h07b2378eb374ee61 (in /vagrant/rs-collector/target/debug/rs-collector)
==6839==    by 0x305169: std::thread::Builder::spawn::h288a4774d44d4b77 (mod.rs:284)
==6839==    by 0x304DD6: std::thread::spawn::hb4242ef8f989958c (mod.rs:314)
==6839==    by 0x31D955: chan_signal::init::hbe6da6429d2f5cec (lib.rs:200)
==6839==    by 0x31E65F: __static_ref_initialize (lib.rs:132)
==6839==    by 0x31E65F: _$LT$chan_signal..HANDLERS$u20$as$u20$core..ops..Deref$GT$::deref::__stability::_$u7b$$u7b$closure$u7d$$u7d$::hf2f0995e986957db (lib.rs:132)
==6839==    by 0x31E62D: std::sync::once::Once::call_once::_$u7b$$u7b$closure$u7d$$u7d$::h71656142c75f7bcc (once.rs:210)
==6839==    by 0x49177A: std::sync::once::Once::call_inner::hb33997e0feb5bfa3 (in /vagrant/rs-collector/target/debug/rs-collector)
==6839==    by 0x304B2F: std::sync::once::Once::call_once::h8aee10744d4e6d39 (once.rs:210)
==6839==    by 0x31E142: _$LT$chan_signal..HANDLERS$u20$as$u20$core..ops..Deref$GT$::deref::hf1cfd10973a74520 (in /vagrant/rs-collector/target/debug/rs-collector)
==6839==
==6839== 288 bytes in 1 blocks are possibly lost in loss record 47 of 53
==6839==    at 0x4C2CC70: calloc (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==6839==    by 0x4012E14: allocate_dtv (dl-tls.c:296)
==6839==    by 0x4012E14: _dl_allocate_tls (dl-tls.c:460)
==6839==    by 0x5043D92: allocate_stack (allocatestack.c:589)
==6839==    by 0x5043D92: pthread_create@@GLIBC_2.2.5 (pthread_create.c:500)
==6839==    by 0x495473: std::sys::thread::Thread::new::h07b2378eb374ee61 (in /vagrant/rs-collector/target/debug/rs-collector)
==6839==    by 0x454CD1: std::thread::Builder::spawn::ha2e63bc9d679d800 (mod.rs:284)
==6839==    by 0x45481F: std::thread::spawn::h0f20e9934c579029 (mod.rs:314)
==6839==    by 0x46E962: chan::tick::hb2f8b60ea167ed37 (lib.rs:624)
==6839==    by 0x2C3D0C: rs_collector::scheduler::run::hfc41016f1d458fd3 (scheduler.rs:20)
==6839==    by 0x12B9ED: rs_collector::run::h7830496d2d7a82c6 (main.rs:70)
==6839==    by 0x12B2FC: rs_collector::main::heb526eece5bee66c (main.rs:53)
==6839==    by 0x49F596: __rust_maybe_catch_panic (in /vagrant/rs-collector/target/debug/rs-collector)
==6839==    by 0x496221: std::rt::lang_start::h14cbded5fe3cd915 (in /vagrant/rs-collector/target/debug/rs-collector)
==6839==
==6839== 288 bytes in 1 blocks are possibly lost in loss record 48 of 53
==6839==    at 0x4C2CC70: calloc (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
==6839==    by 0x4012E14: allocate_dtv (dl-tls.c:296)
==6839==    by 0x4012E14: _dl_allocate_tls (dl-tls.c:460)
==6839==    by 0x5043D92: allocate_stack (allocatestack.c:589)
==6839==    by 0x5043D92: pthread_create@@GLIBC_2.2.5 (pthread_create.c:500)
==6839==    by 0x495473: std::sys::thread::Thread::new::h07b2378eb374ee61 (in /vagrant/rs-collector/target/debug/rs-collector)
==6839==    by 0x454CD1: std::thread::Builder::spawn::ha2e63bc9d679d800 (mod.rs:284)
==6839==    by 0x45481F: std::thread::spawn::h0f20e9934c579029 (mod.rs:314)
==6839==    by 0x46E962: chan::tick::hb2f8b60ea167ed37 (lib.rs:624)
==6839==    by 0x2C0E1E: rs_collector::bosun::Bosun::spawn::he31e8b4effd8edea (bosun.rs:50)
==6839==    by 0x2C42EC: rs_collector::scheduler::run::hfc41016f1d458fd3 (scheduler.rs:29)
==6839==    by 0x12B9ED: rs_collector::run::h7830496d2d7a82c6 (main.rs:70)
==6839==    by 0x12B2FC: rs_collector::main::heb526eece5bee66c (main.rs:53)
==6839==    by 0x49F596: __rust_maybe_catch_panic (in /vagrant/rs-collector/target/debug/rs-collector)
==6839==
==6839== LEAK SUMMARY:
==6839==    definitely lost: 0 bytes in 0 blocks
==6839==    indirectly lost: 0 bytes in 0 blocks
==6839==      possibly lost: 864 bytes in 3 blocks
==6839==    still reachable: 11,744 bytes in 51 blocks
==6839==         suppressed: 0 bytes in 0 blocks
==6839== Reachable blocks (those to which a pointer was found) are not shown.
==6839== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==6839==
==6839== For counts of detected and suppressed errors, rerun with: -v
==6839== ERROR SUMMARY: 3 errors from 3 contexts (suppressed: 0 from 0)

Any help is appreciated,
thanks in advance,
Lukas

Async channel is blocking program

I'm using a channel created with chan::async().
On my laptop it works perfectly, but on my second computer the program blocks when a function sends data through chan::Sender.

It looks like a bug. Please help :)

Allow chan_select to generate no-op None variants

I'm not sure if this is easily doable or not, but would it be possible for the chan_select! macro to fill in no-op variants of uncovered patterns?

For example, trying to compile this results in the error refutable pattern in local binding: `None` not covered:

    chan_select! {
        msg_rx.recv() -> Some(user) => {
            // do something with user
        }
    }

and it is necessary to be overly verbose:

    chan_select! {
        msg_rx.recv() -> x => {
            if let Some(user) = x {
                // do something with user
            }
        }
    }

Deadlock with select-send and select-recv

Code is worth more than a thousand words. :)

#[macro_use]
extern crate chan;
extern crate crossbeam;

fn main() {
    let (tx, rx) = chan::sync(0);

    // Good.
    crossbeam::scope(|s| {
        let (tx, rx) = (tx.clone(), rx.clone());
        s.spawn(move || tx.send(()));
        s.spawn(move || chan_select! { rx.recv() => {} });
    });

    // Good.
    crossbeam::scope(|s| {
        let (tx, rx) = (tx.clone(), rx.clone());
        s.spawn(move || chan_select! { tx.send(()) => {} });
        s.spawn(move || rx.recv());
    });

    // Deadlock.
    crossbeam::scope(|s| {
        let (tx, rx) = (tx.clone(), rx.clone());
        s.spawn(move || chan_select! { tx.send(()) => {} });
        s.spawn(move || chan_select! { rx.recv() => {} });
    });
}

Add timeout-able methods

I suppose a workaround would be to create a "timer" channel and select on it, but it sounds quite contrived.
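That timer-channel workaround can be built from std primitives; this sketch's after() is a hypothetical stand-in for the kind of helper chan itself could provide, returning a Receiver that fires once when the duration elapses:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

// Returns a channel that delivers a single Instant after `dur` elapses.
fn after(dur: Duration) -> mpsc::Receiver<Instant> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        thread::sleep(dur);
        // Ignore the error if the caller already dropped the receiver.
        let _ = tx.send(Instant::now());
    });
    rx
}

fn main() {
    let start = Instant::now();
    let timeout = after(Duration::from_millis(50));
    // With chan, this receiver would sit alongside data channels in a
    // chan_select!; here we just block on it to show it firing.
    timeout.recv().unwrap();
    assert!(start.elapsed() >= Duration::from_millis(50));
    println!("timed out after {:?}", start.elapsed());
}
```

When select isn't needed at all, std's Receiver::recv_timeout gives the same effect on a single channel without the extra thread.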

All Notifier subscriptions made in select.rs appear to be leaked

Hello! I have been debugging a gradual misbehavior of some code that's using the chan module.

I seem to have tracked this issue down to the following bug in select.rs (line numbers reference https://github.com/BurntSushi/chan/blob/955d8a5aecef77c1a380bd2c8265b9efed83103b/src/select.rs ):

In Choice::subscribe on lines 281 and 329, we discard the return value from Notifier::subscribe.

In Choice::unsubscribe on lines 285 and 333, we match against self.id to find the id of our subscription; but we have never saved the id of our subscription, so self.id is always None.

I haven't verified the dynamic behavior that self.id is always None, but I have verified using a breakpoint that Notifier::unsubscribe seems never to be called. I also find that after more than a week of operation, our subscription hashtable has 445k entries:

(gdb) p (*notify.__data).value.subscriptions.table.size
$10 = 445594

and we were spending a lot of CPU cycles iterating over the subscription list, which is what led me to take a closer look at subscriptions in the first place.

Let me know if you'd like any further info.

Thanks!

chan::async() conflicts with Rust 2018 Edition

In Rust 2018, async will become a keyword and conflict with the async() function that chan currently defines. I only noticed this because cargo fix is currently unable to handle this case. The workaround right now is to add this line to the top of the crate:

#![feature(raw_identifiers)]

and replace chan::async() with chan::r#async(). However, in the long term, it might be a good idea to pick a different name. The chan::sync() function would likely have to be renamed as well, in order to stay symmetric.

Despite this small issue, thank you for your work on this crate! I've used it several times now and it has always been a pleasure to work with. 🙂

Return value from `chan_select!` expression

Could chan_select! return a value if all branches return the same type?

For instance:

let result = chan_select! {
    default => {
        None
    },
    input_a.recv() => {
        Some(true)
    },
    input_b.recv() => {
        Some(false)
    },
};

(Right now chan_select! doesn't return anything and result would be ().)

Currently a workaround is to wrap the chan_select! in a function and return in each branch.
