
atomic_queue's Introduction

C++14 MIT license Latest release Continuous Integrations
platform Linux x86_64 platform Linux ARM platform Linux RISC-V platform Linux PowerPC platform Linux IBM System/390

atomic_queue

C++14 multiple-producer-multiple-consumer lock-free queues based on a circular buffer and std::atomic. Designed with the goal of minimizing the latency between one thread pushing an element into a queue and another thread popping it from the queue.

It has been developed, tested and benchmarked on Linux, but should support any C++14 platform that implements std::atomic. It is reported as compatible with Windows, but the continuous integration workflows hosted by GitHub are currently set up only for the x86_64 platform on Ubuntu-20.04 and Ubuntu-22.04. Pull requests extending the continuous integration to other architectures and/or platforms are welcome.

Design Principles

When minimizing latency a good design is not when there is nothing left to add, but rather when there is nothing left to remove, as these queues exemplify.

The main design principle these queues follow is minimalism, which results in such design choices as:

  • Bare minimum of atomic instructions. The push and pop functions, inlinable by default, can hardly be any cheaper in terms of CPU instruction count / L1i cache pressure.
  • Explicit contention/false-sharing avoidance for the queue and its elements.
  • Linear fixed-size ring-buffer array. No heap memory allocations after a queue object has been constructed. It doesn't get any more CPU L1d or TLB cache friendly than that.
  • Value semantics. The queues make a copy/move upon push/pop; no reference/pointer to elements in the queue can be obtained.

The impact of each of these small design choices on its own is barely measurable, but their total impact is much greater than the simple sum of the constituents' impacts. The synergy that emerges from combining these small design choices is what allows CPUs to perform at their peak capacity, least impeded.

These design choices are also limitations:

  • The maximum queue size must be set at compile time or construction time. The circular buffer side-steps the memory reclamation problem inherent in linked-list based queues at the price of a fixed buffer size. See Effective memory reclamation for lock-free data structures in C++ for more details. A fixed buffer size may not be much of a limitation: once the queue grows larger than the maximum expected size, that indicates a problem in itself, namely that elements aren't being consumed fast enough, and if the queue keeps growing it may eventually consume all available memory, affecting the entire system rather than only the problematic process. The only apparent inconvenience is that one has to calculate upfront the largest expected/acceptable number of unconsumed elements in the queue.
  • There are no OS-blocking push/pop functions. This queue is designed for ultra-low-latency scenarios, and using an OS blocking primitive would sacrifice push-to-pop latency. For the lowest possible latency one cannot afford blocking in the OS kernel: the wake-up latency of a blocked thread is about 1-3 microseconds, whereas this queue's round-trip time can be as low as 150 nanoseconds.

Ultra-low-latency applications need just that and nothing more. The minimalism pays off, see the throughput and latency benchmarks.

Available containers are:

  • AtomicQueue - a fixed size ring-buffer for atomic elements.
  • OptimistAtomicQueue - a faster fixed size ring-buffer for atomic elements which busy-waits when empty or full. It is AtomicQueue used with push/pop instead of try_push/try_pop.
  • AtomicQueue2 - a fixed size ring-buffer for non-atomic elements.
  • OptimistAtomicQueue2 - a faster fixed size ring-buffer for non-atomic elements which busy-waits when empty or full. It is AtomicQueue2 used with push/pop instead of try_push/try_pop.

These containers have corresponding AtomicQueueB, OptimistAtomicQueueB, AtomicQueueB2, OptimistAtomicQueueB2 versions where the buffer size is specified as an argument to the constructor.

Totally ordered mode is supported. In this mode consumers receive messages in the same FIFO order the messages were posted. This mode is supported for the push and pop functions, but not for the try_ versions. On Intel x86 the totally ordered mode has 0 cost, as of 2019.

Single-producer-single-consumer mode is supported. In this mode, no expensive atomic read-modify-write CPU instructions are necessary, only the cheapest atomic loads and stores. That improves queue throughput significantly.
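
For illustration, here is a hedged sketch of declaring a single-producer-single-consumer queue. The template parameter names and their order (element type, allocator, MAXIMIZE_THROUGHPUT, TOTAL_ORDER, SPSC) are taken from a user-provided snippet further down this page and should be verified against atomic_queue.h before use.

#include <atomic_queue/atomic_queue.h>
#include <memory>

// Heap-allocated buffer, capacity passed to the constructor; the trailing 'true'
// is assumed to enable single-producer-single-consumer mode.
using SpscQueue = atomic_queue::AtomicQueueB2<
    /* T = */ float,
    /* A = */ std::allocator<float>,
    /* MAXIMIZE_THROUGHPUT = */ true,
    /* TOTAL_ORDER = */ false,
    /* SPSC = */ true>;

int main() {
    SpscQueue q{1024};
    q.push(3.14f);
    return q.pop() == 3.14f ? 0 : 1;
}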

Move-only queue element types are fully supported. For example, a queue of std::unique_ptr<T> elements would be AtomicQueueB2<std::unique_ptr<T>> or AtomicQueue2<std::unique_ptr<T>, CAPACITY>.
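
A minimal sketch, assuming the class template names above and that push/pop move a move-only element into and out of the queue:

#include <atomic_queue/atomic_queue.h>
#include <memory>

int main() {
    // Fixed compile-time capacity as the second template argument.
    atomic_queue::AtomicQueue2<std::unique_ptr<int>, 1024> q;

    q.push(std::make_unique<int>(42)); // the element is moved into the queue
    std::unique_ptr<int> p = q.pop();  // and moved back out on pop
    return *p == 42 ? 0 : 1;
}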

Role Models

Several other well established and popular thread-safe containers are used for reference in the benchmarks:

  • std::mutex - a fixed size ring-buffer with std::mutex.
  • pthread_spinlock - a fixed size ring-buffer with pthread_spinlock_t.
  • boost::lockfree::spsc_queue - a wait-free single-producer-single-consumer queue from Boost library.
  • boost::lockfree::queue - a lock-free multiple-producer-multiple-consumer queue from Boost library.
  • moodycamel::ConcurrentQueue - a lock-free multiple-producer-multiple-consumer queue used in non-blocking mode. This queue is designed to maximize throughput at the expense of latency and eschewing the global time order of elements pushed into one queue by different threads. It is not equivalent to other queues benchmarked here in this respect.
  • moodycamel::ReaderWriterQueue - a lock-free single-producer-single-consumer queue used in non-blocking mode.
  • xenium::michael_scott_queue - a lock-free multi-producer-multi-consumer queue proposed by Michael and Scott (this queue is similar to boost::lockfree::queue which is also based on the same proposal).
  • xenium::ramalhete_queue - a lock-free multi-producer-multi-consumer queue proposed by Ramalhete and Correia.
  • xenium::vyukov_bounded_queue - a bounded multi-producer-multi-consumer queue based on the version proposed by Vyukov.
  • tbb::spin_mutex - a locked fixed size ring-buffer with tbb::spin_mutex from Intel Threading Building Blocks.
  • tbb::concurrent_bounded_queue - the eponymous queue from Intel Threading Building Blocks, used in non-blocking mode.

Using the library

The containers provided are header-only class templates, no building/installing is necessary.

Install from GitHub

  1. Clone the project:
git clone https://github.com/max0x7ba/atomic_queue.git
  2. Add the atomic_queue/include directory (use the full path) to the include paths of your build system.
  3. #include <atomic_queue/atomic_queue.h> in your C++ source. A minimal usage sketch follows these steps.
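
The following is a hedged getting-started sketch rather than the project's own example. It assumes the default NIL value of 0 (so only non-zero values are pushed) and that try_pop takes an output reference and returns bool, as the API summary below suggests.

#include <atomic_queue/atomic_queue.h>
#include <thread>
#include <cstdio>

int main() {
    // Compile-time capacity of 1024 unsigned elements; 0 is reserved as the NIL marker.
    atomic_queue::AtomicQueue<unsigned, 1024> q;

    std::thread producer([&q] {
        for (unsigned i = 1; i <= 100; ++i)
            q.push(i); // busy-waits if the queue is full
    });

    unsigned received = 0;
    while (received < 100) {
        unsigned v;
        if (q.try_pop(v)) // returns false when the queue is empty
            ++received;
    }
    producer.join();
    std::printf("received %u elements\n", received);
}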

Install using vcpkg

vcpkg install atomic-queue

Benchmark build and run instructions

The containers provided are header-only class templates that require only #include <atomic_queue/atomic_queue.h>, no building/installing is necessary.

Building is necessary to run the tests and benchmarks.

git clone https://github.com/cameron314/concurrentqueue.git
git clone https://github.com/cameron314/readerwriterqueue.git
git clone https://github.com/mpoeter/xenium.git
git clone https://github.com/max0x7ba/atomic_queue.git
cd atomic_queue
make -r -j4 run_benchmarks

The benchmarks also require the Intel TBB library to be available. The Makefile assumes it is installed in /usr/local/include and /usr/local/lib; if it is installed elsewhere, modify cppflags.tbb and ldlibs.tbb in the Makefile.

API

The queue class templates provide the following member functions:

  • try_push - Appends an element to the end of the queue. Returns false when the queue is full.
  • try_pop - Removes an element from the front of the queue. Returns false when the queue is empty.
  • push (optimist) - Appends an element to the end of the queue. Busy waits when the queue is full. Faster than try_push when the queue is not full. Optional FIFO producer queuing and total order.
  • pop (optimist) - Removes an element from the front of the queue. Busy waits when the queue is empty. Faster than try_pop when the queue is not empty. Optional FIFO consumer queuing and total order.
  • was_size - Returns the number of unconsumed elements during the call. The state may have changed by the time the return value is examined.
  • was_empty - Returns true if the container was empty during the call. The state may have changed by the time the return value is examined.
  • was_full - Returns true if the container was full during the call. The state may have changed by the time the return value is examined.
  • capacity - Returns the maximum number of elements the queue can possibly hold.

Atomic elements are those for which std::atomic<T>{T{}}.is_lock_free() returns true, and, when C++17 features are available, std::atomic<T>::is_always_lock_free evaluates to true at compile time. In other words, the CPU can natively load, store and compare-and-exchange such elements atomically. On x86-64 such elements are all the C++ standard arithmetic and pointer types.
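
A hedged sketch of checking whether an element type qualifies, mirroring the run-time assert mentioned in the issues below and adding the C++17 compile-time equivalent:

#include <atomic>
#include <cassert>

int main() {
    // C++14: run-time check that the element type is lock-free.
    assert(std::atomic<unsigned>{0u}.is_lock_free());
    assert(std::atomic<void*>{nullptr}.is_lock_free());

#if __cplusplus >= 201703L
    // C++17: the same property checked at compile time.
    static_assert(std::atomic<unsigned>::is_always_lock_free,
                  "not lock-free: use AtomicQueue2/AtomicQueueB2 instead");
#endif
    return 0;
}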

The queues for atomic elements reserve one value to serve as an empty element marker NIL; its default value is 0. The NIL value must not be pushed into a queue, and there is an assert statement in the push functions to guard against that in debug-mode builds. Pushing a NIL element into a queue in release-mode builds results in undefined behaviour, such as deadlocks and/or lost queue elements.
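
If 0 is a legitimate element value (e.g. a buffer index), a different NIL can be supplied as a template argument. This hedged sketch follows the AtomicQueue<T, SIZE, NIL> parameter order visible in a user snippet later on this page:

#include <atomic_queue/atomic_queue.h>
#include <limits>

int main() {
    // Reserve the maximum unsigned value as NIL so that 0 becomes an ordinary element.
    constexpr unsigned NIL = std::numeric_limits<unsigned>::max();
    atomic_queue::AtomicQueue<unsigned, 1024, NIL> q;

    q.push(0u);                // 0 is now a pushable value
    unsigned index = q.pop();
    return index == 0 ? 0 : 1;
}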

Note that optimism is a choice of control flow for a queue modification operation, rather than a queue type. An optimistic push is fastest when the queue is not full most of the time; an optimistic pop is fastest when the queue is not empty most of the time. Optimistic and non-optimistic operations can be mixed without restrictions. The OptimistAtomicQueues in the benchmarks use only optimistic push and pop.

See example.cc for a usage example.

TODO: full API reference.

Memory order of non-atomic loads and stores

push and try_push operations synchronize with (as defined in std::memory_order) any subsequent pop or try_pop operation on the same queue object. This means that:

  • No non-atomic load/store gets reordered past push/try_push, which is a memory_order::release operation. Same memory order as that of std::mutex::unlock.
  • No non-atomic load/store gets reordered prior to pop/try_pop, which is a memory_order::acquire operation. Same memory order as that of std::mutex::lock.
  • The effects of a producer thread's non-atomic stores preceding a push/try_push of an element into a queue become visible in the consumer thread that pops/try_pops that particular element (an illustration follows this list).
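
A hedged illustration of this guarantee, assuming a queue of pointer elements (for which the default NIL of 0 is the null pointer, so only non-null pointers are pushed):

#include <atomic_queue/atomic_queue.h>
#include <thread>

struct Order { int id; double price; };

int main() {
    atomic_queue::AtomicQueue<Order*, 64> q;
    Order order;

    std::thread producer([&] {
        order.id = 7;       // plain non-atomic stores...
        order.price = 99.5;
        q.push(&order);     // ...followed by a release operation
    });

    Order* p = q.pop();     // acquire operation, synchronizes with the push above
    bool visible = (p->id == 7 && p->price == 99.5); // the producer's stores are guaranteed visible
    producer.join();
    return visible ? 0 : 1;
}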

Implementation Notes

Ring-buffer capacity

The available queues here use a ring-buffer array for storing elements. The capacity of the queue is fixed at compile time or construction time.

In a production multiple-producer-multiple-consumer scenario the ring-buffer capacity should be set to the maximum expected queue size. When the ring-buffer gets full it means that the consumers cannot consume the elements fast enough. A fix for that is any of:

  • Increase the queue capacity in order to handle temporary spikes of pending elements in the queue. This normally requires restarting the application after re-configuration/re-compilation has been done.
  • Increase the number of consumers to drain the queue faster. The number of consumers can be managed dynamically, e.g. when a consumer observes that the number of elements pending in the queue keeps growing, that calls for deploying more consumer threads to drain the queue at a faster rate; a mostly empty queue calls for suspending/terminating excess consumer threads.
  • Decrease the rate of pushing elements into the queue. push and pop calls always incur some expensive CPU cycles to maintain the integrity of the queue state in an atomic/consistent/isolated fashion with respect to other threads, and these costs increase super-linearly as queue contention grows. Producer batching of multiple small elements, or of elements resulting from one event, into one queue message is often a reasonable solution.

Using a power-of-2 ring-buffer array size allows a couple of important optimizations:

  • The writer and reader indexes get mapped into the ring-buffer array index using the remainder operator % SIZE. The remainder operator normally generates a division CPU instruction, which isn't cheap, but a power-of-2 size turns that remainder into a single cheap bitwise AND instruction, and that is as fast as it gets.
  • The element index within the cache line gets swapped with the cache line index, so that consecutive queue elements reside in different cache lines. This massively reduces cache line contention between multiple producers and multiple consumers. Instead of N producers together with M consumers competing for subsequent elements in the same ring-buffer cache line in the worst case, it is only one producer competing with one consumer (strictly speaking, when the number of CPUs is not greater than the number of elements that fit in one cache line). This optimisation scales better with the number of producers and consumers, and with element size. With a low number of producers and consumers (up to about 2 of each in these benchmarks), disabling this optimisation may yield better throughput (but higher variance across runs). A sketch of both index mappings follows this list.
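
A hedged sketch of the two index mappings described in this list; the names and the exact remapping formula are illustrative, not the library's actual identifiers or implementation:

constexpr unsigned SIZE = 1024;                                     // power of 2
constexpr unsigned CACHE_LINE = 64;                                 // bytes
constexpr unsigned ELEMS_PER_LINE = CACHE_LINE / sizeof(unsigned);  // 16 elements per line

// 1. A power-of-2 size turns `index % SIZE` into a single bitwise AND.
constexpr unsigned ring_index(unsigned i) { return i & (SIZE - 1); }

// 2. Swap the element-within-cache-line index with the cache-line index, so that
//    consecutive queue elements land in different cache lines.
constexpr unsigned remap(unsigned i) {
    return (i % ELEMS_PER_LINE) * (SIZE / ELEMS_PER_LINE) + i / ELEMS_PER_LINE;
}

static_assert(ring_index(SIZE + 3) == 3, "wrap-around via bitwise AND");
static_assert(remap(0) / ELEMS_PER_LINE != remap(1) / ELEMS_PER_LINE,
              "consecutive elements fall into different cache lines");

int main() { return 0; }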

The containers use the unsigned type for sizes and internal indexes. On the x86-64 platform unsigned is 32 bits wide, whereas size_t is 64 bits wide. 64-bit instructions require an extra instruction prefix byte, resulting in slightly more pressure on the CPU instruction cache and the front-end. Hence, 32-bit unsigned indexes are used to maximise performance. That limits the queue size to 4,294,967,295 elements, which seems to be a reasonable hard limit for many applications.

While the atomic queues can be used with any moveable element types (including std::unique_ptr), for best throughput and latency the queue elements should be cheap to copy and lock-free (e.g. unsigned or T*), so that push and pop operations complete fastest.

Lock-free guarantees

Conceptually, a push or pop operation does two atomic steps:

  1. Atomically and exclusively claims the queue slot index to store/load an element to/from. That is, producers increment the head index, consumers increment the tail index. Each slot is accessed by only one producer thread and one consumer thread.
  2. Atomically stores/loads the element into/from the slot. A producer storing into a slot changes its state to non-NIL; a consumer loading from a slot changes its state back to NIL. The slot acts as a spinlock for its single producer and single consumer threads.

These queues anticipate that a thread doing push or pop may complete step 1 and then be preempted before completing step 2.
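
A hedged, pseudocode-level sketch of these two steps for a push; head_, slots_ and the memory orders are illustrative stand-ins, not the library's actual implementation:

#include <atomic>

template<class T, unsigned SIZE, T NIL>
struct PushSketch {
    std::atomic<unsigned> head_{0};
    std::atomic<T> slots_[SIZE];

    PushSketch() {
        for (auto& s : slots_)
            s.store(NIL, std::memory_order_relaxed);
    }

    void push(T element) {
        // Step 1: exclusively claim a slot index with one atomic read-modify-write.
        unsigned slot = head_.fetch_add(1, std::memory_order_acquire) % SIZE;
        // Step 2: the slot acts as a spinlock shared by its one producer and one consumer;
        // wait until the consumer has set it back to NIL, then store the element.
        for (T expected = NIL;
             !slots_[slot].compare_exchange_weak(expected, element,
                                                 std::memory_order_release,
                                                 std::memory_order_relaxed);
             expected = NIL)
            ;
    }
};

int main() {
    PushSketch<unsigned, 16, 0u> q;
    q.push(1u);
    return 0;
}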

An algorithm is lock-free if there is guaranteed system-wide progress. These queues guarantee system-wide progress through the following properties:

  • Each push is independent of any preceding push. An incomplete (preempted) push by one producer thread doesn't affect push of any other thread.
  • Each pop is independent of any preceding pop. An incomplete (preempted) pop by one consumer thread doesn't affect pop of any other thread.
  • An incomplete (preempted) push from one producer thread affects only the one consumer thread popping an element from this particular queue slot. All other threads' pops are unaffected.
  • An incomplete (preempted) pop from one consumer thread affects only the one producer thread pushing an element into this particular queue slot while expecting it to have been consumed a long time ago, in the rather unlikely scenario that producers have wrapped around the entire ring-buffer while this consumer hasn't completed its pop. All other threads' pushes and pops are unaffected.

Preemption

Linux task scheduler thread preemption is something no user-space process should be able to affect or escape, otherwise any/every malicious application would exploit that.

Still, there are a few things one can do to minimize preemption of one's mission critical application threads:

  • Use real-time SCHED_FIFO scheduling class for your threads, e.g. chrt --fifo 50 <app>. A higher priority SCHED_FIFO thread or kernel interrupt handler can still preempt your SCHED_FIFO threads.
  • Use the same fixed real-time scheduling priority for all threads accessing the same queue objects. Real-time threads with different scheduling priorities modifying one queue object may cause priority inversion and deadlocks. Using the default scheduling class SCHED_OTHER with its dynamically adjusted priorities defeats the purpose of using these queues.
  • Disable real-time thread throttling to prevent SCHED_FIFO real-time threads from being throttled.
  • Isolate CPU cores, so that no interrupt handlers or applications ever run on them. Mission-critical applications should be explicitly placed on these isolated cores with taskset.
  • Pin threads to specific cores, otherwise the task scheduler keeps moving threads to other idle CPU cores to level voltage/heat-induced wear-and-tear across CPU cores. Keeping a thread running on the same CPU core maximizes the CPU cache hit rate, whereas moving a thread to another CPU core incurs otherwise unnecessary CPU cache thrashing. (A programmatic sketch of the last two items follows this list.)
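
A hedged C++ sketch of applying the last two items from inside the process (the programmatic equivalents of taskset and chrt). It is GNU/Linux-specific, error handling is minimal, and the real-time part requires appropriate privileges:

#include <pthread.h>
#include <sched.h>
#include <cstdio>

// Pin the calling thread to one CPU core and give it a fixed SCHED_FIFO priority.
bool pin_and_make_realtime(unsigned cpu, int fifo_priority) {
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpu, &set);
    if (int err = pthread_setaffinity_np(pthread_self(), sizeof set, &set)) {
        std::fprintf(stderr, "pthread_setaffinity_np failed: %d\n", err);
        return false;
    }

    sched_param sp{};
    sp.sched_priority = fifo_priority; // e.g. 50, matching `chrt --fifo 50`
    if (int err = pthread_setschedparam(pthread_self(), SCHED_FIFO, &sp)) {
        std::fprintf(stderr, "pthread_setschedparam failed: %d\n", err);
        return false;
    }
    return true;
}

int main() {
    return pin_and_make_realtime(2, 50) ? 0 : 1; // core 2, priority 50: illustrative values
}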

People often propose limiting busy-waiting with a subsequent call to std::this_thread::yield()/sched_yield/pthread_yield. However, sched_yield is the wrong tool for locking because it doesn't communicate to the OS kernel what the thread is waiting for, so the OS thread scheduler can never schedule the calling thread to resume at the right time when the shared state has changed (unless there are no other threads that can run on this CPU core, so that the caller resumes immediately). See the notes section in man sched_yield and a Linux kernel thread about sched_yield and spinlocks for more details.

On Linux, there is the mutex type PTHREAD_MUTEX_ADAPTIVE_NP, which busy-waits on a locked mutex for a number of iterations and then makes a blocking syscall into the kernel to deschedule the waiting thread. In the benchmarks it was the worst performer and I couldn't find a way to make it perform better, which is the reason it is not included in the benchmarks.

On Intel CPUs one could use the 4 debug control registers to monitor the spinlock memory region for write access and wait on it using select (and its friends) or sigwait (see perf_event_open and uapi/linux/hw_breakpoint.h for more details). A spinlock waiter could suspend itself with select or sigwait until the spinlock state has been updated. But there are only 4 of these registers, so that such a solution wouldn't scale.

Benchmarks

View throughput and latency benchmarks charts.

Methodology

There are a few OS behaviours that complicate benchmarking:

  • The CPU scheduler can place threads on different CPU cores each run. To avoid that, the threads are pinned to specific CPU cores.
  • The CPU scheduler can preempt threads. To avoid that, real-time SCHED_FIFO priority 50 is used, which disables scheduler time quantum expiry and makes the threads non-preemptable by lower-priority processes/threads.
  • Real-time thread throttling is disabled (see below).
  • Adverse address space randomisation, as well as other processes running on the system, may cause extra CPU cache conflicts. To minimise these effects, the benchmark executable is run at least 33 times. The benchmark charts show the average; the standard deviation, minimum and maximum values are shown in the chart tooltips.

I only have access to a few x86-64 machines. If you have access to different hardware, feel free to submit the output file of scripts/run-benchmarks.sh and I will include your results in the benchmarks page.

Huge pages

When huge pages are available, the benchmarks use 1x1GB or 16x2MB huge pages for the queues to minimise TLB misses. To enable huge pages, do one of:

sudo hugeadm --pool-pages-min 1GB:1
sudo hugeadm --pool-pages-min 2MB:16

Alternatively, you may like to enable transparent hugepages in your system and use a hugepage-aware allocator, such as tcmalloc.

Real-time thread throttling

By default, the Linux scheduler throttles real-time threads from consuming 100% of the CPU, which is detrimental to benchmarking. Full details can be found in Real-Time group scheduling. To disable real-time thread throttling do:

echo -1 | sudo tee /proc/sys/kernel/sched_rt_runtime_us >/dev/null

Throughput and scalability benchmark

N producer threads push 4-byte integers into one shared queue, and N consumer threads pop the integers from the queue. The producers post 1,000,000 messages in total. The total time to send and receive all the messages is measured. The benchmark is run from 1 producer and 1 consumer up to (total-number-of-cpus / 2) producers/consumers to measure the scalability of the different queues.

Ping-pong benchmark

One thread posts an integer to another thread through one queue and waits for a reply on another queue (2 queues in total). The benchmark measures the total time of 100,000 ping-pongs, best of 10 runs. Contention is minimal here (1 producer, 1 consumer, 1 element in the queue) in order to achieve and measure the lowest latency. The average round-trip time is reported.

Contributing

Contributions are more than welcome. .editorconfig and .clang-format can be used to automatically match code formatting.

Reading material

Some books on the subject of multi-threaded programming I found instructive:

  • Programming with POSIX Threads by David R. Butenhof.
  • The Art of Multiprocessor Programming by Maurice Herlihy, Nir Shavit.

Copyright (c) 2019 Maxim Egorushkin. MIT License. See the full licence in file LICENSE.

atomic_queue's People

Contributors

andriy06, jcelerier, jhasse, jpcima, jwakely, max0x7ba, mpoeter, musicinmybrain, paulfd, redskittlefox, xeonacid, xiejiss, yvan-xy


atomic_queue's Issues

spin-locking

I'd like to raise a concern about spin-locking in atomic_queue implementation. I read README section about this, but I'm still not convinced. Let me describe an issue I saw:
An NDA platform with a non-fair OS scheduler that runs on millions of consumer devices. We had a spin-lock that was using _mm_pause/__yield instructions, same as atomic_queue, which resulted in a low but statistically significant number of deadlocks on only that platform. All cores were occupied by spinning threads, while the thread that was supposed to unlock them was never re-scheduled. The culprit here is the non-fair scheduler: it was giving a CPU core to a thread until it got blocked (e.g. on a mutex) or a higher-priority thread got ready for execution.
The fix was to add std::this_thread::yield() (not exactly but doing the same) into spin-locking. This solved the problem because while it wasn't communicating to the OS which thread it waits for, it was freeing a CPU core, and that was enough to eventually schedule the unlocking thread.
Now, std::this_thread::yield() is orders of magnitude slower than _mm_pause, at least on the multiple platforms where I had a chance to benchmark it. A typical solution is to use a hybrid approach: spin on _mm_pause/__yield for a while (hundreds of iterations) for a low-latency response and only then switch to spinning on std::this_thread::yield() as the last resort. This way it doesn't matter how slow it is.
To be clear, I don't think it's a reliable solution. I suspect it can be still vulnerable to the priority inversion problem, though I don't have data to back this claim. To avoid priority inversion, a proper blocking is required. So the most versatile solution I can think of would be:

  • spin on _mm_pause for N iterations;
  • spin on std::this_thread::yield() for another M iterations;
  • block the thread, e.g. with std::atomic<T>::wait(). Right, I remember this library is C++14, so it would rather be std::mutex.
    The cost is that the unlocking thread would need to check every time whether there's a waiting thread that needs to be notified (a relaxed load), plus 8 bytes of memory. The implementation would be a bit convoluted though. So, from a practical point of view, I'd go with just adding std::this_thread::yield() to the spinning; it's simple enough, costs nothing and solves a real problem in many real cases. (A sketch of the first two stages follows below.)
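
A hedged sketch of the first two stages of the back-off described above (the final blocking stage is omitted); the iteration count is illustrative:

#include <atomic>
#include <thread>
#if defined(__x86_64__) || defined(__i386__)
#include <immintrin.h> // _mm_pause
#endif

inline void cpu_relax() {
#if defined(__x86_64__) || defined(__i386__)
    _mm_pause();
#else
    std::this_thread::yield(); // portable fallback for non-x86 targets
#endif
}

// Spin until 'ready' becomes true: pause-spin first, then yield as the last resort.
inline void wait_until(std::atomic<bool>& ready) {
    constexpr int PAUSE_ITERATIONS = 256; // illustrative "N"
    for (int i = 0; i < PAUSE_ITERATIONS; ++i) {
        if (ready.load(std::memory_order_acquire))
            return;
        cpu_relax();
    }
    while (!ready.load(std::memory_order_acquire))
        std::this_thread::yield(); // free the core so the unlocking thread can run
}

int main() {
    std::atomic<bool> ready{true};
    wait_until(ready);
    return 0;
}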

Questions about evaluation graphs

  1. In the 'A wait-free queue as fast as fetch-and-add' paper, it shows performance of nearly 100 mops/second. But the benchmark in this repo shows 10 msgs/second (in the case of XEON).
    I know the environments differ from each other, but why do they make such a difference?

  2. What is the major difference between AtomicQueue and OptimistAtomicQueue in terms of performance? There is only a small change in the code, but it shows a large performance gap.

  3. Did you check the performance comparison with LRCQ?

  • LRCQ: Fast Concurrent Queues for x86 Processors, PPOPP '13

(image: figure from the wait-free queue paper)

Thanks...
Junghan

Ordering guarantees

I'm just about to check out your queue. I'm currently using MoodyCamel ConcurrentQueue, but in one of my projects I have now come across the limitation that there are no ordering guarantees.

Are items enqueued to atomic_queue dequeued in (almost) the same order?

could you please add some user guide for beginners?

Hi, after checking the benchmark page, I know this is another great high-performance lock-free queue.
I want to use this atomic_queue in my project, but I found it hard. It would be better if you could provide a simple guide for beginners on how to use atomic_queue, just like moodycamel::ConcurrentQueue's docs, which include some basic usage guidance.
Thanks!

Unbounded queue possible with atomic_queue?

I'm looking to replace folly::UnboundedQueue. Is it possible to orchestrate multiple atomic_queues to basically grow without a fixed size limit? Where e.g. the last item in the queue points to a new fixed-size queue which is created dynamically when pushing at the last index and thrown away when popped?

'try_push' variant for move-only types

I adapted the queue so that it's compatible with move-only types (std::unique_ptr for example)

I made minor modifications here and there (adding std::move on some assignments) and found that it would be useful to have a try_push variant for movable types, because if the push fails, for movable types, the moved object is lost. In the variant I propose, the moved object is returned to the caller when the push fails:

    template<class T>
    std::optional<T> try_push_moveable(T&& element) noexcept {
        auto head = head_.load(X);
        if(Derived::spsc_) {
            if(static_cast<int>(head - tail_.load(X)) >= static_cast<int>(static_cast<Derived&>(*this).size_))
                return {std::forward<T>(element)};
            head_.store(head + 1, X);
        }
        else {
            do {
                if(static_cast<int>(head - tail_.load(X)) >= static_cast<int>(static_cast<Derived&>(*this).size_))
                    return {std::forward<T>(element)};
            } while(ATOMIC_QUEUE_UNLIKELY(!head_.compare_exchange_strong(head, head + 1, A, X))); // This loop is not FIFO.
        }

        static_cast<Derived&>(*this).do_push(std::forward<T>(element), head);
        return {};
    }

I'll submit a full pull request when I find some time to do it.

Implementations for ppc64le and s390x?

I’ve packaged this library for Fedora Linux as part of an effort to eventually package sfizz.

Both ppc64le (little-endian 64-bit PowerPC) and s390x (Linux on IBM Z) are supported primary architectures in Fedora. It’s fine that they aren’t supported here—we can just ensure that atomic_queue and any packages that depend on it aren’t built on those architectures.

Still, I wonder—is supporting these two architectures easy or difficult?

Unfortunately, I’m not entirely confident in the exact requirements of the spin_loop_pause() function, nor am I particularly familiar with the nuances of those two architectures, so it’s not easy for me to contribute implementations myself.

Optimization proposal

Hello,

I have a suggestion to improve the performance of the queues that are dynamically sized:

Each time an element is pushed or popped, there is a call to a modulo operator to put head and tail in the [0,size) range.

This modulo might be very fast in the case where the size is a constant power of 2 known at compile time (if the compiler converts this to a bitwise operation), but when the size is set at run time, this modulo operator might be expensive...

Instead of using modulo we could simply keep head and tail in the range [0,size) (i.e. reset them to 0 when they become equal to size).
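
A hedged sketch contrasting the current remainder-based approach with the proposal above, for a size known only at run time:

// Current approach: remainder, which may compile to a division for a run-time size.
inline unsigned advance_modulo(unsigned i, unsigned size) {
    return (i + 1) % size;
}

// Proposed approach: keep the index in [0, size) with a compare and a conditional reset.
inline unsigned advance_wrap(unsigned i, unsigned size) {
    ++i;
    return i == size ? 0 : i;
}

int main() {
    return advance_modulo(9, 10) == advance_wrap(9, 10) ? 0 : 1; // both wrap 9 -> 0
}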

How to put a struct into the queue

Can anyone help?

typedef struct {
    int a;
    char b[32];
} UserData;

compile error when:

constexpr unsigned CAPACITY = 1024;
AtomicQueue<UserData, CAPACITY> q; 

C++17 is needed in xenium

Thanks for the amazing work! While following the installation steps, I found that one of the required dependencies, xenium (https://github.com/mpoeter/xenium), currently only supports C++17.

Can you suggest the previous version/commit you used for the benchmarks (with C++14)?

Could you add more apis such as Front() and PushFront()?

Dear author,
I have a requirement to inspect the first element of the queue without popping it, just like what front() does in std::vector.
I also want to push one element at the front of the queue.
Do you think these two functions, Front and PushFront, could be added? Thank you.

why compare_exchange_strong() in a loop?

Out of curiosity, why is compare_exchange_strong() used in loops where I believe compare_exchange_weak() would be sufficient? Based on benchmark results? Do you still have them or remember the details?

And while I have you here :) why the platform-dependent definition of CACHE_LINE_SIZE? Any known issues with std::hardware_destructive_interference_size?

thanks

The queue does not accept NIL value.

Hello @max0x7ba

I have a shared fixed-size buffer whose element data type is a large record. Instead of storing elements of the buffer directly in the queue, I just store indices into the buffer. One issue I have with the queue is that push() and try_push() do not accept a zero value, which is the first index of the buffer.
I want to know if there is any way to make the queue accept a zero value.

Thanks

Benchmark comments

Can you please add some of your comments on the webpage? What do you think? Which to use for each case? And maybe an explanation of the difference in results if possible?

Consider adding moodycamel::ConcurrentQueue to the benchmarks

Hi,

Currently you only have the moodycamel::ReaderWriterQueue incorporated in the benchmarks - Is there any specific reason to exclude moodycamel::ConcurrentQueue? It's also a lock-free multiple-producer-multiple-consumer queue, just like this one.

I think it would be a great addition!

Question: busy waiting?

This repo is great.
But, what happens if a thread is waiting to enqueue/dequeue for more than say 100ms?
In a busy-waiting scenario, isn't that hogging a lot of CPU resources, which could be otherwise spent by another thread?
In which case, one might argue that a mutex-cvar based queue which puts threads to sleep might be more appropriate.
Is there a way to hybridize this?
So, would there be a way for a thread to busy-wait for, say, any time less than 1 microsecond, then go to sleep for any time after that?

AtomicQueueB2 size does not work

I would expect the following :

atomic_queue::RetryDecorator<atomic_queue::AtomicQueueB2<float>> q(10);

for (size_t i = 0; i < 100; i++)
{
    cout << "enqueuing " << i << endl;
    q.push(i);
}

to only print up to 10 and then hang. But it doesn't; it goes all the way up to 100.

Throttling when idle

Is it possible to throttle CPU usage when the queue is basically idle? Like exponentially increasing the pause time up to some upper limit when no data is in flight?

Low latency programming?

I'm looking for a resource to learn low latency programming so I can write libraries such as these. Can you please recommend a book?

Please document the meaning of the NIL template parameter clearly

I tried out the atomic_queue today in the context of a bigger project. The first local tests were promising, but then some of our CI tests failed. It took me a few hours to finally figure out that the reason was that, under some circumstances, a default-constructed element was pushed into the queue, which then compared equal to NIL. I wasn't aware of the meaning of that third template argument and only found a description of it in an old issue.

I was about to revert all the changes and use another queue implementation because of this. Now after setting it to a different value, everything works fine. A small hint in the readme, explaining this important parameter or some documentation in the code would have saved me some hours of working time. Could you please document that somewhere?

Problem of using atomic_queue with debug-version of malloc and free.

The problem occurs when I use the queue in DEBUG mode with memory-leak checking in MSVC. An excerpt of the code is below:

#ifdef _DEBUG
#define _CRTDBG_MAP_ALLOC
#include <stdlib.h>
#include <crtdbg.h>

#undef THIS_FILE
static char THIS_FILE[] = __FILE__;
#define new new(_NORMAL_BLOCK, THIS_FILE, __LINE__)
#endif

class TestAtomicQueue {
	atomic_queue::AtomicQueue<int, 1, -1> m_queue;
};

int main()
{
#ifdef _DEBUG
	_CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF);
#endif
	auto pObject = new TestAtomicQueue;
	delete pObject; // <-- broken here
}

CPU load at 100%

Hi,

I'm writing a high-performance consumer of real-time messages and need a thread-safe, fast queue.
I've implemented the AtomicQueueB in my application and noticed that one or more CPUs are always at 100% load during runtime.
When running the example.cc with
unsigned constexpr N = std::numeric_limits<unsigned>::max() - 10; // Pass this many elements from producers to consumers.
I noticed similar behaviour.

Is this normal? If not, any ideas on how to set up the queue so it doesn't exhibit the load?
I didn't disable real-time thread throttling.

Also, if the tasks are pinned to a specific core, is it possible to disable this? My application needs to run in the cloud (only virtual cores available there...)

Compiled with g++ 9.3.1 with C++14

Bugfix : try_push blocks

Hello,

Using the following queue:

  using Queue = atomic_queue::AtomicQueueB2<
  /* T = */ float,
  /* A = */ std::allocator<float>,
  /* MAXIMIZE_THROUGHPUT */ true,
  /* TOTAL_ORDER = */ true,
  /* SPSC = */ true
  >;

when the queue is full, try_push blocks.

A fix for this behaviour is here : OlivierSohn/cpp.algorithms@37ccb8d

I verified that it solves my use case.

Feel free to use it!

Question: memory order in fetch_add()

In the push and pop functions, the memory order of fetch_add is set to memory_order_acquire if total order is not required. In that case, would it be wrong to set the memory order to relaxed? My reasoning is that fetch_add is not synchronized with any store, and also there is a data dependency between this evaluation and the do_push()/do_pop() that follows in the next line, so I don't see a risk of reading the wrong 'head' due to reordering of some read operation after fetch_add. I may be wrong and lacking some fundamental understanding of memory order, but I can’t seem to come up with any scenario where using relaxed memory order leads to error.

Elaborate in docs what "atomic elements" means.

I found this in source,
but it was not obvious from reading the docs:

assert(std::atomic<T>{NIL}.is_lock_free()); // This queue is for atomic elements only. AtomicQueue2 is for non-atomic ones.

I for one was thinking that maybe the T itself needs to be std::atomic<SomeOtherT>.

P.S. Not sure if that assert could be static_assert, IDK if there can be a type that is_lock_free but not constructible at compile time.

build failure with gcc-13

atomic_queue fails to build from source with GCC 13 on today's Debian sid. The relevant part of the error message starts like this:

In file included from /usr/include/c++/13/bits/atomic_base.h:39,
                 from /usr/include/c++/13/atomic:41,
                 from include/atomic_queue/defs.h:7,
                 from include/atomic_queue/atomic_queue.h:7,
                 from /<<PKGBUILDDIR>>/src/benchmarks.cc:5:
In function ‘std::_Require<std::__not_<std::__is_tuple_like<_Tp> >, std::is_move_constructible<_Tp>, std::is_move_assignable<_Tp> > std::swap(_Tp&, _Tp&) [with _Tp = thread::id]’,
    inlined from ‘void std::thread::swap(std::thread&)’ at /usr/include/c++/13/bits/std_thread.h:193:16,
    inlined from ‘std::thread& std::thread::operator=(std::thread&&)’ at /usr/include/c++/13/bits/std_thread.h:187:11,
    inlined from ‘{anonymous}::cycles_t {anonymous}::benchmark_throughput(atomic_queue::HugePages&, const std::vector<unsigned int>&, unsigned int, unsigned int, bool, sum_t*) [with Queue = atomic_queue::RetryDecorator<CapacityToConstructor<atomic_queue::AtomicQueueB<unsigned int, atomic_queue::HugePageAllocator<unsigned int>, 0, false, false, true>, 65536> >]’ at /<<PKGBUILDDIR>>/src/benchmarks.cc:252:43,
    inlined from ‘void {anonymous}::run_throughput_benchmark(const char*, atomic_queue::HugePages&, const std::vector<unsigned int>&, unsigned int, unsigned int, unsigned int) [with Queue = atomic_queue::RetryDecorator<CapacityToConstructor<atomic_queue::AtomicQueueB<unsigned int, atomic_queue::HugePageAllocator<unsigned int>, 0, false, false, true>, 65536> >]’ at /<<PKGBUILDDIR>>/src/benchmarks.cc:292:60:
/usr/include/c++/13/bits/move.h:198:7: error: array subscript 2 is outside array bounds of ‘long long int [1]’ [-Werror=array-bounds=]
  198 |       __a = _GLIBCXX_MOVE(__b);
      |       ^~~
In file included from /usr/include/x86_64-linux-gnu/c++/13/bits/c++allocator.h:33,
                 from /usr/include/c++/13/bits/allocator.h:46,
                 from /usr/include/c++/13/memory:65,
                 from include/atomic_queue/atomic_queue.h:13:
In member function ‘_Tp* std::__new_allocator<_Tp>::allocate(size_type, const void*) [with _Tp = long long int]’,
    inlined from ‘static _Tp* std::allocator_traits<std::allocator<_Tp1> >::allocate(allocator_type&, size_type) [with _Tp = long long int]’ at /usr/include/c++/13/bits/alloc_traits.h:482:28,
    inlined from ‘std::_Vector_base<_Tp, _Alloc>::pointer std::_Vector_base<_Tp, _Alloc>::_M_allocate(std::size_t) [with _Tp = long long int; _Alloc = std::allocator<long long int>]’ at /usr/include/c++/13/bits/stl_vector.h:378:33,
    inlined from ‘std::_Vector_base<_Tp, _Alloc>::pointer std::_Vector_base<_Tp, _Alloc>::_M_allocate(std::size_t) [with _Tp = std::thread; _Alloc = std::allocator<std::thread>]’ at /usr/include/c++/13/bits/stl_vector.h:375:7,
    inlined from ‘void std::_Vector_base<_Tp, _Alloc>::_M_create_storage(std::size_t) [with _Tp = std::thread; _Alloc = std::allocator<std::thread>]’ at /usr/include/c++/13/bits/stl_vector.h:395:44,
    inlined from ‘std::_Vector_base<_Tp, _Alloc>::_Vector_base(std::size_t, const allocator_type&) [with _Tp = std::thread; _Alloc = std::allocator<std::thread>]’ at /usr/include/c++/13/bits/stl_vector.h:332:26,
    inlined from ‘std::vector<_Tp, _Alloc>::vector(size_type, const allocator_type&) [with _Tp = std::thread; _Alloc = std::allocator<std::thread>]’ at /usr/include/c++/13/bits/stl_vector.h:554:47,
    inlined from ‘{anonymous}::cycles_t {anonymous}::benchmark_throughput(atomic_queue::HugePages&, const std::vector<unsigned int>&, unsigned int, unsigned int, bool, sum_t*) [with Queue = atomic_queue::RetryDecorator<CapacityToConstructor<atomic_queue::AtomicQueueB<unsigned int, atomic_queue::HugePageAllocator<unsigned int>, 0, false, false, true>, 65536> >]’ at /<<PKGBUILDDIR>>/src/benchmarks.cc:245:30,
    inlined from ‘void {anonymous}::run_throughput_benchmark(const char*, atomic_queue::HugePages&, const std::vector<unsigned int>&, unsigned int, unsigned int, unsigned int) [with Queue = atomic_queue::RetryDecorator<CapacityToConstructor<atomic_queue::AtomicQueueB<unsigned int, atomic_queue::HugePageAllocator<unsigned int>, 0, false, false, true>, 65536> >]’ at /<<PKGBUILDDIR>>/src/benchmarks.cc:292:60:
/usr/include/c++/13/bits/new_allocator.h:147:55: note: at offset 16 into object of size 8 allocated by ‘operator new’
  147 |         return static_cast<_Tp*>(_GLIBCXX_OPERATOR_NEW(__n * sizeof(_Tp)));
      |                                                       ^

This issue was initially reported in Debian bug#1037715. I have been unable to identify what exactly in the porting guide to GCC 13 is relevant to the error at play.

Ever increasing queue size

Hi, I am trying to use the queue in my project, where OpenCV frames are queued. I need to store a queue size of 5 (for the most recent frames), but what I observe is that if the consumer thread is not activated, the producer keeps pushing data into the queue, which leads to the size of the queue ever increasing. At a rate of 30 fps, my queue size crosses 1 GB in just a few minutes, instead of a few MB for just 5 frames.

Code snippets are as below:

  1. Declaration:
    atomic_queue::AtomicQueueB2<FramePacket> m_LiveFrameBuff{5};
  2. Producer call:
    auto temp = framePacket;
    m_LiveFrameBuff.push(temp);
  3. Consumer call:
    void FrameBuffManager::getLiveFrame(FramePacket &ret)
    {
       funcname();
       if (m_stopworkers.load())
           return;
       ret = m_LiveFrameBuff.pop();
       return;
    }
  4. Size print check:
    std::cout << m_LiveFrameBuff.was_size() << std::endl;

The readme also mentions other available queues like OptimistAtomicQueue2, but I am unable to find/use it. Could simpler use-case examples be provided for the respective available containers? It would be really helpful.

Create a queue with object ptrs as queue elements.

I am trying to create a queue with class object pointers as the queue elements, for my own exploration, and have taken sample code from src/example.cc. The example uses uint32_t and -1 as the special value that cannot be pushed/popped; what could that value be for a queue of pointers to objects?
For eg:

class file :
class Room {
    public:
        double length;
        double breadth;
        double height;

        double calculateArea(){
            return length * breadth;
        }

        double calculateVolume(){
            return length * breadth * height;
        }

};

main file : 
#include "atomic_queue/atomic_queue.h"

#include "Element.h"
#include <thread>
#include <cstdint>
#include <chrono>
#include <iostream>

int main() {
    using namespace std::chrono_literals;
    int constexpr PRODUCERS = 1; // Number of producer threads.
    int constexpr CONSUMERS = 1; // Number of consumer threads.
    unsigned constexpr N = 500; // Each producer pushes this many elements into the queue.
    unsigned constexpr CAPACITY = 400; // Queue capacity. Since there are more consumers than producers the queue doesn't need to be large.

    using Element = Room *; // Queue element type
    Element constexpr NILELEMENT = reinterpret_cast<Element>(0xdeaddead);
//    static constexpr Element invalid1 = reinterpret_cast<Element>(0xdeadead);
    //Element constexpr NILELEMENT = static_cast<Element>(&room); // Atomic elements require a special value that cannot be pushed/popped.
    using Queue = atomic_queue::AtomicQueueB<Element, std::allocator<Element>, NILELEMENT>; // Use heap-allocated buffer.

    // Create a queue object shared between all producers and consumers.
    Queue q{CAPACITY};

    // Start the consumers.
    uint64_t sums[CONSUMERS];
    std::thread consumers[CONSUMERS];
    for(int i = 0; i < CONSUMERS; ++i)
        consumers[i] = std::thread([&q, &sum = sums[i]]() {
            uint64_t s = 0; // New object with automatic storage duration. Not aliased or false-shared by construction
	    auto n = q.pop();
            while(n != NILELEMENT) { // Break the loop when 0 is pop'ed. 
                free(n);
		n = q.pop();
		s++;
		std::cout << " Consuming here " << n << " " << s << std::endl << std::flush;
		std::this_thread::sleep_for(20ms);
	    }
            // Store into sum only once because it is element of sums array, false-sharing the same cache line with other threads.
            // Updating sum in the loop above saturates the inter-core bus with cache coherence protocol messages.
            std::cout << " Done with Consumer " << std::endl << std::flush;
            sum = s;
        });

    // Start the producers.
    std::thread producers[PRODUCERS];
    for(int i = 0; i < PRODUCERS; ++i)
        producers[i] = std::thread([&q, &NILELEMENT]() {
            // Each producer pushes range [1, N] elements into the queue.
            // Ascending order [1, N] requires comparing with N at each loop iteration. Ascending order isn't necessary here.
            // Push elements in descending order, range [N, 1] with step -1, so that CPU decrement instruction sets zero/equal flag
            // when 0 is reached, which breaks the loop without having to compare n with N at each iteration.
            for(auto n = N; n > 0; --n) {
                q.push(new Room());
	    }
	    q.push(NILELEMENT);
	    std::cout << "Done with pushing " << std::endl << std::flush;
        });

    // Wait till producers have terminated.
    for(auto& t : producers)
        t.join();

    // Tell consumers to terminate by pushing one 0 element for each consumer.
    for(int i = CONSUMERS; i--;)
        q.push(nullptr);
    // Wait till consumers have terminated.
    for(auto& t : consumers)
        t.join();
    // When all consumers have terminated the queue is empty.

    // Sum up consumer's received elements sums.
    uint64_t total_sum = 0;
    for(auto& sum : sums) {
        total_sum += sum;
        if(!sum) // Verify that each consumer received at least one element.
            std::cerr << "WARNING: consumer " << (&sum - sums) << " received no elements.\n";
	else
	    std::cerr << "CONSUMER CONSUMED ALL ELEMENTS \n";
    }

    // Verify that each element has been pop'ed exactly once; not corrupted, dropped or duplicated.
    uint64_t constexpr expected_total_sum =  N * PRODUCERS;
    if(int64_t total_sum_diff = total_sum - expected_total_sum) {
        std::cerr << "ERROR: unexpected total_sum difference " << total_sum_diff << '\n';
        return EXIT_FAILURE;
    }

    std::cout << "Successfully completed " << std::endl;
    return EXIT_SUCCESS;
}

I am unable to get this compiled,

   src/example.cc: In function ‘int main()’:
   src/example.cc:18:36: error: ‘reinterpret_cast’ from integer to pointer
   18 |     Element constexpr NILELEMENT = reinterpret_cast<Element>(0xdeaddead);

I have also tried with static_cast and nothing works.

I expect this generic queue to support any data type (pointer). Any help getting this to compile is appreciated.

Thanks in advance.
