
ringbuf's Introduction

ringbuf


Lock-free SPSC FIFO ring buffer with direct access to inner data.

Features

  • Lock-free operations - they succeed or fail immediately without blocking or waiting.
  • Arbitrary item type (not only Copy).
  • Items can be inserted and removed one by one or many at once.
  • Thread-safe direct access to the internal ring buffer memory.
  • Read and Write implementations.
  • Overwriting insertion support.
  • Different types of buffers and underlying storages.
  • Can be used without std and even without alloc (using only statically-allocated memory).
  • Async and blocking versions (see this section).

Usage

First you need to create the ring buffer itself. HeapRb is recommended, but you may choose another type.

After the ring buffer is created, it may be split into a pair of Producer and Consumer. The producer is used to insert items into the ring buffer; the consumer, to remove items from it.

Types

There are several types of ring buffers provided:

  • LocalRb. Only for single-threaded use.
  • SharedRb. Can be shared between threads. Its most frequently used instances are:
    • HeapRb. Contents are stored in dynamic memory. Recommended for use in most cases.
    • StaticRb. Contents can be stored in statically-allocated memory.

You may also provide your own generic parameters.

Performance

SharedRb needs to synchronize the CPU cache between cores, and this synchronization has some overhead. To avoid multiple unnecessary synchronizations, use methods that operate on many items at once (push_slice/push_iter, pop_slice/pop_iter, etc.), or freeze the producer or consumer and synchronize threads manually (see the items in the frozen module).

For single-threaded usage, LocalRb is recommended because it is slightly faster than SharedRb due to the absence of CPU cache synchronization.

Examples

Simple

use ringbuf::{traits::*, HeapRb};

let rb = HeapRb::<i32>::new(2);
let (mut prod, mut cons) = rb.split();

prod.try_push(0).unwrap();
prod.try_push(1).unwrap();
assert_eq!(prod.try_push(2), Err(2));

assert_eq!(cons.try_pop(), Some(0));

prod.try_push(2).unwrap();

assert_eq!(cons.try_pop(), Some(1));
assert_eq!(cons.try_pop(), Some(2));
assert_eq!(cons.try_pop(), None);

No heap

use ringbuf::{traits::*, StaticRb};

const RB_SIZE: usize = 1;
let mut rb = StaticRb::<i32, RB_SIZE>::default();
let (mut prod, mut cons) = rb.split_ref();

assert_eq!(prod.try_push(123), Ok(()));
assert_eq!(prod.try_push(321), Err(321));

assert_eq!(cons.try_pop(), Some(123));
assert_eq!(cons.try_pop(), None);

Overwrite

The ring buffer can be used in overwriting mode, where insertion overwrites the oldest element if the buffer is full.

use ringbuf::{traits::*, HeapRb};

let mut rb = HeapRb::<i32>::new(2);

assert_eq!(rb.push_overwrite(0), None);
assert_eq!(rb.push_overwrite(1), None);
assert_eq!(rb.push_overwrite(2), Some(0));

assert_eq!(rb.try_pop(), Some(1));
assert_eq!(rb.try_pop(), Some(2));
assert_eq!(rb.try_pop(), None);

Note that push_overwrite requires exclusive access to the ring buffer, so to perform it concurrently you need to guard the ring buffer with a mutex or some other lock.

Derived crates

License

Licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

ringbuf's People

Contributors

agerasev, andreacatania, congyuwang, dusterthefirst, kovaxis, mottl, najamelan, notgull, oblique, squ1dd13, ted-logan


ringbuf's Issues

False sharing?

From a quick-ish glance at RingBuffer, it looks like head and tail could possibly be falsely shared.

However, I tried to improve it by padding the atomics so they'd be on different cache lines, wrote a threaded benchmark, and failed to produce something faster than the current implementation (in fact, my version was usually slightly slower in benchmarks).

So, I guess this issue is either:

  • How is the current implementation getting around false sharing?
  • Or, if you haven't explicitly thought about false sharing before, perhaps look into it and see if you can make ringbuf even faster :)

Access to uninitialized data in `read_from`

Hi. Nice library, thanks for putting effort into making this.

I think I've discovered an issue allowing use of uninitialized data from safe code in the read_from function:

/// Reads at most `count` bytes from the `Read` instance and appends them to the ring buffer.
/// If `count` is `None` then as many bytes as possible will be read.
///
/// Returns:
/// + `None`: the ring buffer has no vacant space or `count` is `0`. In this case `read` isn't called at all.
/// + `Some(Ok(n))`: `read` succeeded. `n` is the number of bytes read. `n == 0` means that `read` also returned `0`.
/// + `Some(Err(e))`: `read` failed and `e` is the original error. In this case it is guaranteed that no items were read from the reader.
///   To achieve this we read only one contiguous slice at once, so this call may read fewer than `vacant_len` items even if the reader is ready to provide more.
fn read_from<S: Read>(&mut self, reader: &mut S, count: Option<usize>) -> Option<io::Result<usize>>
where
    Self: Producer<Item = u8>,
{
    let (left, _) = self.vacant_slices_mut();
    let count = cmp::min(count.unwrap_or(left.len()), left.len());
    if count == 0 {
        return None;
    }
    let left_init = unsafe { slice_assume_init_mut(&mut left[..count]) };
    let read_count = match reader.read(left_init) {
        Ok(n) => n,
        Err(e) => return Some(Err(e)),
    };
    assert!(read_count <= count);
    unsafe { self.advance_write_index(read_count) };
    Some(Ok(read_count))
}
}

The issue is that a (safe) implementation of Read can read the uninitialized data in the buffer. This test:

#[test]
fn test_read_touching_buf() {

    struct Reader;

    impl Read for Reader {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            assert!(buf[0] == 0);
            Ok(0)
        }
    }

    let (mut producer, _consumer) = HeapRb::<u8>::new(10).split();
    producer.read_from(&mut Reader, None).unwrap();
}

Triggers this error when executing in Miri:

test foo ... error: Undefined Behavior: using uninitialized data, but this operation requires initialized memory
  --> src/main.rs:73:21
   |
73 |             assert!(buf[0] == 0);
   |                     ^^^^^^ using uninitialized data, but this operation requires initialized memory
   |
   = help: this indicates a bug in the program: it performed an invalid operation, and caused Undefined Behavior
   = help: see https://doc.rust-lang.org/nightly/reference/behavior-considered-undefined.html for further information

Blocking `read_write` test failure

---- tests::write_read stdout ----
thread '<unnamed>' panicked at blocking/src/tests.rs:134:9:
assertion `left == right` failed
  left: 2300
 right: 2304
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'tests::write_read' panicked at blocking/src/tests.rs:139:27:
called `Result::unwrap()` on an `Err` value: Any { .. }

The behaviour of `Consumer::discard` as documented is currently too vague.

The behaviour of discard is currently too vague as documented, though I suspect this is a documentation issue.

Its documentation currently offers no guarantees about how many elements, if any, will be discarded. In a situation in which I know for sure that no elements will be added to the FIFO, to adhere to the spec I still need a loop that may or may not run forever.

In my use case, in which the producer is not producing and I want to empty the FIFO, it's clear from the implementation that I don't need a loop at all: just calling discard() with Consumer::capacity() will reliably empty the FIFO.

This is a situation where a doc-test would be extremely valuable, showing in which situations it may discard fewer than n (i.e. when n is larger than the current number of elements, or when there is a producer race). I'm happy to contribute this if it is the desired policy for discard.

Performance vs other crates?

A basic performance comparison would be great to compare against other rust ringbuffers, as well as crossbeam channel.

Overwriting mode

Hello,

Would it be possible to introduce an overwriting mode where if the consumer gets too far behind it'll simply lose those unhandled values?

Thanks in advance

A version of `push` that overrides the oldest element if full

I have a use-case where I have a list of incoming events and want to retain the latest n items -- the producer pushes elements one by one, but the consumer will always iterate or fetch the full list without ever calling pop.

I'd like to know if there's a way for the producer to continue pushing items to the buffer, discarding old items if the buffer is full. Currently it seems the only way to do this is to have some coordination between the producer and consumer (check if is_full and then have the consumer pop, then the producer can push).

Not sure whether this fits with the design of this library, but is it feasible to have a version of push that overrides the oldest element if the buffer is full (or some other recommended approach)? On the consumer side, I'm not too concerned with receiving the items in the proper order, if that matters.

Miri gives a UB because of stacked borrows

I ran Miri on your crate and it reported UB in the push_access function. I don't have a clear understanding of stacked borrows yet, so I don't know how it can be fixed.

This is the output:

% cargo +nightly miri test
   Compiling ringbuf v0.2.7 (/home/oblique/git/ringbuf)
    Finished test [unoptimized + debuginfo] target(s) in 0.09s
     Running unittests src/lib.rs (target/miri/x86_64-unknown-linux-gnu/debug/deps/ringbuf-e44ea49f80c6b586)

running 39 tests
test tests::access::discard ... error: Undefined Behavior: trying to reborrow <220885> for SharedReadWrite permission at alloc84260[0x0], but that tag does not exist in the borrow stack for this location
   --> src/producer.rs:99:19
    |
99  |         let n = f(slices.0, slices.1);
    |                   ^^^^^^^^
    |                   |
    |                   trying to reborrow <220885> for SharedReadWrite permission at alloc84260[0x0], but that tag does not exist in the borrow stack for this location
    |                   this error occurs as part of a reborrow at alloc84260[0x0..0xa]
    |
    = help: this indicates a potential bug in the program: it performed an invalid operation, but the rules it violated are still experimental
    = help: see https://github.com/rust-lang/unsafe-code-guidelines/blob/master/wip/stacked-borrows.md for further information

    = note: inside `producer::Producer::<i8>::push_access::<[closure@src/producer.rs:152:30: 159:14]>` at src/producer.rs:99:19
note: inside `producer::Producer::<i8>::push` at src/producer.rs:152:13
   --> src/producer.rs:152:13
    |
152 | /             self.push_access(|slice, _| {
153 | |                 if !slice.is_empty() {
154 | |                     mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
155 | |                     1
...   |
158 | |                 }
159 | |             })
    | |______________^
note: inside `tests::access::discard` at src/tests/access.rs:245:9
   --> src/tests/access.rs:245:9
    |
245 |         prod.push(i).unwrap();
    |         ^^^^^^^^^^^^
note: inside closure at src/tests/access.rs:237:1
   --> src/tests/access.rs:237:1
    |
236 |   #[test]
    |   ------- in this procedural macro expansion
237 | / fn discard() {
238 | |     // Initialize ringbuffer, prod and cons
239 | |     let rb = RingBuffer::<i8>::new(10);
240 | |     let (mut prod, mut cons) = rb.split();
...   |
279 | |     assert_eq!(cons.pop(), Some(0));
280 | | }
    | |_^
    = note: this error originates in the attribute macro `test` (in Nightly builds, run with -Z macro-backtrace for more info)

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to previous error

error: test failed, to rerun pass '--lib'

LocalRb Producer type alias?

Hi @agerasev ,

Question for you: is there a type alias to use when using a LocalRb for Producer?

HeapRb has HeapProducer, but I cannot figure out the correct Producer type for a LocalRb.

const initializer for `LocalRb<T, [MaybeUninit<T>; N]>`

I want to use Mutex<LocalRb<T, [MaybeUninit<T>; N]>> with a static variable, but there is no initializer method for LocalRb<T, [MaybeUninit<T>; N]>. Could you provide something like LocalRb<T, [MaybeUninit<T>; N]>::new_const()?

PS: I found another crate that supports this

`StaticRb` backed by a static variable?

Is something like this safe?

static mut rb: StaticRb<u8, BUFFER_LEN> = StaticRb::<u8, BUFFER_LEN>::default();
let (mut prod, mut cons) = unsafe { rb.split_ref() };

My use-case is I want to have a ring buffer on a no std environment, and I would prefer not to use dynamic allocation if possible. The producer would be used by an interrupt handler, and the consumer used by the main task.

Feature request: HeapRb fallible allocation

Currently HeapRb::new uses Vec::resize_with, which will panic if an allocation fails.

I propose adding a second function HeapRb::try_new which will return Result<Self, std::collections::TryReserveError> and uses the same logic as HeapRb::new but additionally calls Vec::try_reserve before the call to Vec::resize_with.

This would be helpful for embedded development where memory is limited and allocations may fail.

v0.2.0-rc Async Close Not Waking

Seems like the master branch async ringbuf is not yet complete? There is an async index.rs file not yet included in lib.rs.

Currently, the close() of AsyncProd and AsyncCons does not call wake(), which can cause problems. I see that close() of AsyncIndex does call wake().

The v0.1.3 Async-Ringbuf also wakes the other on close().

When will this be implemented?

`std::io::Error::new()` allocates

In the Write and Read implementations, calling Error::new() unnecessarily allocates memory (AFAIK):

ringbuf/src/producer.rs

Lines 278 to 281 in ec34c53

Err(io::Error::new(
    io::ErrorKind::WouldBlock,
    "Ring buffer is full",
))

ringbuf/src/consumer.rs

Lines 395 to 398 in ec34c53

Err(io::Error::new(
    io::ErrorKind::WouldBlock,
    "Ring buffer is empty",
))

To avoid this, std::io::ErrorKind::WouldBlock.into() can be used for error type conversion: https://doc.rust-lang.org/std/io/struct.Error.html#impl-From%3CErrorKind%3E

Intended for use for errors not exposed to the user, where allocating onto the heap (for normal construction via Error::new) is too costly.

Have a blocking variant

This crate is non-blocking: when the buffer is full/empty, it returns Err(WouldBlock). I'd like to have an option with which the buffer blocks until more data/space is available. I can emulate this from the outside, but only with busy waiting, which I'd like to avoid.

`HeapProd<T>` and `HeapCons<T>` are only `Send` when `T` is `Sync`

When attempting to upgrade from version 0.3.3 to 0.4.0 I ran into this problem, where the recommended HeapRb<T> now requires T to be Sync. The following code generates 2 similar compiler errors:

fn is_send<T: Send>() {}
is_send::<HeapCons<Cell<i32>>>();
is_send::<HeapProd<Cell<i32>>>();
error[E0277]: `std::cell::Cell<i32>` cannot be shared between threads safely
  --> adae\src\engine\utils\ringbuffer.rs:13:15
   |
13 |     is_send::<HeapCons<Cell<i32>>>();
   |               ^^^^^^^^^^^^^^^^^^^ `std::cell::Cell<i32>` cannot be shared between threads safely
   |
   = help: the trait `std::marker::Sync` is not implemented for `std::cell::Cell<i32>`, which is required by `ringbuf::wrap::caching::Caching<std::sync::Arc<ringbuf::SharedRb<ringbuf::storage::Heap<std::cell::Cell<i32>>>>, false, true>: std::marker::Send`
   = note: if you want to do aliasing and mutation between multiple threads, use `std::sync::RwLock` or `std::sync::atomic::AtomicI32` instead
   = note: required for `ringbuf::storage::Heap<std::cell::Cell<i32>>` to implement `std::marker::Sync`
note: required because it appears within the type `ringbuf::SharedRb<ringbuf::storage::Heap<std::cell::Cell<i32>>>`
  --> C:\Users\Holger\.cargo\registry\src\index.crates.io-6f17d22bba15001f\ringbuf-0.4.0\src\rb\shared.rs:47:12
   |
47 | pub struct SharedRb<S: Storage + ?Sized> {
   |            ^^^^^^^^
   = note: required for `std::sync::Arc<ringbuf::SharedRb<ringbuf::storage::Heap<std::cell::Cell<i32>>>>` to implement `std::marker::Send`
note: required because it appears within the type `ringbuf::wrap::frozen::Frozen<std::sync::Arc<ringbuf::SharedRb<ringbuf::storage::Heap<std::cell::Cell<i32>>>>, false, true>`
  --> C:\Users\Holger\.cargo\registry\src\index.crates.io-6f17d22bba15001f\ringbuf-0.4.0\src\wrap\frozen.rs:22:12
   |
22 | pub struct Frozen<R: RbRef, const P: bool, const C: bool> {
   |            ^^^^^^
note: required because it appears within the type `ringbuf::wrap::caching::Caching<std::sync::Arc<ringbuf::SharedRb<ringbuf::storage::Heap<std::cell::Cell<i32>>>>, false, true>`
  --> C:\Users\Holger\.cargo\registry\src\index.crates.io-6f17d22bba15001f\ringbuf-0.4.0\src\wrap\caching.rs:17:12
   |
17 | pub struct Caching<R: RbRef, const P: bool, const C: bool> {
   |            ^^^^^^^
note: required by a bound in `engine::utils::ringbuffer::test_is_sync::is_send`
  --> adae\src\engine\utils\ringbuffer.rs:12:19
   |
12 |     fn is_send<T: Send>() {}
   |                   ^^^^ required by this bound in `is_send`

This seems to originate from the use of Arc within HeapProd and HeapCons.

Is this expected behavior?
