concurrentqueue's Introduction

moodycamel::ConcurrentQueue

An industrial-strength lock-free queue for C++.

Note: If all you need is a single-producer, single-consumer queue, I have one of those too.

Features

  • Knock-your-socks-off blazing fast performance.
  • Single-header implementation. Just drop it in your project.
  • Fully thread-safe lock-free queue. Use concurrently from any number of threads.
  • C++11 implementation -- elements are moved (instead of copied) where possible.
  • Templated, obviating the need to deal exclusively with pointers -- memory is managed for you.
  • No artificial limitations on element types or maximum count.
  • Memory can be allocated once up-front, or dynamically as needed.
  • Fully portable (no assembly; all is done through standard C++11 primitives).
  • Supports super-fast bulk operations.
  • Includes a low-overhead blocking version (BlockingConcurrentQueue).
  • Exception safe.

Reasons to use

There are not that many full-fledged lock-free queues for C++. Boost has one, but it's limited to objects with trivial assignment operators and trivial destructors, for example. Intel's TBB queue isn't lock-free, and requires trivial constructors too. There're many academic papers that implement lock-free queues in C++, but usable source code is hard to find, and tests even more so.

This queue not only has fewer limitations than others (for the most part), but it's also faster. It's been fairly well-tested, and offers advanced features like bulk enqueueing/dequeueing (which, with my new design, is much faster than one element at a time, approaching and even surpassing the speed of a non-concurrent queue even under heavy contention).

In short, there was a lock-free queue shaped hole in the C++ open-source universe, and I set out to fill it with the fastest, most complete, and well-tested design and implementation I could. The result is moodycamel::ConcurrentQueue :-)

Reasons not to use

The fastest synchronization of all is the kind that never takes place. Fundamentally, concurrent data structures require some synchronization, and that takes time. Every effort was made, of course, to minimize the overhead, but if you can avoid sharing data between threads, do so!

Why use concurrent data structures at all, then? Because they're gosh darn convenient! (And, indeed, sometimes sharing data concurrently is unavoidable.)

My queue is not linearizable (see the next section on high-level design). The foundations of its design assume that producers are independent; if this is not the case, and your producers co-ordinate amongst themselves in some fashion, be aware that the elements won't necessarily come out of the queue in the same order they were put in relative to the ordering formed by that co-ordination (but they will still come out in the order they were put in by any individual producer). If this affects your use case, you may be better off with another implementation; either way, it's an important limitation to be aware of.

My queue is also not NUMA aware, and does a lot of memory re-use internally, meaning it probably doesn't scale particularly well on NUMA architectures; however, I don't know of any other lock-free queue that is NUMA aware (except for SALSA, which is very cool, but has no publicly available implementation that I know of).

Finally, the queue is not sequentially consistent; there is a happens-before relationship between when an element is put in the queue and when it comes out, but other things (such as pumping the queue until it's empty) require more thought to get right in all eventualities, because explicit memory ordering may have to be done to get the desired effect. In other words, it can sometimes be difficult to use the queue correctly. This is why it's a good idea to follow the samples where possible. On the other hand, the upside of this lack of sequential consistency is better performance.

High-level design

Elements are stored internally using contiguous blocks instead of linked lists for better performance. The queue is made up of a collection of sub-queues, one for each producer. When a consumer wants to dequeue an element, it checks all the sub-queues until it finds one that's not empty. All of this is largely transparent to the user of the queue, however -- it mostly just works™.

One particular consequence of this design, however, (which seems to be non-intuitive) is that if two producers enqueue at the same time, there is no defined ordering between the elements when they're later dequeued. Normally this is fine, because even with a fully linearizable queue there'd be a race between the producer threads and so you couldn't rely on the ordering anyway. However, if for some reason you do extra explicit synchronization between the two producer threads yourself, thus defining a total order between enqueue operations, you might expect that the elements would come out in the same total order, which is a guarantee my queue does not offer. At that point, though, there semantically aren't really two separate producers, but rather one that happens to be spread across multiple threads. In this case, you can still establish a total ordering with my queue by creating a single producer token, and using that from both threads to enqueue (taking care to synchronize access to the token, of course, but there was already extra synchronization involved anyway).

I've written a more detailed overview of the internal design, as well as the full nitty-gritty details of the design, on my blog. Finally, the source itself is available for perusal for those interested in its implementation.

Basic use

The entire queue's implementation is contained in one header, concurrentqueue.h. Simply download and include that to use the queue. The blocking version is in a separate header, blockingconcurrentqueue.h, that depends on concurrentqueue.h and lightweightsemaphore.h. The implementation makes use of certain key C++11 features, so it requires a relatively recent compiler (e.g. VS2012+ or g++ 4.8; note that g++ 4.6 has a known bug with std::atomic and is thus not supported). The algorithm implementations themselves are platform independent.

Use it like you would any other templated queue, with the exception that you can use it from many threads at once :-)

Simple example:

#include "concurrentqueue.h"

#include <cassert>

moodycamel::ConcurrentQueue<int> q;
q.enqueue(25);

int item;
bool found = q.try_dequeue(item);
assert(found && item == 25);

Description of basic methods:

  • ConcurrentQueue(size_t initialSizeEstimate): Constructor which optionally accepts an estimate of the number of elements the queue will hold
  • enqueue(T&& item): Enqueues one item, allocating extra space if necessary
  • try_enqueue(T&& item): Enqueues one item, but only if enough memory is already allocated
  • try_dequeue(T& item): Dequeues one item, returning true if an item was found or false if the queue appeared empty

Note that it is up to the user to ensure that the queue object is completely constructed before being used by any other threads (this includes making the memory effects of construction visible, possibly via a memory barrier). Similarly, it's important that all threads have finished using the queue (and the memory effects have fully propagated) before it is destructed.
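One way to make the memory effects of construction visible is to hand the queue to other threads through an atomic pointer with release/acquire ordering. The sketch below shows only that publication step; PublishedQueue is a hypothetical helper written for this illustration and is not part of the library.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper (not part of concurrentqueue): publishes a pointer to a
// fully constructed object so that other threads observe its construction.
template <typename Q>
class PublishedQueue {
public:
    // Call after the object is completely constructed. The release store
    // orders all earlier writes (including construction) before the pointer.
    void publish(Q* q) noexcept { ptr_.store(q, std::memory_order_release); }

    // Returns nullptr until publish() has been observed. The acquire load
    // pairs with the release store above.
    Q* acquire() const noexcept { return ptr_.load(std::memory_order_acquire); }

private:
    std::atomic<Q*> ptr_{nullptr};
};
```

If the worker threads are only started after the queue has been constructed, the std::thread constructor already provides the necessary synchronization and no extra step like this is needed.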

There's usually two versions of each method, one "explicit" version that takes a user-allocated per-producer or per-consumer token, and one "implicit" version that works without tokens. Using the explicit methods is almost always faster (though not necessarily by a huge factor). Apart from performance, the primary distinction between them is their sub-queue allocation behaviour for enqueue operations: Using the implicit enqueue methods causes an automatically-allocated thread-local producer sub-queue to be allocated. Explicit producers, on the other hand, are tied directly to their tokens' lifetimes (but are recycled internally).

In order to avoid the number of sub-queues growing without bound, implicit producers are marked for reuse once their thread exits. However, this is not supported on all platforms. If using the queue from short-lived threads, it is recommended to use explicit producer tokens instead.

Full API (pseudocode):

# Allocates more memory if necessary
enqueue(item) : bool
enqueue(prod_token, item) : bool
enqueue_bulk(item_first, count) : bool
enqueue_bulk(prod_token, item_first, count) : bool

# Fails if not enough memory to enqueue
try_enqueue(item) : bool
try_enqueue(prod_token, item) : bool
try_enqueue_bulk(item_first, count) : bool
try_enqueue_bulk(prod_token, item_first, count) : bool

# Attempts to dequeue from the queue (never allocates)
try_dequeue(item&) : bool
try_dequeue(cons_token, item&) : bool
try_dequeue_bulk(item_first, max) : size_t
try_dequeue_bulk(cons_token, item_first, max) : size_t

# If you happen to know which producer you want to dequeue from
try_dequeue_from_producer(prod_token, item&) : bool
try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t

# A not-necessarily-accurate count of the total number of elements
size_approx() : size_t

Blocking version

As mentioned above, a full blocking wrapper of the queue is provided that adds wait_dequeue and wait_dequeue_bulk methods in addition to the regular interface. This wrapper is extremely low-overhead, but slightly less fast than the non-blocking queue (due to the necessary bookkeeping involving a lightweight semaphore).

There are also timed versions that allow a timeout to be specified (either in microseconds or with a std::chrono object).

The only major caveat with the blocking version is that you must be careful not to destroy the queue while somebody is waiting on it. This generally means you need to know for certain that another element is going to come along before you call one of the blocking methods. (To be fair, the non-blocking version cannot be destroyed while in use either, but it can be easier to coordinate the cleanup.)

Blocking example:

#include "blockingconcurrentqueue.h"

#include <cassert>
#include <chrono>
#include <thread>

moodycamel::BlockingConcurrentQueue<int> q;
std::thread producer([&]() {
    for (int i = 0; i != 100; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(i % 10));
        q.enqueue(i);
    }
});
std::thread consumer([&]() {
    for (int i = 0; i != 100; ++i) {
        int item;
        q.wait_dequeue(item);
        assert(item == i);
        
        if (q.wait_dequeue_timed(item, std::chrono::milliseconds(5))) {
            ++i;
            assert(item == i);
        }
    }
});
producer.join();
consumer.join();

assert(q.size_approx() == 0);

Advanced features

Tokens

The queue can take advantage of extra per-producer and per-consumer storage if it's available to speed up its operations. This takes the form of "tokens": You can create a consumer token and/or a producer token for each thread or task (tokens themselves are not thread-safe), and use the methods that accept a token as their first parameter:

moodycamel::ConcurrentQueue<int> q;

moodycamel::ProducerToken ptok(q);
q.enqueue(ptok, 17);

moodycamel::ConsumerToken ctok(q);
int item;
q.try_dequeue(ctok, item);
assert(item == 17);

If you happen to know which producer you want to consume from (e.g. in a single-producer, multi-consumer scenario), you can use the try_dequeue_from_producer methods, which accept a producer token instead of a consumer token, and cut some overhead.

Note that tokens work with the blocking version of the queue too.

When producing or consuming many elements, the most efficient way is to:

  1. Use the bulk methods of the queue with tokens
  2. Failing that, use the bulk methods without tokens
  3. Failing that, use the single-item methods with tokens
  4. Failing that, use the single-item methods without tokens

Having said that, don't create tokens willy-nilly -- ideally there would be one token (of each kind) per thread. The queue will work with what it is given, but it performs best when used with tokens.

Note that tokens aren't actually tied to any given thread; it's not technically required that they be local to the thread, only that they be used by a single producer/consumer at a time.

Bulk operations

Thanks to the novel design of the queue, it's just as easy to enqueue/dequeue multiple items as it is to do one at a time. This means that overhead can be cut drastically for bulk operations. Example syntax:

moodycamel::ConcurrentQueue<int> q;

int items[] = { 1, 2, 3, 4, 5 };
q.enqueue_bulk(items, 5);

int results[5];     // Could also be any iterator
size_t count = q.try_dequeue_bulk(results, 5);
for (size_t i = 0; i != count; ++i) {
    assert(results[i] == items[i]);
}

Preallocation (correctly using try_enqueue)

try_enqueue, unlike just plain enqueue, will never allocate memory. If there's not enough room in the queue, it simply returns false. The key to using this method properly, then, is to ensure enough space is pre-allocated for your desired maximum element count.

The constructor accepts a count of the number of elements that it should reserve space for. Because the queue works with blocks of elements, however, and not individual elements themselves, the value to pass in order to obtain an effective number of pre-allocated element slots is non-obvious.

First, be aware that the count passed is rounded up to the next multiple of the block size. Note that the default block size is 32 (this can be changed via the traits). Second, once a slot in a block has been enqueued to, that slot cannot be re-used until the rest of the block has been completely filled up and then completely emptied. This affects the number of blocks you need in order to account for the overhead of partially-filled blocks. Third, each producer (whether implicit or explicit) claims and recycles blocks in a different manner, which again affects the number of blocks you need to account for a desired number of usable slots.

Suppose you want the queue to be able to hold at least N elements at any given time. Without delving too deep into the rather arcane implementation details, here are some simple formulas for the number of elements to request for pre-allocation in such a case. Note the division is intended to be arithmetic division and not integer division (in order for ceil() to work).

For explicit producers (using tokens to enqueue):

(ceil(N / BLOCK_SIZE) + 1) * MAX_NUM_PRODUCERS * BLOCK_SIZE

For implicit producers (no tokens):

(ceil(N / BLOCK_SIZE) - 1 + 2 * MAX_NUM_PRODUCERS) * BLOCK_SIZE

When using mixed producer types:

((ceil(N / BLOCK_SIZE) - 1) * (MAX_EXPLICIT_PRODUCERS + 1) + 2 * (MAX_IMPLICIT_PRODUCERS + MAX_EXPLICIT_PRODUCERS)) * BLOCK_SIZE

If these formulas seem rather inconvenient, you can use the constructor overload that accepts the minimum number of elements (N) and the maximum number of explicit and implicit producers directly, and let it do the computation for you.
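For reference, the three formulas above can be written out with integer arithmetic, as in the following sketch. The function names are made up for this illustration (they are not library functions), the default block size of 32 is taken from the text above, and n is assumed to be at least 1.

```cpp
#include <cassert>
#include <cstddef>

// ceil(n / block_size) using integer arithmetic only.
constexpr std::size_t blocks_for(std::size_t n, std::size_t block_size) {
    return (n + block_size - 1) / block_size;
}

// Explicit producers only (tokens are used to enqueue).
constexpr std::size_t prealloc_explicit(std::size_t n, std::size_t max_producers,
                                        std::size_t block_size = 32) {
    return (blocks_for(n, block_size) + 1) * max_producers * block_size;
}

// Implicit producers only (no tokens).
constexpr std::size_t prealloc_implicit(std::size_t n, std::size_t max_producers,
                                        std::size_t block_size = 32) {
    return (blocks_for(n, block_size) - 1 + 2 * max_producers) * block_size;
}

// Mixed explicit and implicit producers.
constexpr std::size_t prealloc_mixed(std::size_t n, std::size_t max_explicit,
                                     std::size_t max_implicit,
                                     std::size_t block_size = 32) {
    return ((blocks_for(n, block_size) - 1) * (max_explicit + 1)
            + 2 * (max_implicit + max_explicit)) * block_size;
}
```

For example, with N = 100 and the default block size, ceil(N / BLOCK_SIZE) is 4, so two explicit producers need (4 + 1) * 2 * 32 = 320 slots, while two implicit producers need (4 - 1 + 2 * 2) * 32 = 224.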

In addition to blocks, there are other internal data structures that require allocating memory if they need to resize (grow). If using try_enqueue exclusively, the initial sizes may be exceeded, causing subsequent try_enqueue operations to fail. Specifically, the INITIAL_IMPLICIT_PRODUCER_HASH_SIZE trait limits the number of implicit producers that can be active at once before the internal hash needs resizing. Along the same lines, the IMPLICIT_INITIAL_INDEX_SIZE trait limits the number of unconsumed elements that an implicit producer can insert before its internal hash needs resizing. Similarly, the EXPLICIT_INITIAL_INDEX_SIZE trait limits the number of unconsumed elements that an explicit producer can insert before its internal hash needs resizing. In order to avoid hitting these limits when using try_enqueue, it is crucial to adjust the initial sizes in the traits appropriately, in addition to sizing the number of blocks properly as outlined above.

Finally, it's important to note that because the queue is only eventually consistent and takes advantage of weak memory ordering for speed, there's always a possibility that under contention try_enqueue will fail even if the queue is correctly pre-sized for the desired number of elements. (e.g. A given thread may think that the queue's full even when that's no longer the case.) So no matter what, you still need to handle the failure case (perhaps looping until it succeeds), unless you don't mind dropping elements.
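A bounded retry loop is one way to handle that failure case. The sketch below is written against any type that exposes bool try_enqueue(const T&); the helper name try_enqueue_with_retry and the FixedCapacityStandIn type are hypothetical and exist only so the example compiles without the queue header.

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Retries try_enqueue up to max_attempts times, yielding between attempts.
// Queue is any type exposing `bool try_enqueue(const T&)`.
template <typename Queue, typename T>
bool try_enqueue_with_retry(Queue& q, const T& item, std::size_t max_attempts) {
    for (std::size_t attempt = 0; attempt != max_attempts; ++attempt) {
        if (q.try_enqueue(item)) {
            return true;
        }
        std::this_thread::yield();  // give consumers a chance to free space
    }
    return false;  // caller decides: drop the element, back off, or report
}

// Hypothetical single-threaded stand-in used only to exercise the helper.
// It is NOT the real queue and is not thread-safe.
struct FixedCapacityStandIn {
    explicit FixedCapacityStandIn(std::size_t capacity) : capacity_(capacity) {}

    bool try_enqueue(const int& item) {
        if (items_.size() >= capacity_) return false;
        items_.push_back(item);
        return true;
    }

    std::size_t capacity_;
    std::vector<int> items_;
};
```

Bounding the number of attempts keeps a producer from spinning forever if no consumer is draining the queue; whether to drop the element or wait longer is left to the caller.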

Exception safety

The queue is exception safe, and will never become corrupted if used with a type that may throw exceptions. The queue itself never throws any exceptions (operations fail gracefully (return false) if memory allocation fails instead of throwing std::bad_alloc).

It is important to note that the guarantees of exception safety only hold if the element type never throws from its destructor, and that any iterators passed into the queue (for bulk operations) never throw either. Note that in particular this means std::back_inserter iterators must be used with care, since the vector being inserted into may need to allocate and throw a std::bad_alloc exception from inside the iterator; so be sure to reserve enough capacity in the target container first if you do this.
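The reserve-first advice can be wrapped in a small helper, sketched below. The name reserved_back_inserter is made up for this example. Reserving up front moves any allocation failure to before the bulk operation starts, so the push_back calls made through the iterator do not reallocate (for up to max_items elements).

```cpp
#include <cassert>
#include <cstddef>
#include <iterator>
#include <vector>

// Reserves room for max_items additional elements, then returns a
// back_inserter for the vector. Any std::bad_alloc is thrown here, by
// reserve(), rather than from inside the iterator later on.
template <typename T>
std::back_insert_iterator<std::vector<T>>
reserved_back_inserter(std::vector<T>& out, std::size_t max_items) {
    out.reserve(out.size() + max_items);
    return std::back_inserter(out);
}
```

It would then be passed as the output iterator of a bulk dequeue, with the same value used for the maximum count, e.g. q.try_dequeue_bulk(reserved_back_inserter(results, 64), 64). Note that this only removes the allocation as a source of exceptions; an element type whose copy or move operations throw is still subject to the guarantees listed in this section.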

The guarantees are presently as follows:

  • Enqueue operations are rolled back completely if an exception is thrown from an element's constructor. For bulk enqueue operations, this means that elements are copied instead of moved (in order to avoid having only some objects moved in the event of an exception). Non-bulk enqueues always use the move constructor if one is available.
  • If the assignment operator throws during a dequeue operation (both single and bulk), the element(s) are considered dequeued regardless. In such a case, the dequeued elements are all properly destructed before the exception is propagated, but there's no way to get the elements themselves back.
  • Any exception that is thrown is propagated up the call stack, at which point the queue is in a consistent state.

Note: If any of your type's copy constructors/move constructors/assignment operators don't throw, be sure to annotate them with noexcept; this will avoid the exception-checking overhead in the queue where possible (even with zero-cost exceptions, there's still a code size impact that has to be taken into account).
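As a small illustration of that annotation, here is a sketch of an element type with noexcept move operations, plus compile-time checks that the annotation took effect. Message is a hypothetical type invented for this example.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// Hypothetical element type whose move operations are declared noexcept.
struct Message {
    std::string payload;

    Message() = default;
    explicit Message(std::string p) : payload(std::move(p)) {}

    Message(Message&& other) noexcept : payload(std::move(other.payload)) {}

    Message& operator=(Message&& other) noexcept {
        payload = std::move(other.payload);
        return *this;
    }
};

// These fail to compile if a noexcept annotation is accidentally dropped.
static_assert(std::is_nothrow_move_constructible<Message>::value,
              "Message should be nothrow move constructible");
static_assert(std::is_nothrow_move_assignable<Message>::value,
              "Message should be nothrow move assignable");
```

Keeping the static_assert checks next to the type makes it harder for a later change (for example, adding a member with a throwing move) to silently remove the guarantee.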

Traits

The queue also supports a traits template argument which defines various types, constants, and the memory allocation and deallocation functions that are to be used by the queue. The typical pattern to providing your own traits is to create a class that inherits from the default traits and override only the values you wish to change. Example:

struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
	static const size_t BLOCK_SIZE = 256;		// Use bigger blocks
};

moodycamel::ConcurrentQueue<int, MyTraits> q;

How to dequeue types without calling the constructor

The normal way to dequeue an item is to pass in an existing object by reference, which is then assigned to internally by the queue (using the move-assignment operator if possible). This can pose a problem for types that are expensive to construct or don't have a default constructor; fortunately, there is a simple workaround: Create a wrapper class that copies the memory contents of the object when it is assigned by the queue (a poor man's move, essentially). Note that this only works if the object contains no internal pointers. Example:

#include <cstring>  // std::memcpy

struct MyObjectMover {
    inline void operator=(MyObject&& obj) {
        std::memcpy(data, &obj, sizeof(MyObject));
        
        // TODO: Cleanup obj so that when it's destructed by the queue
        // it doesn't corrupt the data of the object we just moved it into
    }

    inline MyObject& obj() { return *reinterpret_cast<MyObject*>(data); }

private:
    // Raw storage with the same alignment as MyObject
    alignas(MyObject) char data[sizeof(MyObject)];
};

A less dodgy alternative, if moves are cheap but default construction is not, is to use a wrapper that defers construction until the object is assigned, enabling use of the move constructor:

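#include <cassert>
#include <new>      // placement new
#include <utility>  // std::move

struct MyObjectMover {
    inline void operator=(MyObject&& x) {
        // Constructs in place; assumes each wrapper is assigned at most once
        new (data) MyObject(std::move(x));
        created = true;
    }

    inline MyObject& obj() {
        assert(created);
        return *reinterpret_cast<MyObject*>(data);
    }

    ~MyObjectMover() {
        if (created)
            obj().~MyObject();
    }

private:
    // Raw storage with the same alignment as MyObject
    alignas(MyObject) char data[sizeof(MyObject)];
    bool created = false;
};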

Samples

There are some more detailed samples here. The source of the unit tests and benchmarks is available for reference as well.

Benchmarks

See my blog post for some benchmark results (including versus boost::lockfree::queue and tbb::concurrent_queue), or run the benchmarks yourself (requires MinGW and certain GnuWin32 utilities to build on Windows, or a recent g++ on Linux):

cd build
make benchmarks
bin/benchmarks

The short version of the benchmarks is that it's so fast (especially the bulk methods), that if you're actually using the queue to do anything, the queue won't be your bottleneck.

Tests (and bugs)

I've written quite a few unit tests as well as a randomized long-running fuzz tester. I also ran the core queue algorithm through the CDSChecker C++11 memory model model checker. Some of the inner algorithms were tested separately using the Relacy model checker, and full integration tests were also performed with Relacy. I've tested on Linux (Fedora 19) and Windows (7), but only on x86 processors so far (Intel and AMD). The code was written to be platform-independent, however, and should work across all processors and OSes.

Due to the complexity of the implementation and the difficult-to-test nature of lock-free code in general, there may still be bugs. If anyone is seeing buggy behaviour, I'd like to hear about it! (Especially if a unit test for it can be cooked up.) Just open an issue on GitHub.

Using vcpkg

You can download and install moodycamel::ConcurrentQueue using the vcpkg dependency manager:

git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh
./vcpkg integrate install
./vcpkg install concurrentqueue

The moodycamel::ConcurrentQueue port in vcpkg is kept up to date by Microsoft team members and community contributors. If the version is out of date, please create an issue or pull request on the vcpkg repository.

License

I'm releasing the source of this repository (with the exception of third-party code, i.e. the Boost queue (used in the benchmarks for comparison), Intel's TBB library (ditto), CDSChecker, Relacy, and Jeff Preshing's cross-platform semaphore, which all have their own licenses) under a simplified BSD license. I'm also dual-licensing under the Boost Software License. See the LICENSE.md file for more details.

Note that lock-free programming is a patent minefield, and this code may very well violate a pending patent (I haven't looked), though it does not to my present knowledge. I did design and implement this queue from scratch.

Diving into the code

If you're interested in the source code itself, it helps to have a rough idea of how it's laid out. This section attempts to describe that.

The queue is formed of several basic parts (listed here in roughly the order they appear in the source). There's the helper functions (e.g. for rounding to a power of 2). There's the default traits of the queue, which contain the constants and malloc/free functions used by the queue. There's the producer and consumer tokens. Then there's the queue's public API itself, starting with the constructor, destructor, and swap/assignment methods. There's the public enqueue methods, which are all wrappers around a small set of private enqueue methods found later on. There's the dequeue methods, which are defined inline and are relatively straightforward.

Then there's all the main internal data structures. First, there's a lock-free free list, used for recycling spent blocks (elements are enqueued to blocks internally). Then there's the block structure itself, which has two different ways of tracking whether it's fully emptied or not (remember, given two parallel consumers, there's no way to know which one will finish first) depending on where it's used. Then there's a small base class for the two types of internal SPMC producer queues (one for explicit producers that holds onto memory but attempts to be faster, and one for implicit ones which attempt to recycle more memory back into the parent but is a little slower). The explicit producer is defined first, then the implicit one. They both contain the same general four methods: One to enqueue, one to dequeue, one to enqueue in bulk, and one to dequeue in bulk. (Obviously they have constructors and destructors too, and helper methods.) The main difference between them is how the block handling is done (they both use the same blocks, but in different ways, and map indices to them in different ways).

Finally, there's the miscellaneous internal methods: There's the ones that handle the initial block pool (populated when the queue is constructed), and an abstract block pool that comprises the initial pool and any blocks on the free list. There's ones that handle the producer list (a lock-free add-only linked list of all the producers in the system). There's ones that handle the implicit producer lookup table (which is really a sort of specialized TLS lookup). And then there's some helper methods for allocating and freeing objects, and the data members of the queue itself, followed lastly by the free-standing swap functions.

concurrentqueue's People

Contributors

biddisco, cahlbin, cameron314, cboulay, cf-natali, chaosvex, cjappl, danielmelody, darkdimius, echo-mike, graphicsman, huachaohuang, improbablejan, invexed, ivysnow, jcelerier, jesschis, kevin-verdazo, khuck, leolchat, mathiasmagnus, miketsukerman, mpark, n-zer, pfeatherstone, philbucher, slumber, thieum, v1gnesh, zerodefect

concurrentqueue's Issues

Why define threadId()?

Hi,

Why did you define the function threadId in moodycamel::details? Can the same be accomplished by using the C++11 std::this_thread::get_id()?

Maybe I am misunderstanding the use.

Thanks.

Passing iterator references to enqueue_bulk and variants

Hi,

I am working on a channel abstraction over concurrentqueue; one of the features allows me to limit the number of elements in the queue dynamically.

It's similar to your blocking queue, but it allows me to change the capacity of the queue on the fly and it allows me to enqueue_bulk with a count much greater than the capacity.

I do this by waiting until there is some free space, filling that up with elements from the iterator and then repeating that until the iterator is empty.

To achieve this, I am passing a reference to the iterator my wrapper received rather than its value, so that my iterator is incremented outside of enqueue_bulk and in the next step I can just pass the same iterator to enqueue_bulk again.

Unfortunately, enqueue_bulk decays my references to values. I was able to change this, but I am not quite sure whether the decay was by design.

Here is the code, hopefully with fewer bugs than the last one :)

SoftwearDevelopment@95a00a3

linux semaphore adds overhead in bulk operations for the blocking version

I started using the blocking version of the concurrent queue for scheduling tasks on Linux. The performance is pretty good; however, I noticed that with bulk operations there is a bottleneck due to calling semaphore_signal in a loop. The bulk operation gets rid of the atomic operation overhead, but since semaphore_signal is called multiple times, and being a system call it's an expensive operation, the performance starts to degrade once there are more than 15 consumer threads with 20 tasks per consumer. So basically this part is problematic:

https://github.com/cameron314/concurrentqueue/blob/master/blockingconcurrentqueue.h#L127

However, I am not sure if there are any alternatives, since Linux does not provide a low-overhead alternative. I just wanted to point it out; you might have an idea on how to remove this bottleneck?

Compilation with VS2012 fails (because of =delete functions)

The disabling of the copy constructors / operator= is the only thing preventing your queue from compiling in VS2012 (deleted functions are only supported since VS2013).

Would you consider hiding the "= delete" behind a macro that resolves to nothing when _MSC_VER <= 1700 ? (I can make a pull request if you want)

In any case, thanks for this very nice code!
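A rough sketch of the kind of macro proposed above, for illustration only: EXAMPLE_DELETE_FUNCTION and NonCopyable are names invented here, not taken from the library. On compilers without deleted functions the members are merely declared private and left undefined, which is the usual pre-C++11 way to disable copying.

```cpp
#include <cassert>
#include <type_traits>

// Expands to nothing on VS2012 and earlier (_MSC_VER <= 1700), which do not
// support deleted functions, and to "= delete" everywhere else.
#if defined(_MSC_VER) && _MSC_VER <= 1700
#define EXAMPLE_DELETE_FUNCTION
#else
#define EXAMPLE_DELETE_FUNCTION = delete
#endif

class NonCopyable {
public:
    NonCopyable() {}

private:
    // Private in both cases, so copying is rejected at compile time
    // (or at link time from within the class on older compilers).
    NonCopyable(const NonCopyable&) EXAMPLE_DELETE_FUNCTION;
    NonCopyable& operator=(const NonCopyable&) EXAMPLE_DELETE_FUNCTION;
};
```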

Won't compile in GCC 4.9.1

I have had this issue just including your header and compiling with GCC 4.9.1. My error log comes out like this:

concurrentqueue/concurrentqueue.h: In destructor ‘moodycamel::ConcurrentQueue<T, Traits>::ExplicitProducer::~ExplicitProducer()’:
concurrentqueue/concurrentqueue.h:1538:51: error: type/value mismatch at argument 1 in template parameter list for ‘template<class _Tp> struct std::is_empty’
  if (block->template is_empty<explicit_context>()) {
                                               ^
concurrentqueue/concurrentqueue.h:1538:51: error:   expected a type, got ‘explicit_context’
concurrentqueue/concurrentqueue.h: In member function ‘bool moodycamel::ConcurrentQueue<T, Traits>::ExplicitProducer::enqueue(U&&)’:
concurrentqueue/concurrentqueue.h:1586:96: error: type/value mismatch at argument 1 in template parameter list for ‘template<class _Tp> struct std::is_empty’
 if (this->tailBlock != nullptr && this->tailBlock->next->template is_empty<explicit_context>()) {
                                                                                        ^
concurrentqueue/concurrentqueue.h:1586:96: error:   expected a type, got ‘explicit_context’
concurrentqueue/concurrentqueue.h: In member function ‘bool moodycamel::ConcurrentQueue<T, Traits>::ExplicitProducer::enqueue_bulk(It, moodycamel::ConcurrentQueue<T, Traits>::size_t)’:
concurrentqueue/concurrentqueue.h:1791:168: error: type/value mismatch at argument 1 in template parameter list for ‘template<class _Tp> struct std::is_empty’
 while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->template is_empty<explicit_context>()) {
                                                                                                                                                                    ^
concurrentqueue/concurrentqueue.h:1791:168: error:   expected a type, got ‘explicit_context’
concurrentqueue/concurrentqueue.h: In destructor ‘moodycamel::ConcurrentQueue<T, Traits>::ExplicitProducer::~ExplicitProducer()’:
concurrentqueue/concurrentqueue.h:1538:51: error: type/value mismatch at argument 1 in template parameter list for ‘template<class _Tp> struct std::is_empty’
  if (block->template is_empty<explicit_context>()) {
                                               ^
concurrentqueue/concurrentqueue.h:1538:51: error:   expected a type, got ‘explicit_context’
concurrentqueue/concurrentqueue.h: In member function ‘bool moodycamel::ConcurrentQueue<T, Traits>::ExplicitProducer::enqueue(U&&)’:
concurrentqueue/concurrentqueue.h:1586:96: error: type/value mismatch at argument 1 in template parameter list for ‘template<class _Tp> struct std::is_empty’
 if (this->tailBlock != nullptr && this->tailBlock->next->template is_empty<explicit_context>()) {
                                                                                            ^
concurrentqueue/concurrentqueue.h:1586:96: error:   expected a type, got ‘explicit_context’
concurrentqueue/concurrentqueue.h: In member function ‘bool moodycamel::ConcurrentQueue<T, Traits>::ExplicitProducer::enqueue_bulk(It, moodycamel::ConcurrentQueue<T, Traits>::size_t)’:
concurrentqueue/concurrentqueue.h:1791:168: error: type/value mismatch at argument 1 in template parameter list for ‘template<class _Tp> struct std::is_empty’
 while (blockBaseDiff > 0 && this->tailBlock != nullptr && this->tailBlock->next != firstAllocatedBlock && this->tailBlock->next->template is_empty<explicit_context>()) {
                                                                                                                                                                    ^

Funnily enough, somebody already tried to fix it just 3 days ago (GitHub: GATB/bcalm@be7bda8), so you might want to have a look at it.
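
For readers unfamiliar with the construct in the log, here is a minimal sketch of a member function template with a non-type (enum) parameter called through a dependent type. The identifier explicit_context mirrors the log; ExampleContext, implicit_context, ExampleBlock and block_is_empty are hypothetical and only illustrate the syntax. The errors show GCC 4.9.1 treating is_empty as std::is_empty, which expects a type. That suggests std::is_empty was visible unqualified at that point, for instance through a using-directive placed before the include. This is an assumption; the report does not show the including file.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical enum; only the name explicit_context appears in the log.
enum ExampleContext { implicit_context = 0, explicit_context = 1 };

// Hypothetical block type with a member template taking an enum value.
struct ExampleBlock {
    std::size_t remaining;

    // The template parameter is unused in this sketch; it only shapes the call.
    template <ExampleContext context>
    bool is_empty() const { return remaining == 0; }
};

template <typename BlockT>
bool block_is_empty(const BlockT* block) {
    assert(block != nullptr);
    // BlockT is a dependent type, so the `template` keyword is needed to tell
    // the parser that `<` starts a template argument list, not a comparison.
    return block->template is_empty<explicit_context>();
}
```

A conforming compiler looks the name up in the class of `block` and accepts the call, because explicit_context is a value and the member template takes a value.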

Thanks for your work.

Making concurrentqueue even better :)

Thanks for creating this project and for making the source code available as a simple drop-in, cross-platform implementation. I'm impressed by the lengths you went to in developing the idea, implementing it, and building the tests and metrics to prove its worth. Much appreciation also for making it commercial-friendly because, as I'll attempt to demonstrate, it encourages external contributions and evolution ;)

For NUMA support, we ended up implementing the SALSA algorithm (which I've seen you mention, so I know you're at least a little familiar with it), but we ran into some issues. In particular, the requirement of "CPU binding" is not generally cross-platform, and the paper's suggestion of using mfence as an alternative didn't seem to work at all. So instead we did a bit of a rework, which uses a 32-bit atomic OR for stealing and an atomic increment for fast-path consumers, with CAS used only in the event of a "steal" operation.

Anyway, wanted to drop this note to encourage you to check out SALSA in depth if you haven't already, and to warn you about the CPU bind issue. But the good news is that the workaround of a single interlocked increment on the consumer fast-path, with an interlocked OR and CAS on the "steal" path is tenable and has better performance than concurrentqueue in all permutations of 1x1, 1xN, Nx1, NxN consumer x producer. The CPU bind path is even faster, but it won't work on OSX since they don't provide a way to bind to a specific CPU core (take it up with their kernel team, whom I am forever disappointed with).
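
To illustrate why CPU binding is hard to offer portably, here is a minimal sketch of a wrapper that pins the calling thread on Linux and reports failure elsewhere. The function name try_bind_current_thread_to_cpu is hypothetical. pthread_setaffinity_np is a non-portable glibc extension, and the sketch assumes that no equivalent hard-binding call is available on the other platforms it compiles for.

```cpp
#if defined(__linux__) && !defined(_GNU_SOURCE)
#define _GNU_SOURCE  // exposes pthread_setaffinity_np in glibc
#endif

#if defined(__linux__)
#include <pthread.h>
#include <sched.h>
#endif

// Hypothetical helper: returns true only if the calling thread was pinned.
bool try_bind_current_thread_to_cpu(unsigned cpu) {
#if defined(__linux__)
    if (cpu >= static_cast<unsigned>(CPU_SETSIZE)) {
        return false;  // index does not fit in a fixed-size cpu_set_t
    }
    cpu_set_t set;
    CPU_ZERO(&set);
    CPU_SET(cpu, &set);
    return pthread_setaffinity_np(pthread_self(), sizeof(set), &set) == 0;
#else
    (void)cpu;
    return false;  // no binding attempted on this platform
#endif
}
```

A caller would treat a false result as "use the non-binding path", which matches the fallback described above.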

Not sure if you are aware, but there's an implementation of SALSA (not ours) available here: https://subversion.assembla.com/svn/scpools/SCPools which is not totally complete but is plenty enough to guide you toward a proper implementation, particularly when armed with the CPU bind warning which I'm not sure if they're aware of (haven't tried contacting the authors). There also seem to be various versions of their paper floating around, and this seemed to be the most up-to-date and complete (though I may be wrong) http://webee.technion.ac.il/~idish/ftp/SALSA-full.pdf.

Is basic release-consume semantic guaranteed?

I have a question about a statement in the documentation. It says:

Note that it is up to the user to ensure that the queue object is completely constructed before being used by any other threads (this includes making the memory effects of construction visible, possibly via a memory barrier).

Suppose I have an int* a. I assign *a = 10 and push a onto the queue in thread 1, then thread 2 gets the pointer from the queue. Is thread 2 guaranteed to see *a == 10? If not, how can I work around it by applying a memory barrier? Since int* isn't an atomic variable, simply adding std::atomic_thread_fence does not seem to make any difference. I might be misunderstanding memory ordering or the documentation, though.
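
The guarantee being asked about can be stated with standard C++11 atomics alone. The sketch below uses a single std::atomic<int*> slot as a stand-in for the queue (an assumption made so that the example is self-contained; it is not the library's implementation, and whether the queue gives the same guarantee is for the author to confirm). With a release store by the producer and an acquire load by the consumer, the plain write to the int happens-before the consumer's read.

```cpp
#include <atomic>
#include <thread>

// Hypothetical single-slot hand-off standing in for a queue.
int read_through_published_pointer() {
    int value = 0;
    std::atomic<int*> slot(nullptr);
    int seen = -1;

    std::thread producer([&] {
        value = 10;                                     // plain, non-atomic write
        slot.store(&value, std::memory_order_release);  // publish the pointer
    });

    std::thread consumer([&] {
        int* p = nullptr;
        // Spin until the pointer has been published.
        while ((p = slot.load(std::memory_order_acquire)) == nullptr) {
            std::this_thread::yield();
        }
        seen = *p;  // ordered after the producer's write to value
    });

    producer.join();
    consumer.join();
    return seen;
}
```

The pointee does not need to be atomic; only the operation that hands the pointer over needs release/acquire ordering.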

Thanks!

iOS

I would like to use the concurrent queue in an iOS project. However, when I try to compile I get the following error:

concurrentqueue.h:103:49: Thread-local storage is not supported for the current target

The problem is in this line:

static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }

For now I've found a quick solution by creating a file concurrentqueue.mm with the following contents:

#include "concurrentqueue.h"
#import <Foundation/Foundation.h>

moodycamel::details::thread_id_t moodycamel::details::thread_id() { return reinterpret_cast<thread_id_t>([NSThread currentThread]); }

...adding a conditional include to the top of concurrentqueue.h:

#if defined(__APPLE__)
#include "TargetConditionals.h"
#endif

...and replacing the problematic line by this code:

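// TargetConditionals.h defines these macros as 0 or 1 on every Apple platform, so test their values rather than whether they are defined.
#if defined(__APPLE__) && (TARGET_OS_IPHONE || TARGET_IPHONE_SIMULATOR)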
    thread_id_t thread_id();
#else
    static inline thread_id_t thread_id() { static MOODYCAMEL_THREADLOCAL int x; return reinterpret_cast<thread_id_t>(&x); }
#endif

This seems to work, but it's not as nice as just having one header file. I can't think of a better solution right now, but maybe we can discuss it here.
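
One possible header-only alternative, sketched under assumptions: derive the id from std::this_thread::get_id() instead of the address of a thread-local variable. The names example_thread_id_t and example_thread_id are hypothetical. Note that a hash is not guaranteed to be unique across threads, so this is only suitable where a rare collision is tolerable or handled.

```cpp
#include <cstddef>
#include <functional>
#include <thread>

// Hypothetical id type for illustration.
typedef std::size_t example_thread_id_t;

// Uses only standard C++11 facilities, so it needs no thread-local storage
// and no separate Objective-C++ source file.
inline example_thread_id_t example_thread_id() {
    return std::hash<std::thread::id>()(std::this_thread::get_id());
}
```

This keeps everything in a single header, at the cost of a hash computation on each call.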

Build errors in VS2015 update 1

Error C3066 there are multiple ways that an object of this type can be called with these arguments fuzztests c:\devl\c++14\concurrentqueue-master\concurrentqueue-master\tests\fuzztests\fuzztests.cpp 676 1

Error C2280 'std::atomicboost::lockfree::detail::tagged_ptr::atomic(void) noexcept': attempting to reference a deleted function benchmarks c:\devl\c++14\concurrentqueue-master\concurrentqueue-master\benchmarks\boost\lockfree\queue.hpp 109 1

Error (active) the declared exception specification is incompatible with the generated one benchmarks c:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\include\atomic 662 2

Error (active) the declared exception specification is incompatible with the generated one benchmarks c:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\include\atomic 508 2

Possible data race when running code from samples.md compiled with g++

While running "hello concurrency": https://github.com/cameron314/concurrentqueue/blob/d3735b5b7a94340f7d2eca0de9566ec8e8295ddb/samples.md I get warnings from g++ when using -fsanitize=thread. I don't get the same warnings from clang.

System details:

Linux host 4.4.0-51-generic #72-Ubuntu SMP Thu Nov 24 18:29:54 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux

Compiled using gcc:

Version info: c++ (Ubuntu 5.4.0-6ubuntu1~16.04.4) 5.4.0 20160609
c++ -g -fsanitize=thread -std=c++14 -pthread main.cpp
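
One thing worth ruling out when reading reports like the ones below, stated as an assumption rather than a diagnosis: GCC's documentation notes that ThreadSanitizer does not support std::atomic_thread_fence and can report false positives for code that synchronizes through fences. The sketch below shows such a pattern, which is correct under the C++11 memory model. It is a standalone illustration, and it has not been verified that this is what triggers the warnings in this report.

```cpp
#include <atomic>
#include <thread>

// Publishes a plain int using relaxed atomics plus standalone fences.
int publish_with_fences() {
    int payload = 0;
    std::atomic<bool> ready(false);
    int seen = -1;

    std::thread writer([&] {
        payload = 42;                                         // plain write
        std::atomic_thread_fence(std::memory_order_release);  // order it before the flag
        ready.store(true, std::memory_order_relaxed);
    });

    std::thread reader([&] {
        while (!ready.load(std::memory_order_relaxed)) {
            std::this_thread::yield();
        }
        std::atomic_thread_fence(std::memory_order_acquire);  // pairs with the release fence
        seen = payload;                                       // ordered after the write
    });

    writer.join();
    reader.join();
    return seen;
}
```

If a tool flags this function, the warning reflects the tool's model of fences rather than a race in the code.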

Full Results:

==================
WARNING: ThreadSanitizer: data race (pid=5516)
  Read of size 8 at 0x7d1c0000a798 by thread T3:
    #0 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::next_prod() const /home/shansen/code/concurrentqueue/concurrentqueue.h:1653 (a.out+0x000000406278)
    #1 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3131 (a.out+0x00000040838e)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #3 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #5 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #6 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #7 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #8 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #9 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Previous write of size 8 at 0x7d1c0000a798 by thread T2:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3516 (a.out+0x000000408f92)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #5 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #7 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #8 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #9 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #10 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #11 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Location is heap block of size 104 at 0x7d1c0000a790 allocated by thread T2:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3516 (a.out+0x000000408f92)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #5 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #7 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #8 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #9 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #10 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #11 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Thread T3 (tid=5520, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

  Thread T2 (tid=5519, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

SUMMARY: ThreadSanitizer: data race /home/shansen/code/concurrentqueue/concurrentqueue.h:1653 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::next_prod() const
==================
==================
WARNING: ThreadSanitizer: data race (pid=5516)
  Read of size 8 at 0x7d1c0000a5d8 by thread T11:
    #0 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::next_prod() const /home/shansen/code/concurrentqueue/concurrentqueue.h:1653 (a.out+0x000000406278)
    #1 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1040 (a.out+0x000000405cb9)
    #2 main::{lambda()#2}::operator()() const <null> (a.out+0x000000401786)
    #3 _M_invoke<> /usr/include/c++/5/functional:1531 (a.out+0x000000404bac)
    #4 operator() /usr/include/c++/5/functional:1520 (a.out+0x00000040499e)
    #5 _M_run /usr/include/c++/5/thread:115 (a.out+0x00000040481e)
    #6 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Previous write of size 8 at 0x7d1c0000a5d8 by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3516 (a.out+0x000000408f92)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #5 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #7 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #8 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #9 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #10 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #11 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Location is heap block of size 104 at 0x7d1c0000a5d0 allocated by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3516 (a.out+0x000000408f92)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #5 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #7 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #8 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #9 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #10 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #11 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Thread T11 (tid=5528, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:20 (a.out+0x000000401a14)

  Thread T10 (tid=5527, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

SUMMARY: ThreadSanitizer: data race /home/shansen/code/concurrentqueue/concurrentqueue.h:1653 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::next_prod() const
==================
==================
WARNING: ThreadSanitizer: data race (pid=5516)
  Read of size 1 at 0x7d1c0000a618 by thread T11:
    #0 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1634 (a.out+0x0000004066f6)
    #1 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1054 (a.out+0x000000405cdd)
    #2 main::{lambda()#2}::operator()() const <null> (a.out+0x000000401786)
    #3 _M_invoke<> /usr/include/c++/5/functional:1531 (a.out+0x000000404bac)
    #4 operator() /usr/include/c++/5/functional:1520 (a.out+0x00000040499e)
    #5 _M_run /usr/include/c++/5/thread:115 (a.out+0x00000040481e)
    #6 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Previous write of size 8 at 0x7d1c0000a618 by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3516 (a.out+0x000000408f92)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #5 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #7 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #8 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #9 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #10 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #11 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Location is heap block of size 104 at 0x7d1c0000a5d0 allocated by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3516 (a.out+0x000000408f92)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #5 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #7 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #8 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #9 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #10 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #11 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Thread T11 (tid=5528, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:20 (a.out+0x000000401a14)

  Thread T10 (tid=5527, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

SUMMARY: ThreadSanitizer: data race /home/shansen/code/concurrentqueue/concurrentqueue.h:1634 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<int>(int&)
==================
==================
WARNING: ThreadSanitizer: data race (pid=5516)
  Read of size 8 at 0x7d5c0001e318 by thread T11:
    #0 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_index_for_index(unsigned long, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::BlockIndexHeader*&) const /home/shansen/code/concurrentqueue/concurrentqueue.h:2854 (a.out+0x000000409880)
    #1 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_entry_for_index(unsigned long) const /home/shansen/code/concurrentqueue/concurrentqueue.h:2842 (a.out+0x000000408cb2)
    #2 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:2478 (a.out+0x000000407f99)
    #3 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1638 (a.out+0x00000040672c)
    #4 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1054 (a.out+0x000000405cdd)
    #5 main::{lambda()#2}::operator()() const <null> (a.out+0x000000401786)
    #6 _M_invoke<> /usr/include/c++/5/functional:1531 (a.out+0x000000404bac)
    #7 operator() /usr/include/c++/5/functional:1520 (a.out+0x00000040499e)
    #8 _M_run /usr/include/c++/5/thread:115 (a.out+0x00000040481e)
    #9 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Previous write of size 8 at 0x7d5c0001e318 by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::new_block_index() /home/shansen/code/concurrentqueue/concurrentqueue.h:2872 (a.out+0x0000004091e1)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::ImplicitProducer(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*) /home/shansen/code/concurrentqueue/concurrentqueue.h:2340 (a.out+0x000000409db2)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3517 (a.out+0x000000408fdb)
    #5 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #7 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #8 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #9 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #10 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #11 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #12 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #13 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Location is heap block of size 822 at 0x7d5c0001e300 allocated by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::new_block_index() /home/shansen/code/concurrentqueue/concurrentqueue.h:2872 (a.out+0x0000004091e1)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::ImplicitProducer(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*) /home/shansen/code/concurrentqueue/concurrentqueue.h:2340 (a.out+0x000000409db2)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3517 (a.out+0x000000408fdb)
    #5 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #7 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #8 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #9 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #10 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #11 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #12 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #13 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Thread T11 (tid=5528, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:20 (a.out+0x000000401a14)

  Thread T10 (tid=5527, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

SUMMARY: ThreadSanitizer: data race /home/shansen/code/concurrentqueue/concurrentqueue.h:2854 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_index_for_index(unsigned long, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::BlockIndexHeader*&) const
==================
==================
WARNING: ThreadSanitizer: data race (pid=5516)
  Read of size 8 at 0x7d5c0001e528 by thread T11:
    #0 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_index_for_index(unsigned long, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::BlockIndexHeader*&) const /home/shansen/code/concurrentqueue/concurrentqueue.h:2854 (a.out+0x000000409898)
    #1 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_entry_for_index(unsigned long) const /home/shansen/code/concurrentqueue/concurrentqueue.h:2842 (a.out+0x000000408cb2)
    #2 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:2478 (a.out+0x000000407f99)
    #3 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1638 (a.out+0x00000040672c)
    #4 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1054 (a.out+0x000000405cdd)
    #5 main::{lambda()#2}::operator()() const <null> (a.out+0x000000401786)
    #6 _M_invoke<> /usr/include/c++/5/functional:1531 (a.out+0x000000404bac)
    #7 operator() /usr/include/c++/5/functional:1520 (a.out+0x00000040499e)
    #8 _M_run /usr/include/c++/5/thread:115 (a.out+0x00000040481e)
    #9 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Previous write of size 8 at 0x7d5c0001e528 by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::new_block_index() /home/shansen/code/concurrentqueue/concurrentqueue.h:2872 (a.out+0x0000004091e1)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::ImplicitProducer(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*) /home/shansen/code/concurrentqueue/concurrentqueue.h:2340 (a.out+0x000000409db2)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3517 (a.out+0x000000408fdb)
    #5 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #7 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #8 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #9 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #10 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #11 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #12 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #13 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Location is heap block of size 822 at 0x7d5c0001e300 allocated by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::new_block_index() /home/shansen/code/concurrentqueue/concurrentqueue.h:2872 (a.out+0x0000004091e1)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::ImplicitProducer(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*) /home/shansen/code/concurrentqueue/concurrentqueue.h:2340 (a.out+0x000000409db2)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3517 (a.out+0x000000408fdb)
    #5 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #7 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #8 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #9 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #10 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #11 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #12 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #13 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Thread T11 (tid=5528, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:20 (a.out+0x000000401a14)

  Thread T10 (tid=5527, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

SUMMARY: ThreadSanitizer: data race /home/shansen/code/concurrentqueue/concurrentqueue.h:2854 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_index_for_index(unsigned long, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::BlockIndexHeader*&) const
==================
==================
WARNING: ThreadSanitizer: data race (pid=5516)
  Read of size 8 at 0x7d5c0001e300 by thread T11:
    #0 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_index_for_index(unsigned long, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::BlockIndexHeader*&) const /home/shansen/code/concurrentqueue/concurrentqueue.h:2859 (a.out+0x00000040991d)
    #1 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_entry_for_index(unsigned long) const /home/shansen/code/concurrentqueue/concurrentqueue.h:2842 (a.out+0x000000408cb2)
    #2 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:2478 (a.out+0x000000407f99)
    #3 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1638 (a.out+0x00000040672c)
    #4 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1054 (a.out+0x000000405cdd)
    #5 main::{lambda()#2}::operator()() const <null> (a.out+0x000000401786)
    #6 _M_invoke<> /usr/include/c++/5/functional:1531 (a.out+0x000000404bac)
    #7 operator() /usr/include/c++/5/functional:1520 (a.out+0x00000040499e)
    #8 _M_run /usr/include/c++/5/thread:115 (a.out+0x00000040481e)
    #9 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Previous write of size 8 at 0x7d5c0001e300 by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::new_block_index() /home/shansen/code/concurrentqueue/concurrentqueue.h:2872 (a.out+0x0000004091e1)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::ImplicitProducer(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*) /home/shansen/code/concurrentqueue/concurrentqueue.h:2340 (a.out+0x000000409db2)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3517 (a.out+0x000000408fdb)
    #5 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #7 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #8 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #9 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #10 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #11 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #12 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #13 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Location is heap block of size 822 at 0x7d5c0001e300 allocated by thread T10:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::new_block_index() /home/shansen/code/concurrentqueue/concurrentqueue.h:2872 (a.out+0x0000004091e1)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::ImplicitProducer(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*) /home/shansen/code/concurrentqueue/concurrentqueue.h:2340 (a.out+0x000000409db2)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>*&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3517 (a.out+0x000000408fdb)
    #5 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) /home/shansen/code/concurrentqueue/concurrentqueue.h:3143 (a.out+0x00000040840b)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() /home/shansen/code/concurrentqueue/concurrentqueue.h:3389 (a.out+0x000000407248)
    #7 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1280 (a.out+0x0000004064ce)
    #8 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #9 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #10 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #11 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #12 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #13 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Thread T11 (tid=5528, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:20 (a.out+0x000000401a14)

  Thread T10 (tid=5527, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

SUMMARY: ThreadSanitizer: data race /home/shansen/code/concurrentqueue/concurrentqueue.h:2859 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_index_for_index(unsigned long, moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::BlockIndexHeader*&) const
==================
==================
WARNING: ThreadSanitizer: data race (pid=5516)
  Read of size 4 at 0x7d3400006730 by thread T12:
    #0 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:2509 (a.out+0x000000407fe4)
    #1 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1638 (a.out+0x00000040672c)
    #2 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1054 (a.out+0x000000405cdd)
    #3 main::{lambda()#2}::operator()() const <null> (a.out+0x000000401786)
    #4 _M_invoke<> /usr/include/c++/5/functional:1531 (a.out+0x000000404bac)
    #5 operator() /usr/include/c++/5/functional:1520 (a.out+0x00000040499e)
    #6 _M_run /usr/include/c++/5/thread:115 (a.out+0x00000040481e)
    #7 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Previous write of size 8 at 0x7d3400006730 by thread T9:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::Block>() /home/shansen/code/concurrentqueue/concurrentqueue.h:3509 (a.out+0x000000409724)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::requisition_block<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0>() /home/shansen/code/concurrentqueue/concurrentqueue.h:3002 (a.out+0x0000004088e5)
    #4 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:2420 (a.out+0x0000004076ce)
    #5 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1281 (a.out+0x0000004064f7)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #7 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #8 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #9 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #10 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #11 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Location is heap block of size 208 at 0x7d3400006730 allocated by thread T9:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::Block>() /home/shansen/code/concurrentqueue/concurrentqueue.h:3509 (a.out+0x000000409724)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::requisition_block<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0>() /home/shansen/code/concurrentqueue/concurrentqueue.h:3002 (a.out+0x0000004088e5)
    #4 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:2420 (a.out+0x0000004076ce)
    #5 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1281 (a.out+0x0000004064f7)
    #6 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #7 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #8 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #9 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #10 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #11 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Thread T12 (tid=5529, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:20 (a.out+0x000000401a14)

  Thread T9 (tid=5526, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

SUMMARY: ThreadSanitizer: data race /home/shansen/code/concurrentqueue/concurrentqueue.h:2509 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<int>(int&)
==================
==================
WARNING: ThreadSanitizer: data race (pid=5516)
  Read of size 4 at 0x7d640000b810 by thread T11:
    #0 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:2509 (a.out+0x000000407fe4)
    #1 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1638 (a.out+0x00000040672c)
    #2 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<int>(int&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1054 (a.out+0x000000405cdd)
    #3 main::{lambda()#2}::operator()() const <null> (a.out+0x000000401786)
    #4 _M_invoke<> /usr/include/c++/5/functional:1531 (a.out+0x000000404bac)
    #5 operator() /usr/include/c++/5/functional:1520 (a.out+0x00000040499e)
    #6 _M_run /usr/include/c++/5/thread:115 (a.out+0x00000040481e)
    #7 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Previous write of size 4 at 0x7d640000b810 by thread T5:
    #0 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:2456 (a.out+0x0000004077d5)
    #1 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, int>(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:1281 (a.out+0x0000004064f7)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(int&&) /home/shansen/code/concurrentqueue/concurrentqueue.h:917 (a.out+0x000000405bec)
    #3 main::{lambda(int)#1}::operator()(int) const <null> (a.out+0x0000004016ea)
    #4 _M_invoke<0ul> /usr/include/c++/5/functional:1531 (a.out+0x000000404ccb)
    #5 operator() /usr/include/c++/5/functional:1520 (a.out+0x000000404a10)
    #6 _M_run /usr/include/c++/5/thread:115 (a.out+0x000000404868)
    #7 <null> <null> (libstdc++.so.6+0x0000000b8c7f)

  Location is heap block of size 1248 at 0x7d640000b400 allocated by main thread:
    #0 malloc <null> (libtsan.so.0+0x0000000254a3)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:323 (a.out+0x00000040545f)
    #2 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::create_array<moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::Block>(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:3483 (a.out+0x0000004068c1)
    #3 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::populate_initial_block_list(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:2945 (a.out+0x000000406105)
    #4 moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ConcurrentQueue(unsigned long) /home/shansen/code/concurrentqueue/concurrentqueue.h:730 (a.out+0x0000004059a9)
    #5 main /home/shansen/code/concurrentqueue/main.cpp:5 (a.out+0x000000401867)

  Thread T11 (tid=5528, running) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:20 (a.out+0x000000401a14)

  Thread T5 (tid=5522, finished) created by main thread at:
    #0 pthread_create <null> (libtsan.so.0+0x000000027577)
    #1 std::thread::_M_start_thread(std::shared_ptr<std::thread::_Impl_base>, void (*)()) <null> (libstdc++.so.6+0x0000000b8dc2)
    #2 main /home/shansen/code/concurrentqueue/main.cpp:11 (a.out+0x00000040192d)

SUMMARY: ThreadSanitizer: data race /home/shansen/code/concurrentqueue/concurrentqueue.h:2509 bool moodycamel::ConcurrentQueue<int, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<int>(int&)

Single producer, multi-consumer audio processing

I'm trying to use this (amazing) project to multi-thread an audio processing algorithm. It's a synthesiser with 50 separate voices (notes) that can be played at the same time.

I set up the consumer threads like so:

processQueue = moodycamel::BlockingConcurrentQueue<VoiceProcessTask>(numVoices);
numThreads = SystemStats::getNumCpus();

for (int i = 0; i < numThreads - 1; i++) {
    processThreads.push_back(thread([&, i]() {
        int maxTasks = numVoices / numThreads;
        vector<VoiceProcessTask> tasks(maxTasks);
        VoiceProcessTask task;
        moodycamel::ConsumerToken token(processQueue);

        while (true) {
            int count = processQueue.wait_dequeue_bulk(token, &tasks[0], maxTasks);

            for (int j = 0; j < count; ++j) tasks[j].process();

            voicesRemaining.fetch_add(-count, std::memory_order_release);
        }
    }));
}

The maxTasks is an attempt to spread the voices over each thread, because I assume that otherwise the first thread will snatch all the Tasks.
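One thing to watch with this split: integer division rounds down, so numVoices / numThreads becomes 0 whenever there are more threads than voices, and a zero-length tasks vector makes &tasks[0] invalid. A minimal sketch of a rounded-up share follows; chunkSize is a hypothetical helper, not part of the library.

```cpp
#include <algorithm>

// Hypothetical helper: per-thread share of the voices, rounded up and never zero.
inline int chunkSize(int numVoices, int numThreads) {
    if (numThreads < 1) numThreads = 1;                     // guard against division by zero
    int share = (numVoices + numThreads - 1) / numThreads;  // ceiling division
    return std::max(1, share);
}
```

With 50 voices and 8 threads this gives 7 per thread, where plain division gives 6; with 4 voices and 8 threads it gives 1 instead of 0.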

Then every time the sound device needs a new block of audio, this is called:

voicesRemaining = voices.size();
for (int i = 0; i < voices.size(); ++i) {
    processQueue.enqueue(
        VoiceProcessTask(voices.getUnchecked(i), &buffer, startSample, numSamples));
}

while (voicesRemaining.load(std::memory_order_acquire) > 0) {
    int count = processQueue.try_dequeue_bulk(&remainingTaskList[0], numVoices / numThreads);
    voicesRemaining.fetch_add(-count, std::memory_order_release);
    for (int i = 0; i != count; ++i) {
        remainingTaskList[i].process();
    }
}

However, the audio crackles when I use multi-threading. I checked that voicesRemaining is zero at the end of each block, and it is, so the processing seems to be finishing. Am I doing something silly? The crackles suggest that the voices are not being processed fast enough, leading to buffer overruns.

Also, according to the Xcode profiler, the multi-threaded code uses more CPU time overall. Any tips here?

Thanks

Program received signal SIGSEGV, Segmentation fault while using producer tokens

I get a segmentation fault when I use moodycamel::ConcurrentQueue<std::string> with 4 consumer and 4 producer threads and an initial size of 1024*1024.

NOTE:

  1. This worked fine when using int as the data type.
  2. This works fine without producer tokens.
  3. I have undefined MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
  4. Using GCC 4.9
  5. Darwin 7831c1c1501a 14.0.0 Darwin Kernel Version 14.0.0: Fri Sep 19 00:26:44 PDT 2014; root:xnu-2782.1.97~2/RELEASE_X86_64 x86_64
Program received signal SIGSEGV, Segmentation fault.
0x000000010000363e in moodycamel::ConcurrentQueue<std::string,     moodycamel::ConcurrentQueueDefaultTraits>::ExplicitProducer::~ExplicitProducer() ()
(gdb) bt
#0  0x000000010000363e in moodycamel::ConcurrentQueue<std::string,     moodycamel::ConcurrentQueueDefaultTraits>::ExplicitProducer::~ExplicitProducer() ()
#1  0x00000001000028b1 in test_1000() ()
#2  0x0000000100008599 in main ()

Memory leak detected in my program when using the concurrentqueue.try_dequeue() method

My questions are as follows:

I define a ConcurrentQueue for storing multiple Tree objects:

static ConcurrentQueue<Tree> _leakedTrees = new ConcurrentQueue<Tree>();

I wrote a method that, according to the input configuration, constructs Tree objects of the configured size and puts them into the ConcurrentQueue.

After a TryDequeue loop has run, the queue is empty. In theory, once TryDequeue has removed them, the ConcurrentQueue no longer holds references to the Tree instances, and nothing else in the program references them either, so calling GC.Collect() should remove the Tree objects from the heap.

But the leak is plainly visible.

Thank you for your assistance!

multi-thread problem

Hi, when using the ConcurrentQueue from multiple threads, I found a problem. Below is my code:

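#include <iostream>
#include <thread>
#include "concurrentqueue.h"
using namespace std;

moodycamel::ConcurrentQueue<int> q;

// Each thread that runs this enqueues 0..9 in order.
void threadproc()
{
    for (int i = 0; i < 10; i++)
    {
        q.enqueue(i);
    }
}

int main()
{
    // The two producer threads run one after the other, never concurrently.
    thread td1(threadproc);
    td1.join();
    thread td2(threadproc);
    td2.join();

    int d;
    while (q.try_dequeue(d))
    {
        cout << d << " ";
    }
    cin.get();
    return 0;
}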

The expected output is:
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9

Instead, the actual output is:
0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9

Is this a multi-threading bug in ConcurrentQueue, or am I using ConcurrentQueue incorrectly?
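For what it's worth, the library's README notes that the queue keeps a separate sub-queue per producer, so ordering holds only among elements enqueued by the same producer. The toy model below uses only the standard library to illustrate how that can produce the interleaved output. ToyQueue and its "take from the fullest sub-queue" rule are assumptions made for illustration, not the library's actual code or heuristic.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Toy model (NOT the library's code): one FIFO sub-queue per producer.
struct ToyQueue {
    std::vector<std::deque<int>> subQueues;

    // Each producer appends only to its own sub-queue.
    void enqueue(std::size_t producer, int value) {
        if (producer >= subQueues.size()) subQueues.resize(producer + 1);
        subQueues[producer].push_back(value);
    }

    // Assumed heuristic: take from the fullest sub-queue (the first one wins ties).
    bool try_dequeue(int& out) {
        std::deque<int>* best = nullptr;
        for (auto& sq : subQueues) {
            if (!sq.empty() && (best == nullptr || sq.size() > best->size())) best = &sq;
        }
        if (best == nullptr) return false;
        out = best->front();
        best->pop_front();
        return true;
    }
};

// Two producers enqueue 0..9 one after the other, then a single consumer drains.
inline std::vector<int> drainTwoProducers() {
    ToyQueue q;
    for (int i = 0; i < 10; ++i) q.enqueue(0, i);
    for (int i = 0; i < 10; ++i) q.enqueue(1, i);
    std::vector<int> out;
    int d;
    while (q.try_dequeue(d)) out.push_back(d);
    return out;
}
```

In this model drainTwoProducers() returns 0 0 1 1 2 2 ... 9 9: each producer's own elements stay in order, but the two streams interleave. If a single global order matters, one option is to have every enqueue go through the same producer.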

The maximum size of the queue is 1024?

I've written the following test code:
int main(int argc, char** argv)
{
    ConcurrentQueue<int> queue(10000000);
    cout << queue.size_approx() << endl;
    for (int i = 0; i < 40000; i++)
    {
        if (!queue.try_enqueue(i))
            cout << i << " queue is full" << endl;
    }
}

but the result is:
[signal@keyuanfenxi02 test]$ ./test | more
0
1024 queue is full
1025 queue is full
1026 queue is full
1027 queue is full
1028 queue is full
1029 queue is full

How can I add more elements into the queue?

On compilation: undefined reference to `clock_gettime'

On compile, I get the following errors:

systemtime.cpp:121: undefined reference to `clock_gettime'
/tmp/ccoaIDky.o: In function `moodycamel::getTimeDelta(timespec)':
systemtime.cpp:134: undefined reference to `clock_gettime'
collect2: error: ld returned 1 exit status

This is on Ubuntu 14.04, using gcc v 4.8.1

The solution is trivial: the command-line option '-lrt' should be right at the end of the command line, not in the middle, e.g.:

g++ -std=gnu++11 -Wall -Wconversion -fno-elide-constructors -pthread -g -O0 ../tests/common/simplethread.cpp ../tests/common/systemtime.cpp ../tests/unittests/unittests.cpp -o bin/unittests -lrt

Please adjust your makefile accordingly.

Thanks,

Wim.

Two-step Initialization

Some of us who haven't figured out how to handle errors in constructors tend to use two-step constructors. It should be pretty basic to implement for the queue here... although ugly (as is every two-step initialization).
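
A two-step wrapper along these lines might look like the sketch below. `TwoStep`, `init`, and `ready` are hypothetical names, not part of the library. The wrapper is generic over the wrapped type, and the assumption is that the only failure worth reporting is an exception thrown by the wrapped constructor (for a queue, most likely `std::bad_alloc`).

```cpp
#include <cassert>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical two-step wrapper: default construction never fails,
// and init() reports a throwing constructor through its return value.
template <typename Q>
class TwoStep {
public:
    TwoStep() noexcept = default;

    template <typename... Args>
    bool init(Args&&... args) noexcept {
        try {
            impl_.reset(new Q(std::forward<Args>(args)...));
            return true;
        } catch (...) {
            impl_.reset();  // leave the wrapper in the "not ready" state
            return false;
        }
    }

    bool ready() const noexcept { return impl_ != nullptr; }

    Q& get() noexcept {
        assert(ready());
        return *impl_;
    }

private:
    std::unique_ptr<Q> impl_;
};

// Usage sketch, with std::vector standing in for the queue type.
inline bool two_step_demo() {
    TwoStep<std::vector<int>> v;
    if (!v.init(3, 7)) return false;  // constructs std::vector<int>(3, 7)
    return v.get().size() == 3;
}
```

The cost is the usual one for this pattern: every caller has to remember to check `ready()` (or the result of `init`) before touching the object.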

Strict aliasing warning with Android GCC 4.9

Compiler version:
arm-linux-androideabi-c++ (GCC) 4.9 20140827 (prerelease)

Built in C++-1y mode (almost C++14).

The warning is as follows:

include/moodycamel/concurrentqueue.h:1575:97: warning: dereferencing type-punned pointer will break strict-aliasing rules [-Wstrict-aliasing]
    inline T* operator[](index_t idx) MOODYCAMEL_NOEXCEPT { return reinterpret_cast<T*>(elements) + static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1)); }

(line may be slightly off due to additional comments inserted)

Warning seems correct in that char object is accessed through T*, which is not allowed in C++11 and newer.

Compilation on MSVS 2015 (CTP5) fails

Just a heads up as this is pre-release software ...

Building fails from converted MSVS 2012 solution - all configurations

unittests builds and passes OK

however rest of solution not so happy ...

Warning at start of benchmarks build (assumed benign):

'Unknown compiler version - please run the configure tests and report the results'

Errors:

  1. benchmarks - boost:lockfree problem

u:\dev\concurrentqueue\benchmarks\boost/lockfree/queue.hpp(109): error C2280: 'std::atomicboost::lockfree::detail::tagged_ptr::atomic(void) noexcept': attempting to reference a deleted function

(also tried with latest boost develop branch implementation of queue.hpp with same error)

  2. fuzztests

....\tests\fuzztests\fuzztests.cpp(676): error C3066: there are multiple ways that an object of this type can be called with these arguments

Microsoft's <xxatomic> has 2 overloads of
template
struct atomic<_Ty *>
{
operator _Ty *() const volatile _NOEXCEPT
operator _Ty *() const _NOEXCEPT

and it can't choose between them (I didn't even realise I could differentiate const functions on volatile)

  3. Also several 'shadowed variable' warnings to ignore
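
On the aside about telling const member functions apart by volatile: standard C++ does allow overloading on the volatile qualifier of the implicit object. The sketch below uses a hypothetical `Probe` type and only illustrates that language rule; it does not attempt to reproduce the C3066 error from the fuzz tests.

```cpp
#include <cassert>

// Hypothetical type: the two overloads differ only in the volatile qualifier.
struct Probe {
    int which() const noexcept { return 1; }
    int which() const volatile noexcept { return 2; }
};

// Non-volatile object: both overloads are viable, the less qualified one wins.
inline int call_plain() {
    Probe p;
    return p.which();
}

// Volatile object: only the const volatile overload is viable.
inline int call_volatile() {
    volatile Probe p{};
    return p.which();
}
```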

Exception safety

The queue should be resilient against exceptions thrown from element objects, without imposing any overhead for types that have noexcept constructors and assignment operators.

It is reasonable to assume (and document) that destructors and iterators should never throw an exception.
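
One way to get "no overhead for noexcept types" is to pick the code path at compile time from a type trait. The sketch below is a generic illustration, not the queue's implementation: `OneSlot`, `fill`, and `ThrowsOnCopy` are hypothetical names, and the `reserved` counter merely stands in for whatever index a real queue would have to roll back.

```cpp
#include <cassert>
#include <new>
#include <type_traits>
#include <utility>

// Hypothetical single-element container: raw storage, a pointer to the live
// object (if any), and a counter standing in for a real queue's tail index.
template <typename T>
struct OneSlot {
    alignas(T) unsigned char raw[sizeof(T)];
    T* obj = nullptr;
    int reserved = 0;

    OneSlot() = default;
    OneSlot(const OneSlot&) = delete;
    OneSlot& operator=(const OneSlot&) = delete;
    ~OneSlot() { if (obj) obj->~T(); }
};

// Constructor cannot throw: this path contains no try/catch at all.
template <typename T, typename U>
void fill(OneSlot<T>& s, U&& v, std::true_type) noexcept {
    ++s.reserved;
    s.obj = ::new (static_cast<void*>(s.raw)) T(std::forward<U>(v));
}

// Constructor may throw: undo the reservation, then let the exception escape.
template <typename T, typename U>
void fill(OneSlot<T>& s, U&& v, std::false_type) {
    ++s.reserved;
    try {
        s.obj = ::new (static_cast<void*>(s.raw)) T(std::forward<U>(v));
    } catch (...) {
        --s.reserved;
        throw;
    }
}

// Entry point: selects one of the two paths from the element type.
// Assumes the slot is currently empty.
template <typename T, typename U>
void fill(OneSlot<T>& s, U&& v) {
    fill(s, std::forward<U>(v),
         std::integral_constant<bool, std::is_nothrow_constructible<T, U&&>::value>());
}

// Element type whose copy constructor always throws, for demonstration.
struct ThrowsOnCopy {
    ThrowsOnCopy() = default;
    ThrowsOnCopy(const ThrowsOnCopy&) { throw 42; }
};
```

After a failed `fill` on a `OneSlot<ThrowsOnCopy>`, the slot is back in its initial state, which is the property the issue asks the queue to provide.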

ABA problem

I quickly tried replacing Apple's OSAtomicEnqueue/OSAtomicDequeue with this queue, and my code crashes. I use the OSAtomic* functions from the system library because my naive lock-free singly linked list implementation suffers from the ABA bug. Does this queue deal with the ABA problem?
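
For readers unfamiliar with the term, a common generic defence against ABA in a linked list is to pair the head with a version tag that changes on every update. The sketch below shows that technique on a stack of slot indices; `TaggedIndexStack` is a hypothetical name, and nothing here describes how this queue itself handles ABA.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Lock-free stack of slot indices. The 64-bit head packs {tag, index}; the tag
// changes on every successful update, so a head that returns to the same index
// after intervening pops and pushes no longer matches a stale snapshot.
class TaggedIndexStack {
public:
    static constexpr std::uint32_t kNil = 0xFFFFFFFFu;

    explicit TaggedIndexStack(std::size_t slots) : next_(slots), head_(kNil) {}

    // The caller must own slot i (it must not currently be on the stack).
    void push(std::uint32_t i) noexcept {
        std::uint64_t old = head_.load(std::memory_order_relaxed);
        std::uint64_t desired;
        do {
            next_[i].store(static_cast<std::uint32_t>(old), std::memory_order_relaxed);
            desired = bump_tag(old) | i;
        } while (!head_.compare_exchange_weak(old, desired,
                                              std::memory_order_release,
                                              std::memory_order_relaxed));
    }

    bool pop(std::uint32_t& out) noexcept {
        std::uint64_t old = head_.load(std::memory_order_acquire);
        for (;;) {
            std::uint32_t idx = static_cast<std::uint32_t>(old);
            if (idx == kNil) return false;
            // This read may be stale; the tagged compare-exchange rejects it if so.
            std::uint32_t nxt = next_[idx].load(std::memory_order_relaxed);
            if (head_.compare_exchange_weak(old, bump_tag(old) | nxt,
                                            std::memory_order_acquire,
                                            std::memory_order_acquire)) {
                out = idx;
                return true;
            }
        }
    }

private:
    // Returns the incremented tag in the upper 32 bits, with the index bits cleared.
    static std::uint64_t bump_tag(std::uint64_t head) noexcept {
        return ((head >> 32) + 1) << 32;
    }

    std::vector<std::atomic<std::uint32_t>> next_;
    std::atomic<std::uint64_t> head_;
};
```

Using indices rather than raw pointers keeps the head within a single 64-bit word, which avoids needing a double-width compare-and-swap.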

Support for interdependent producer linearizability

The cornerstone of the entire design is that independent producers can safely put their elements in independent sub-queues, without loss of queue semantics (since depending on the overall order of elements between those queues is tantamount to a race condition). Thanks to the design based on this assumption, the queue can be very fast, but is not linearizable.

However, many people have raised concerns about this, since it turns out that producers are not always independent; often there is synchronization between them, and in such cases it's often a requirement to have a relative ordering between elements produced by one producer with respect to the other.

Example

As an example, the queue may be used for log messages; one thread may put on a message saying it's scheduling a task, then that task is executed by another thread (with a happens-after synchronization), which logs a message saying the task is complete. Obviously, the 'task scheduled' message should come out before the 'task complete' message, but it may not because the producers are presently treated as being independent.
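
The effect can be reproduced with a toy, single-threaded model of per-producer sub-queues. `ToySubQueues` is a hypothetical class, and the rotation the consumer uses to pick a sub-queue is an assumption made purely for illustration; the real queue's consumer logic is more involved.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Toy model: one FIFO sub-queue per producer, visited in rotation by the consumer.
class ToySubQueues {
public:
    explicit ToySubQueues(std::size_t producers) : subs_(producers) {}

    void enqueue(std::size_t producer, std::string msg) {
        subs_[producer].push_back(std::move(msg));
    }

    bool try_dequeue(std::string& out) {
        for (std::size_t i = 0; i < subs_.size(); ++i) {
            std::size_t k = (next_ + i) % subs_.size();
            if (!subs_[k].empty()) {
                out = std::move(subs_[k].front());
                subs_[k].pop_front();
                next_ = (k + 1) % subs_.size();  // start after this sub-queue next time
                return true;
            }
        }
        return false;
    }

private:
    std::vector<std::deque<std::string>> subs_;
    std::size_t next_ = 0;
};

// Producer 0 logs two messages; producer 1 then logs one that happens after both.
inline std::vector<std::string> drain_log_demo() {
    ToySubQueues q(2);
    q.enqueue(0, "scheduler: starting up");
    q.enqueue(0, "scheduler: task scheduled");
    q.enqueue(1, "worker: task complete");

    std::vector<std::string> seen;
    std::string msg;
    while (q.try_dequeue(msg)) seen.push_back(msg);
    return seen;
}
```

In this model "task complete" is dequeued before "task scheduled", even though each producer's own messages keep their relative order.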

Workaround

Currently, the only workaround is to share an explicit producer token between all interdependent producers, and lock around multi-threaded access to it (since producer tokens are not thread-safe). This somewhat negates the benefits of using a lock-free queue in the first place!
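
A minimal sketch of that workaround, written as a template so it compiles without the library header. `LockedSharedProducer`, `StubQueue`, and `StubToken` are hypothetical names; the only assumptions about the real types are that the token is constructible from a queue reference and that the queue offers `enqueue(token, item)`.

```cpp
#include <cassert>
#include <mutex>
#include <utility>
#include <vector>

// Serialises every enqueue made through one shared producer token.
template <typename Queue, typename Token>
class LockedSharedProducer {
public:
    explicit LockedSharedProducer(Queue& q) : queue_(q), token_(q) {}

    template <typename U>
    bool enqueue(U&& item) {
        std::lock_guard<std::mutex> lock(mutex_);  // the token itself is not thread-safe
        return queue_.enqueue(token_, std::forward<U>(item));
    }

private:
    Queue& queue_;
    Token token_;
    std::mutex mutex_;
};

// Stand-ins so the sketch is self-contained.
struct StubQueue;
struct StubToken {
    explicit StubToken(StubQueue&) {}
};
struct StubQueue {
    std::vector<int> items;
    bool enqueue(StubToken const&, int v) {
        items.push_back(v);
        return true;
    }
};
```

With the library, the same wrapper would presumably be instantiated as `LockedSharedProducer<moodycamel::ConcurrentQueue<int>, moodycamel::ProducerToken>`, though that combination has not been compiled here. All interdependent producers would share one such object.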

Proposed solution

I propose adding a third type of token, the InterdependentSharedProducerToken (alas, I was never very good at naming things). This token will be usable from multiple producer threads simultaneously, and will also be lock-free. Users would then be able to choose which groups of producers are interdependent and share one of these tokens among each of them. This is still more efficient than always enforcing linearizability, while still supporting it where required (at a slight increase in complexity, of course).

ConcurrentQueue not a FIFO

I was using the queue in my logger and noticed that the timestamps are out of order.
Both blocking and non-blocking have the same problem. The SafeQueue which can be found on Github is a FIFO and passes the same test.

I am using GCC-4.9.1 on CentOS 6.5.

This is a test (I replaced my timestamped messages with plain integers):

//#include "blockingconcurrentqueue.h"
#include "concurrentqueue.h"
//#include "SafeQueue.hpp"
#include <vector>
#include <future>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <unistd.h>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>

constexpr unsigned threadCnt = 10;
std::mutex              mtx, inmtx;                   // for queue_
std::condition_variable cv;                    // for queue_
using QueData = std::vector<std::int64_t>;
QueData in, out;
std::atomic<int> nta{0};
using Queue = moodycamel::ConcurrentQueue<std::int64_t>; // errors
//using Queue = moodycamel::BlockingConcurrentQueue<std::int64_t>;//errors
//using Queue = SafeQueue<std::int64_t>; // no errors

auto generateData (Queue& q)-> void
{
    int nt = std::atomic_fetch_add(&nta, 1);
    std::cout << "thread "<< nt <<" started "<< std::endl;
    std::int64_t  n = 10000;
    while ( n > -2)
    {
                // serialize enqueueing
                std::unique_lock<std::mutex> lk(inmtx);
                auto k = n;
                in.push_back(k); // sent data to compare
                q.enqueue(std::move(k));
                cv.notify_one();
                --n;
    }
    std::cout << "thread "<< nt <<" finished with n="<< n+1 <<std::endl;
}
int main()
{
    Queue queue;
    std::vector<std::future<void>> tasks;                 // thread pool
    tasks.emplace_back(std::async(std::launch::async,[&]()
    {
                std::int64_t ncount{};
                std::cout << "dequeueing thread started "<< std::endl;
                while(ncount<threadCnt)
                {
                    //std::cout << "reading dequeued item"<< std::endl;
                    std::int64_t item;
                    if(queue.try_dequeue(item)) // get of the queue
                    {
                                if( -1 == item )
                                {
                                    ncount++;
                                    std::cout << "received -1 in dequeueing thread"<< ncount << std::endl;
                                }
                                out.push_back(item); // received data to compare
                    }
                    else
                    {
                               //wait for signal or timeout
                                std::unique_lock<std::mutex> lk(mtx);
                                cv.wait_for(lk, std::chrono::milliseconds(100));
                    }
                }
                std::cout << "dequeueing thread finished "<< std::endl;
    }));
    usleep(1000*100);// for reader to start
    for(int n = 0; n<threadCnt; ++n )
    {
                tasks.emplace_back(std::async(std::launch::async,[&]()
                {
                    generateData(queue);
                }));
    }
    // wait for all threads to finish
    for( auto& t : tasks) t.get();
    std::cout << "all threads finished "<< std::endl;
    // compare results
    assert (in.size() == out.size());
    std::cout << "comparing "<<  in.size() << "items" << std::endl;
    unsigned errCnt{};
    for (int n =0 ; n < in.size(); ++n)
    {
                if(in[n] != out[n])
                {
                    //std::cout << "item " << n <<" " << in[n] << " != " << out[n] << std::endl;
                    ++errCnt;
                }
    }
    std::cout << "done: error count=" << errCnt << std::endl;
    return 0;
}

won't compile on FreeBSD CLANG version 3.4.1

c++ -g -O2 -D_BSD_SOURCE -DLIBNET_BSDISH_OS -DLIBNET_BSD_BYTE_SWAP -DHAVE_SOCKADDR_SA_LEN -I.. -I/usr/local/include -std=c++11 -c log_queue.cpp
In file included from log_queue.cpp:8:
../utils/concurrentqueue.hpp:1622:65: error: expected expression
  ...auto newBlock = this->parent->requisition_block();
                                                                ^
../utils/concurrentqueue.hpp:1828:65: error: expected expression
  ...auto newBlock = this->parent->requisition_block();
                                                                ^
../utils/concurrentqueue.hpp:2214:64: error: expected expression
  ...auto newBlock = this->parent->requisition_block();
                                                                ^
../utils/concurrentqueue.hpp:2363:159: error: expected expression
  ...|| (newBlock = this->parent->requisition_block()) == nullptr) {
                                                               ^

The arrow is pointing to the last brace in "();".
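
One possible reading, and it is only a guess from the diagnostics: if `requisition_block` is a member template called with explicit template arguments, the call names a member template through a dependent expression, and a compiler can reject it with "expected expression" unless the `template` keyword is written out. The sketch below uses hypothetical `Pool` and `Producer` types to show the disambiguator; it is not taken from the library source.

```cpp
#include <cassert>

// Hypothetical parent with a member template, standing in for the queue.
struct Pool {
    template <int Mode>
    int requisition() { return Mode + 100; }
};

template <typename Parent>
struct Producer {
    Parent* parent;

    int grab() {
        // `parent` has a dependent type here, so `template` is required for the
        // compiler to parse `<` as a template argument list rather than less-than.
        return this->parent->template requisition<1>();
    }
};
```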

Fuzz tests on Android fail

Hi, I'm evaluating the concurrentqueue for use on android, in a c++ app.

I tried running the fuzz tests (well, basically a chopped down version of the main in fuzztests.cpp), and some of them failed. It happens in two different points of the same type of test (core_thread_local). Here's some data, grouped by type of error:

AFTER 672 ITERATIONS
 multithread_produce: 232 successful, 0 failed
 multithread_consume: 240 successful, 0 failed
 multithread_produce_and_consume: 192 successful, 0 failed
 completely_random: 208 successful, 0 failed
 core_add_only_list: 252 successful, 0 failed
 core_thread_local: 197 successful, 14 failed
 - Executed 1335 tests so far

Reason: assertion failed on line 593: p->value == 0
 Seed: d7990b4e4dea1895
 Seed: 96fe1c7d0392537b
 Seed: 139af5f3d1c7e4bb
 Seed: d013a74b4faa2d1b
 Seed: 7b0ceb690c3286d9
 Seed: 955a9db3764ac57b
 Seed: 09efdf685ad77030

Reason: assertion failed on line 611: *localData[i] == i
 Seed: 3eab167012cb7d8c
 Seed: c448fd7c0b9725fc
 Seed: 4e5b54df425bbbb6
 Seed: 639d2c83865d2b47
 Seed: f9ae9a0365bec08c
 Seed: c3bd84c3c96e2f17
 Seed: 6de7a43c21a7bd6e

I'm not sure how to interpret these results yet, I will try having a closer look at the code, in the meanwhile I thought it wouldn't hurt to ask and share what I got. Any pointer would be much appreciated, thanks!

As a reference, here's the test function I "assembled":

bool FuzzyTestMoodyCamelConcurrentQueue()
{
    uint32_t iteration = 0;
    uint64_t seed = 0;
    int result = 0;
    test_type test;
    const char* failReason;

    while (true)
    {
        seed = (static_cast<uint64_t>(std::time(NULL)) << 32) | iteration++;
        // MurmurHash3 64-bit finalizer
        seed ^= seed >> 33;
        seed *= 0xff51afd7ed558ccd;
        seed ^= seed >> 33;
        seed *= 0xc4ceb9fe1a85ec53;

        g_seed.store(seed, std::memory_order_release);
        int result;

        try {
            result = run_test(seed, 2, test, failReason);
        }
        catch (std::exception const& e) {

            PRINTMSG("*** Exception : %s\n      Seed: %08x%08x\n      Test: %s\n\n", e.what(), (uint32_t)(seed >> 32), (uint32_t)(seed), test_names[test]);
            return false;
        }
        catch (...) {

            PRINTMSG("*** Unknown exception thrown!\n      Seed: %08x%08x\n      Test: %s\n\n", (uint32_t)(seed >> 32), (uint32_t)(seed), test_names[test]);
            return false;
        }

        if (!result) {
            result = 1;
            PRINTMSG("*** Failure detected!\n      Seed: %08x%08x\n      Test: %s\n      Reason: %s\n", (uint32_t)(seed >> 32), (uint32_t)(seed), test_names[test], failReason);
        }

        if ((iteration & 15) == 0) {

            std::uint64_t total = 0;
            PRINTMSG("AFTER %u ITERATIONS",iteration);
            for (int i = 0; i != TEST_TYPE_COUNT; ++i)
            {
                PRINTMSG(" %s: %llu successful, %llu failed\n", test_names[i], (unsigned long long)(test_count[i] - fail_count[i]), (unsigned long long)fail_count[i]);
                total += test_count[i];
            }
            PRINTMSG(" - Executed %llu tests so far", (unsigned long long)total);
        }
    }
}

where PRINTMSG is a macro defined to the android native log function.

VS2013 atomic_flag compile error

Hi, thanks for making this project. I've been doing C# for over seven years, so it's a good learning experience for me!

When trying to get this running in Visual Studio 2013, I got this compile error:

Error   1   error C2280: 'std::atomic_flag::atomic_flag(const std::atomic_flag &)' : attempting to reference a deleted function c:\sceadev\github\concurentqueue\concurrentqueue\concurrentqueue.h  438 1   LockFreeConcurrentQueue

The compile error is because this member initializer:

        implicitProducerHashResizeInProgress(ATOMIC_FLAG_INIT),

... uses a constructor that is deleted:

    atomic_flag(const atomic_flag&) = delete;

The default parameterless constructor is available, so the fix is to remove the ATOMIC_FLAG_INIT parameter and the atomic_flag still gets initialized to zero. Line 436 becomes:

        implicitProducerHashResizeInProgress(),

I have no idea what the repercussions are for other compilers. I also don't understand why you haven't encountered this problem. I'm compiling for Win32, in case that matters.

--Ron
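
A note on portability: the C++11 standard only specifies `ATOMIC_FLAG_INIT` for the form `std::atomic_flag f = ATOMIC_FLAG_INIT;`, and before C++20 the state of a flag not initialized that way is unspecified. A sketch that sidesteps both problems clears the flag in the constructor body. `ResizeGuard` and its member names are hypothetical.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical holder for a "resize in progress" flag.
class ResizeGuard {
public:
    ResizeGuard() noexcept {
        // Explicitly put the flag in the clear state instead of relying on
        // ATOMIC_FLAG_INIT in a member initializer list.
        inProgress_.clear(std::memory_order_relaxed);
    }

    // Returns true if the caller acquired the flag.
    bool try_begin() noexcept {
        return !inProgress_.test_and_set(std::memory_order_acquire);
    }

    void end() noexcept {
        inProgress_.clear(std::memory_order_release);
    }

private:
    std::atomic_flag inProgress_;
};
```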

Segfault in get_block_index_index_for_index

I can't find the problem here; maybe you can point out where it is. I'm storing pointers to objects in multi-producer, multi-consumer mode.
#0 0x0004fec4 in std::__atomic_base::load (__m=std::memory_order_relaxed, this=0x0) at /root/x-tools/arm-unknown-linux-gnueabihf/arm-unknown-linux-gnueabihf/include/c++/5.3.0/bits/atomic_base.h:396
#1 moodycamel::ConcurrentQueue<Packet*, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_index_for_index (this=0xa9600678, index=615040, localBlockIndex=@0x935fe5c4: 0xa9600e90) at multireaderwriterqueue.h:2861
#2 0x0004b748 in moodycamel::ConcurrentQueue<Packet*, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::get_block_index_entry_for_index (this=0xa9600678, index=615059) at multireaderwriterqueue.h:2849
#3 0x00046494 in moodycamel::ConcurrentQueue<Packet*, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<Packet*> (this=0xa9600678, element=@0x935fe6c8: 0x0) at multireaderwriterqueue.h:2485
#4 0x00042434 in moodycamel::ConcurrentQueue<Packet*, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<Packet*> (this=0xa9600678, element=@0x935fe6c8: 0x0) at multireaderwriterqueue.h:1645
#5 0x0003d478 in moodycamel::ConcurrentQueue<Packet*, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<Packet*> (this=0x5d5460, item=@0x935fe6c8: 0x0) at multireaderwriterqueue.h:1061

..........
the line in multireaderwriterqueue.h is:
auto tailBase = localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
..........
(gdb) p localBlockIndex
$1 = (moodycamel::ConcurrentQueue<Packet*, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::BlockIndexHeader *&) @0x935fe5c4: 0xa9600e90
(gdb) p tail
$3 = 2841641024
(gdb) p localBlockIndex->index
$8 = (moodycamel::ConcurrentQueue<Packet*, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::BlockIndexEntry **) 0x0

Can you please tell me what happened and how to fix it?
Thank you.

Enumerate contents

This is a feature request (not a bug!)

I need a routine to enumerate all the elements (more precisely, the addresses of
all the elements). I need this for my system garbage collector so it can find
contained pointers. The routine can assume all (other) threads are stopped.

FYI: I am implementing a thread pool whose threads grab jobs from the
queue. These are represented by a pointer. My first exercise is to use it
to do matrix multiplication quickly (to win a language micro-test competition : )
A naive implementation will just dispatch N x N jobs, each of which calculates
a single inner-product. If N is large enough that should run several times faster
than a serial algorithm on a quad-core. A more advanced algorithm would use
submatrix partitions (which does a few more operations but uses the cache
better). My current queue uses traditional single mutex lock.

BTW: This kind of queue must be bounded. When the queue is full
we want producers to hang waiting (i.e. block); when the queue is empty,
the consumers should also hang (i.e. block). So to actually use your queue,
I need to wrap the enqueue/dequeue functions in a timed yield loop.
My standard queue uses a semaphore, which is another option. So interestingly
I will have to wrap all the locking around your queue anyhow (but only use it when
the queue is empty or full). Note: approximate fullness is good enough.
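
The semaphore option mentioned above can be sketched as two counters around an inner queue: one counting free slots, one counting queued items. Everything named here (`CountingSemaphore`, `StubInnerQueue`, `BoundedBlocking`) is hypothetical, and the inner queue is a mutex-protected stand-in that is only assumed to offer `enqueue` and `try_dequeue`.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

// Minimal counting semaphore built from a mutex and a condition variable.
class CountingSemaphore {
public:
    explicit CountingSemaphore(std::size_t initial) : count_(initial) {}
    void acquire() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }
    void release() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++count_;
        }
        cv_.notify_one();
    }
private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::size_t count_;
};

// Stand-in for the inner thread-safe queue.
template <typename T>
class StubInnerQueue {
public:
    void enqueue(T v) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(std::move(v));
    }
    bool try_dequeue(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }
private:
    std::mutex mutex_;
    std::deque<T> items_;
};

// Bounded, blocking facade: producers wait for a free slot, consumers for an item.
template <typename T, typename Inner = StubInnerQueue<T> >
class BoundedBlocking {
public:
    explicit BoundedBlocking(std::size_t capacity) : free_(capacity), filled_(0) {}

    void push(T v) {
        free_.acquire();  // blocks while `capacity` items are queued
        inner_.enqueue(std::move(v));
        filled_.release();
    }

    T pop() {
        filled_.acquire();  // blocks while the queue is empty
        T out{};
        // An item is guaranteed to exist; retry in case it is not visible yet.
        while (!inner_.try_dequeue(out)) std::this_thread::yield();
        free_.release();
        return out;
    }

private:
    Inner inner_;
    CountingSemaphore free_;
    CountingSemaphore filled_;
};
```

The semaphores only block when the queue is actually full or empty, which matches the "approximate fullness is good enough" requirement; the element type must be default-constructible in this sketch.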

And so on to the next queue: a priority queue .. :)

Possible data race using `BlockingConcurrentQueue::wait_dequeue` detected by thread sanitizer

I have come across a situation where ThreadSanitizer detects a data race after moving an object inside a moodycamel::BlockingConcurrentQueue. The object has non-default move-constructor and move-assignment operators.

I apologize in advance if I am doing something wrong/stupid in my code - it's likely that the race condition is caused because of a mistake I may have made.

Here's a minimal example, also available on gist.

#include <array>
#include <atomic>
#include <chrono>
#include <thread>
#include "./blockingconcurrentqueue.h"

using namespace std::chrono_literals;

// global counter
// * the program can exit when it reaches 1000
// * incremented by tasks
std::atomic<int> ctr{0};

// task 
// * non-copyable
// * increments `ctr` on construction
// * transfers some mock state `x` on moves
struct task
{
    int x = 0;

    task()
    {
        ++ctr;
    }

    task(const task&) = delete;
    task& operator=(const task&) = delete;

    task(task&& rhs) : x{rhs.x}
    {
    }

    task& operator=(task&& rhs)
    {
        x = rhs.x;
        return *this;
    }
};

// task queue
moodycamel::BlockingConcurrentQueue<task> q;

// worker
// * contains thread that constantly calls `wait_dequeue`
// * on dtor, spawns tasks until thread is joined
struct worker
{
    std::thread th;
    std::atomic<bool> running{true};
    std::atomic<bool> exited{false};

    worker()
    {
        th = std::thread([this]
            {
                task t;
                while(running)
                {
                    q.wait_dequeue(t);
                }
                exited = true;
            });
    }

    ~worker()
    {
        running = false;

        while(!exited)
        {
            q.enqueue(task{});
            std::this_thread::sleep_for(1ms);
        }

        th.join();
    }
};

int main()
{
    // create 8 workers
    std::array<worker, 8> ws;

    // enqueue 1000 tasks
    for(int i = 0; i < 1000; ++i)
    {
        q.enqueue(task{});
    }

    // wait for `ctr` to be `>= 1000`
    while(ctr < 1000)
    {
        std::this_thread::sleep_for(1ms);
    }
}

The worker instances simulate a thread pool. The task type simulates a moveable callable object (like std::function).

When compiling with -fsanitize=thread, both g++ 6.1.1 and clang++ 3.8.0 report a huge number of the following data race (also available on the same gist):

WARNING: ThreadSanitizer: data race (pid=15381)
  Read of size 4 at 0x7d340000cf48 by thread T7:
    #0 task::operator=(task&&) <null> (a.out+0x000000402c36)
    #1 bool moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::dequeue<task>(task&) <null> (a.out+0x000000406279)
    #2 bool moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::ProducerBase::dequeue<task>(task&) <null> (a.out+0x000000405340)
    #3 bool moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::try_dequeue<task>(task&) <null> (a.out+0x0000004044e9)
    #4 void moodycamel::BlockingConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::wait_dequeue<task>(task&) <null> (a.out+0x000000403303)
    #5 worker::worker()::{lambda()#1}::operator()() const <null> (a.out+0x000000402ce6)
    #6 void std::_Bind_simple<worker::worker()::{lambda()#1} ()>::_M_invoke<>(std::_Index_tuple<>) <null> (a.out+0x00000040a1a4)
    #7 std::_Bind_simple<worker::worker()::{lambda()#1} ()>::operator()() <null> (a.out+0x00000040a0dd)
    #8 std::thread::_State_impl<std::_Bind_simple<worker::worker()::{lambda()#1} ()> >::_M_run() <null> (a.out+0x00000040a018)
    #9 execute_native_thread_routine /build/gcc-multilib/src/gcc/libstdc++-v3/src/c++11/thread.cc:83 (libstdc++.so.6+0x0000000baaae)

  Previous write of size 8 at 0x7d340000cf48 by main thread:
    #0 malloc /build/gcc-multilib/src/gcc/libsanitizer/tsan/tsan_interceptors.cc:538 (libtsan.so.0+0x000000026aac)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) <null> (a.out+0x000000402102)
    #2 moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::Block>() <null> (a.out+0x00000040880b)
    #3 moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::requisition_block<(moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0>() <null> (a.out+0x000000407add)
    #4 bool moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::enqueue<(moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, task>(task&&) <null> (a.out+0x000000406ed4)
    #5 bool moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, task>(task&&) <null> (a.out+0x0000004057eb)
    #6 moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(task&&) <null> (a.out+0x0000004049c6)
    #7 moodycamel::BlockingConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(task&&) <null> (a.out+0x00000040343e)
    #8 main <null> (a.out+0x0000004016dd)

  Location is heap block of size 208 at 0x7d340000cf30 allocated by main thread:
    #0 malloc /build/gcc-multilib/src/gcc/libsanitizer/tsan/tsan_interceptors.cc:538 (libtsan.so.0+0x000000026aac)
    #1 moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) <null> (a.out+0x000000402102)
    #2 moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::Block>() <null> (a.out+0x00000040880b)
    #3 moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::Block* moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::requisition_block<(moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0>() <null> (a.out+0x000000407add)
    #4 bool moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::enqueue<(moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, task>(task&&) <null> (a.out+0x000000406ed4)
    #5 bool moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, task>(task&&) <null> (a.out+0x0000004057eb)
    #6 moodycamel::ConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(task&&) <null> (a.out+0x0000004049c6)
    #7 moodycamel::BlockingConcurrentQueue<task, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(task&&) <null> (a.out+0x00000040343e)
    #8 main <null> (a.out+0x0000004016dd)

  Thread T7 (tid=15389, running) created by main thread at:
    #0 pthread_create /build/gcc-multilib/src/gcc/libsanitizer/tsan/tsan_interceptors.cc:876 (libtsan.so.0+0x000000028360)
    #1 __gthread_create /build/gcc-multilib/src/gcc-build/x86_64-pc-linux-gnu/libstdc++-v3/include/x86_64-pc-linux-gnu/bits/gthr-default.h:662 (libstdc++.so.6+0x0000000badc4)
    #2 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) /build/gcc-multilib/src/gcc/libstdc++-v3/src/c++11/thread.cc:163 (libstdc++.so.6+0x0000000badc4)
    #3 worker::worker() <null> (a.out+0x000000402da3)
    #4 std::array<worker, 8ul>::array() <null> (a.out+0x000000402f80)
    #5 main <null> (a.out+0x0000004016b0)

SUMMARY: ThreadSanitizer: data race (a.out+0x402c36) in task::operator=(task&&)

It seems that the data race occurs during the move operations, between queuing and dequeuing task instances.

Unless I am missing something obvious, I do not see why a data race should occur when moving the object in and out the queue. Is this a false positive or a potential problem in the blocking queue implementation?

Redundant #IfElse For Define

concurrentqueue.h : line 240-244
#ifdef __GNUC__
typedef std::max_align_t max_align_t; // GCC forgot to add it to std:: for a while
#else
typedef std::max_align_t max_align_t;   // Others (e.g. MSVC) insist it can *only* be accessed via std::
#endif

This appears to be redundant. ;)

static_assert failed "The queue does not support super-aligned types at this time" with T=std::function (iOS)

I tried to use ConcurrentQueue in our project as a drop-in replacement of our crude std::queue+std::mutex queue, but unfortunately I've hit a snag on iOS.

When instantiating the queue with std::function<void(void)> that assertion in the title fails while building on Xcode7 for iOS, where it turns out that a lot of types have a bigger alignment than max_align_t:

  • std::alignment_of <std::function<void()>>::value == 8
  • std::alignment_of <details::max_align_t>::value == 4

I thought of opening this issue because I feel that the solution here might be more trivial than having to implement support for overaligned types.
I'm not sure how the static_assert could be fixed, but perhaps max_align_t isn't the right alignment to check there?
I'm pretty sure that malloc's alignment on iOS is actually 16 bytes, so the code should just work.
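
If relying on the platform's malloc alignment is not acceptable, one generic alternative is to over-allocate and round the pointer up with `std::align`. The sketch below is an illustration under that assumption, not a patch for the queue; `fits_default_alignment`, `AlignedBlock`, and `alloc_aligned` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <memory>

// True when alignof(T) does not exceed alignof(std::max_align_t),
// i.e. when T is not an over-aligned type.
template <typename T>
constexpr bool fits_default_alignment() {
    return alignof(T) <= alignof(std::max_align_t);
}

struct AlignedBlock {
    void* base;     // pass this to std::free
    void* aligned;  // use this for the object storage
};

// Over-allocates by alignment - 1 bytes and rounds the start up.
// `alignment` must be a power of two.
inline AlignedBlock alloc_aligned(std::size_t size, std::size_t alignment) {
    std::size_t space = size + alignment - 1;
    void* base = std::malloc(space);
    void* cursor = base;
    void* aligned = base ? std::align(alignment, size, cursor, space) : nullptr;
    return AlignedBlock{base, aligned};
}
```

The caller has to keep `base` around for the eventual `std::free`, which is the main bookkeeping cost of this approach.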

"concurrentqueue.h:366:63: error: no template named 'has_trivial_destructor'"

OS: OSX 10.10.2

ERROR MESSAGE:

concurrentqueue.h:366:63: error: no template named 'has_trivial_destructor' in namespace 'std'; did you mean
      'has_virtual_destructor'?
        template<typename T> struct is_trivially_destructible : std::has_trivial_destructor<T> { };
                                                                ~~~~~^~~~~~~~~~~~~~~~~~~~~~
                                                                     has_virtual_destructor
/Applications/Xcode.app/Contents/Developer/Toolchains/XcodeDefault.xctoolchain/usr/bin/../include/c++/v1/type_traits:971:51: note: 'has_virtual_destructor' declared here
template <class _Tp> struct _LIBCPP_TYPE_VIS_ONLY has_virtual_destructor
                                                  ^

Compiler:

$ gcc -v
Configured with: --prefix=/Applications/Xcode.app/Contents/Developer/usr --with-gxx-include-dir=/usr/include/c++/4.2.1
Apple LLVM version 6.0 (clang-600.0.56) (based on LLVM 3.5svn)
Target: x86_64-apple-darwin14.1.0
Thread model: posix

Related macros:
__GNUC__ : 4
__GNUC_MINOR__ : 2
__APPLE__ : 1

My fix :
line 362: ( https://github.com/cameron314/concurrentqueue/blob/master/concurrentqueue.h#L362 )

- #if !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
+ #if defined(__APPLE__) || !defined(__GNUC__) || __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)

It works, but I think it seems too ugly.
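
A possibly less ugly variant keys the workaround on the compiler actually being an old GCC, rather than on the operating system. Clang reports itself as GCC 4.2 through `__GNUC__` and `__GNUC_MINOR__`, which is what trips the original check, so the sketch below excludes it with `__clang__`. The `compat` namespace is hypothetical, and the sketch assumes Clang is never paired with a pre-4.8 libstdc++.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

namespace compat {
#if defined(__GNUC__) && !defined(__clang__) && \
    (__GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 8))
// Genuine GCC older than 4.8: only the pre-standard trait name exists.
template <typename T>
struct is_trivially_destructible : std::has_trivial_destructor<T> {};
#else
// Everything else, including Clang on OS X: use the standard trait.
template <typename T>
struct is_trivially_destructible : std::is_trivially_destructible<T> {};
#endif
}  // namespace compat
```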

Segmentation fault on producer destruction.

There seems to be a segmentation fault when destructing a producer. I compiled my code with GCC, std=c++11, pthreads, fsanitize=address, fno-omit-frame-pointer. Here is the output.

=================================================================
==30682==ERROR: AddressSanitizer: alloc-dealloc-mismatch (malloc vs operator delete) on 0x60b00000af90
#0 0x7fd92a797e3a in operator delete(void*) /build/gcc/src/gcc-5.2.0/libsanitizer/asan/asan_new_delete.cc:92
#1 0x43052b in moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer::~ImplicitProducer() (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x43052b)
#2 0x4252c7 in MPSCWorker<std::string, 1u>::~MPSCWorker() (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x4252c7)
#3 0x423c07 in main (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x423c07)
#4 0x7fd9298c760f in __libc_start_main (/usr/lib/libc.so.6+0x2060f)
#5 0x4234e8 in _start (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x4234e8)

0x60b00000af90 is located 0 bytes inside of 104-byte region [0x60b00000af90,0x60b00000aff8)
allocated by thread T0 here:
#0 0x7fd92a79693a in __interceptor_malloc /build/gcc/src/gcc-5.2.0/libsanitizer/asan/asan_malloc_linux.cc:38
#1 0x424810 in moodycamel::ConcurrentQueueDefaultTraits::malloc(unsigned long) (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x424810)
#2 0x42cad1 in moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer* moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::create<moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::ImplicitProducer, moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>*>(moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>*&&) (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x42cad1)
#3 0x42bbe0 in moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::recycle_or_create_producer(bool, bool&) (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x42bbe0)
#4 0x429e82 in moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::get_or_add_implicit_producer() (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x429e82)
#5 0x4285ef in bool moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::inner_enqueue<(moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::AllocationMode)0, MPSCWorker<std::string, 1u>::InternalType>(MPSCWorker<std::string, 1u>::InternalType&&) (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x4285ef)
#6 0x426e89 in moodycamel::ConcurrentQueue<MPSCWorker<std::string, 1u>::InternalType, moodycamel::ConcurrentQueueDefaultTraits>::enqueue(MPSCWorker<std::string, 1u>::InternalType&&) (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x426e89)
#7 0x425bf3 in MPSCWorker<std::string, 1u>::send(std::string, unsigned int) (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x425bf3)
#8 0x42394f in main (/home/cyberunner23/Documents/tmp_cpp_dev_2/DEV_PROJ/MPSCWorker/build/MPSCWorker+0x42394f)
#9 0x7fd9298c760f in __libc_start_main (/usr/lib/libc.so.6+0x2060f)

SUMMARY: AddressSanitizer: alloc-dealloc-mismatch /build/gcc/src/gcc-5.2.0/libsanitizer/asan/asan_new_delete.cc:92 operator delete(void*)
==30682==HINT: if you don't care about these warnings you may set ASAN_OPTIONS=alloc_dealloc_mismatch=0
==30682==ABORTING

And here is my code in question.

main.cpp (a little test I made for my code)

#include <iostream>
#include "MPSCWorker.hpp"

template <class Type>
class PrintSink : public SinkBase<Type>{
public:
    virtual bool onInit(){return true;}
    virtual void onExit(){}
    virtual void onProcess(Type data){std::cout << data << std::endl;}
};


int main(){

    MPSCWorker<std::string, 1> worker;
    std::unique_ptr<PrintSink<std::string>> sink(new PrintSink<std::string>);
    if(sink.get() == nullptr){
        return -1;
    }
    worker.addSink(std::move(sink), 0);
    worker.start();

    std::string test = "test";
    worker.send(test, 0);
    worker.send(test, 0);
    worker.send(test, 0);
    worker.send(test, 0);
    worker.send(test, 0);
    worker.send(test, 0);
    worker.send(test, 0);
    worker.send(test, 0);
    worker.send(test, 0);
    worker.send(test, 0);

    return 0;
}

And my code using ConcurrentQueue
MPSCWorker.hpp

#ifndef MPSCWORKER_MPSCWORKER_HPP
#define MPSCWORKER_MPSCWORKER_HPP

#include <atomic>
#include <condition_variable>
#include <exception>
#include <memory>
#include <thread>

#include "ConcurrentQueue.h"

template <class Type>
class SinkBase;
class SinkNotRegisteredException;

template <class Type, const unsigned int numOfSinks>
class MPSCWorker{

public:
    //Funcs
    MPSCWorker();
    ~MPSCWorker(){

        Type msg = {};
        //Send an empty msg to the reserved channel to say we're done.
        send(msg, numOfSinks + 1);
        if(workerThread.joinable())
            workerThread.join();

        for(int i = 0; i <= numOfSinks + 1; i++){
            if(sinks[i].get() != nullptr){
                sinks[i].get()->onExit();
            }
        }
    }

    void start(){
        isThreadRunning.store(true);
        workerThread = std::thread(run,
                                   std::ref(workQueue),
                                   std::ref(sinks),
                                   std::ref(isThreadRunning),
                                   std::ref(threadCondVar),
                                   std::ref(threadMutex));
    }

    int  addSink(std::unique_ptr<SinkBase<Type>> &&sink, unsigned int sinkID){

        if(isThreadRunning.load()){
            if(sinkID <= numOfSinks + 1){
                sinks[sinkID] = std::move(sink);
                if(!sinks[sinkID].get()->onInit()){
                    return -3;
                }
            } else{
                return -1;
            }
        }else{
            return -2;
        }

        return 0;
    }

    void send(Type msg, unsigned int sinkID){
        workQueue.enqueue(InternalType{msg, sinkID});
        threadCondVar.notify_one();
    }

private:

    //Types
    struct InternalType{
        Type msg;
        unsigned int sinkID;
    };

    //Vars
    std::atomic<bool>                         isThreadRunning;
    std::thread                               workerThread;
    std::condition_variable                   threadCondVar;
    std::mutex                                threadMutex;
    //ID = numOfSinks + 1: reserved
    std::unique_ptr<SinkBase<Type>>           sinks[numOfSinks + 1];
    moodycamel::ConcurrentQueue<InternalType> workQueue;

    //Funcs
    static void run(moodycamel::ConcurrentQueue<InternalType> &workQueue,
                    std::unique_ptr<SinkBase<Type>>           (&sinks)[numOfSinks + 1],
                    std::atomic<bool>                         &isThreadRunning,
                    std::condition_variable                   &threadCondVar,
                    std::mutex                                &threadMutex){

        while(isThreadRunning.load()){

            InternalType                 msg = {};
            bool                         isDequeueSuccess = false;
            std::unique_lock<std::mutex> lock(threadMutex);

            threadCondVar.wait(lock, [&workQueue, &msg, &isDequeueSuccess](){
                isDequeueSuccess = workQueue.try_dequeue(msg);
                return isDequeueSuccess;
            });

            if(msg.sinkID == numOfSinks + 1){
                isThreadRunning.store(false);
                return;
            }

            if(sinks[msg.sinkID].get() != nullptr){
                sinks[msg.sinkID].get()->onProcess(msg.msg);
            }else{
                throw SinkNotRegisteredException("Sink ID: " + std::to_string(msg.sinkID)
                                                 + " has not been registered. Use addSink"
                                                 "to associate this ID with a Sink before"
                                                 "starting the worker.");
            }
        }
    }
};

template <class Type, const unsigned int numOfSinks>
MPSCWorker<Type, numOfSinks>::MPSCWorker(){
    for(unsigned int i = 0; i <= numOfSinks +1; i++){
        sinks[i] = nullptr;
    }
}


class SinkNotRegisteredException : std::exception{
public:
    SinkNotRegisteredException(std::string msg){this->msg = msg;}
    const char* what(){return msg.c_str();}
    std::string whatStr(){return std::string(msg);}
private:
    std::string msg;
};


template <class Type>
class SinkBase{
public:
    virtual bool onInit()             = 0;
    virtual void onExit()             = 0;
    virtual void onProcess(Type data) = 0;
};

#endif //MPSCWORKER_MPSCWORKER_HPP

I posted my code because this looks to me like a bug inside ConcurrentQueue. However, I'm only able to reproduce it with the code above, so perhaps it's my fault? (I can't figure out the exact cause.)

Need to understand ConcurrentQueue

Hello,
I am looking for a faster queueing library, and the benchmark results for ConcurrentQueue are just awesome.

I am still learning it and need to understand a few things. What is ConcurrentQueueDefaultTraits, and under which circumstances should I use it?
Also, I am going to produce from multiple threads and consume from their respective consumer threads. Do I need to create just one instance, something like

ConcurrentQueue<customeObject, Traits> q;

or should each thread have its own

ConcurrentQueue<customeObject, Traits> q;

Support for timeout on blocking dequeue

I needed this for my application and modified the unix semaphore implementation to support this.

Are you interested in adding more widespread support for this? I can create a pull request for you to look at if you want.
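
For context, a timed wait on a POSIX semaphore could look roughly like the sketch below. This is not the library's code or the patch mentioned above; `timed_wait` and its microsecond parameter are hypothetical names chosen for illustration. It assumes a platform that provides `sem_timedwait`, which takes an absolute `CLOCK_REALTIME` deadline rather than a relative timeout.

```cpp
#include <cerrno>
#include <cstdint>
#include <ctime>
#include <semaphore.h>

// Hypothetical helper: wait on a POSIX semaphore for at most `timeout_usecs`
// microseconds. Returns true if the semaphore was decremented, false otherwise
// (for example on timeout).
inline bool timed_wait(sem_t& sem, std::uint64_t timeout_usecs)
{
    const long nsecs_per_sec = 1000000000L;

    // sem_timedwait expects an absolute deadline, so start from "now".
    timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec  += static_cast<time_t>(timeout_usecs / 1000000);
    deadline.tv_nsec += static_cast<long>(timeout_usecs % 1000000) * 1000;
    if (deadline.tv_nsec >= nsecs_per_sec) {
        // Normalize: tv_nsec must stay below one second.
        deadline.tv_nsec -= nsecs_per_sec;
        ++deadline.tv_sec;
    }

    int rc;
    do {
        rc = sem_timedwait(&sem, &deadline);
    } while (rc == -1 && errno == EINTR);  // retry if interrupted by a signal
    return rc == 0;
}
```

Because the deadline is computed once, retries after `EINTR` do not extend the total waiting time.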

About ConsumerToken

Hi, I have another question.

I created a single ConsumerToken and used it from multiple threads without synchronization. The dequeue method appears to behave correctly in every thread. However, the README says:

The queue can take advantage of extra per-producer and per-consumer storage if
it's available to speed up its operations. This takes the form of "tokens":
You can create a consumer token and/or a producer token for each thread or task
(tokens themselves are not thread-safe), and use the methods that accept a token
as their first parameter


So my confusion is whether ConsumerToken is actually thread-safe or not. And in which situations should I use a ConsumerToken?

Thanks a lot.

recycling implicit producers (introduced in e53f28cb3) causes heap_use_after_free error

Location of error: concurrentqueue.h:412
Identified by g++ AddressSanitizer

Environment: g++ 4.8.2-19ubuntu1, compiling with -std=c++11, -O2, -fopenmp, pthreads

I can consistently reproduce the behavior in my code, but cannot pinpoint the location of the error since it occurs at thread termination. Downgrading to the previous commit, 4671562, resolves the problem. The ConcurrentQueue unit tests pass fine (but they use std::thread). I would like to be able to use the latest version of your code, so it would be nice to figure out why this is happening.

==5558== ERROR: AddressSanitizer: heap-use-after-free on address 0x601800023fe8 at pc 0x7f622d6b12b1 bp 0x7f62284cbdc0 sp 0x7f62284cbdb8
READ of size 8 at 0x601800023fe8 thread T16777215
#0 0x7f622d6b12b0 in moodycamel::details::ThreadExitNotifier::~ThreadExitNotifier() /home/tpan/src/bliss/ext/concurrentqueue/concurrentqueue.h:412
#1 0x7f622a249d78 in run /build/buildd/gcc-4.8-4.8.2/build/x86_64-linux-gnu/libstdc++-v3/libsupc++/../../../../src/libstdc++-v3/libsupc++/atexit_thread.cc:64
#2 0x7f62297ebf81 in __nptl_deallocate_tsd /build/buildd/eglibc-2.19/nptl/pthread_create.c:158
#3 0x7f62297ec194 in start_thread /build/buildd/eglibc-2.19/nptl/pthread_create.c:325
#4 0x7f6229afd00c (/lib/x86_64-linux-gnu/libc.so.6+0xfb00c)
0x601800023fe8 is located 104 bytes inside of 128-byte region [0x601800023f80,0x601800024000)
==5558== AddressSanitizer CHECK failed: ../../../../src/libsanitizer/asan/asan_report.cc:344 "((t)) != (0)" (0x0, 0x0)
#0 0x7f622a50331d (/usr/lib/x86_64-linux-gnu/libasan.so.0+0x1231d)
#1 0x7f622a50a133 (/usr/lib/x86_64-linux-gnu/libasan.so.0+0x19133)
#2 0x7f622a5080d6 (/usr/lib/x86_64-linux-gnu/libasan.so.0+0x170d6)
#3 0x7f622a508f71 (/usr/lib/x86_64-linux-gnu/libasan.so.0+0x17f71)
#4 0x7f622a503733 (/usr/lib/x86_64-linux-gnu/libasan.so.0+0x12733)
#5 0x7f622d6b12b0 in moodycamel::details::ThreadExitNotifier::~ThreadExitNotifier() /home/tpan/src/bliss/ext/concurrentqueue/concurrentqueue.h:412
#6 0x7f622a249d78 in run /build/buildd/gcc-4.8-4.8.2/build/x86_64-linux-gnu/libstdc++-v3/libsupc++/../../../../src/libstdc++-v3/libsupc++/atexit_thread.cc:64
#7 0x7f62297ebf81 in __nptl_deallocate_tsd /build/buildd/eglibc-2.19/nptl/pthread_create.c:158
#8 0x7f62297ec194 in start_thread /build/buildd/eglibc-2.19/nptl/pthread_create.c:325
#9 0x7f6229afd00c (/lib/x86_64-linux-gnu/libc.so.6+0xfb00c)

Segfault during benchmark

Dima reported:

I tried to run your benchmark and moodycamel::ConcurrentQueue crashed during the test:

only enqueue (pre-allocated):
(Measures the average operation speed when all threads are producers,
and the queue has been stretched out first)

moodycamel::ConcurrentQueue
Without tokens
1 thread: Avg: 0.0131us Range: [0.0131us, 0.0131us] Ops/s: 76.48M Ops/s/t: 76.48M
2 threads: Avg: 0.0296us Range: [0.0295us, 0.0297us] Ops/s: 67.48M Ops/s/t: 33.74M
4 threads: Avg: 0.1216us Range: [0.1212us, 0.1221us] Ops/s: 32.90M Ops/s/t: 8.23M
8 threads: Avg: 0.7136us Range: [0.7077us, 0.7170us] Ops/s: 11.21M Ops/s/t: 1.40M
Segmentation fault (core dumped)

Core was generated by `./benchmarks'.
Program terminated with signal 11, Segmentation fault.
#0 load (__m=std::memory_order_relaxed, this=0x8040) at /usr/include/c++/4.8.2/bits/atomic_base.h:496

496 return __atomic_load_n(&_M_i, __m);

BlockingConcurrentQueue does not compile with exceptions disabled (Clang)

Hey

I'm trying to compile BlockingConcurrentQueue with exceptions disabled with Clang.

It looks like the ifdefs detect properly that the exceptions are disabled (I get no errors coming from MOODYCAMEL_TRY/CATCH), but the two throw std::bad_alloc() in BlockingConcurrentQueue constructors are not protected by any macros and trigger an error like:

error : cannot use 'throw' with exceptions disabled

Do you have a point of view on the matter?
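
One common way to handle this is to route every `throw` through a macro that compiles to something else when exceptions are off. The sketch below is only an illustration: `SKETCH_THROW` and `require_allocated` are hypothetical names, and falling back to `std::abort()` is an assumption about what a no-exceptions build would want, not a statement of what the library does.

```cpp
#include <cstdlib>
#include <new>

// __cpp_exceptions, __EXCEPTIONS (GCC/Clang) and _CPPUNWIND (MSVC) are defined
// by the compiler when exception support is enabled.
#if defined(__cpp_exceptions) || defined(__EXCEPTIONS) || defined(_CPPUNWIND)
#define SKETCH_THROW(expr) throw (expr)
#else
// Assumption: with exceptions disabled there is nothing to unwind to, so abort.
#define SKETCH_THROW(expr) std::abort()
#endif

// Hypothetical stand-in for a constructor that must report a failed allocation.
inline void require_allocated(const void* p)
{
    if (p == nullptr) {
        SKETCH_THROW(std::bad_alloc());
    }
}
```

With this shape, the call site contains no bare `throw`, so it compiles under `-fno-exceptions` as well.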

Limit max number of elements in a queue

Thank you for this great library!

I was wondering if it's possible to limit a maximum number of elements in a queue (possibly per producer, at compile time)? In some cases it's necessary to prevent unlimited queue growth. I know I can override memory allocation callbacks but maybe there is a simpler way.
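
Independently of what the queue itself offers, one way to cap growth from the outside is a separate atomic counter that producers consult before enqueueing. The sketch below is a hypothetical helper (`CapacityGate` is not part of the library) and assumes the caller pairs each successful `try_acquire()` with exactly one `release()`.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical helper: caps how many elements may be in flight at once.
// Call try_acquire() before enqueueing and release() after a successful dequeue.
class CapacityGate {
public:
    explicit CapacityGate(std::size_t max_elements) : max_(max_elements), count_(0) {}

    // Reserves one slot; returns false if the cap has been reached.
    bool try_acquire()
    {
        std::size_t current = count_.load(std::memory_order_relaxed);
        while (current < max_) {
            // On failure `current` is refreshed and the bound is checked again.
            if (count_.compare_exchange_weak(current, current + 1,
                                             std::memory_order_acquire,
                                             std::memory_order_relaxed)) {
                return true;
            }
        }
        return false;
    }

    // Frees one slot once an element has left the queue.
    void release() { count_.fetch_sub(1, std::memory_order_release); }

    // Number of slots currently reserved.
    std::size_t in_flight() const { return count_.load(std::memory_order_relaxed); }

private:
    const std::size_t max_;
    std::atomic<std::size_t> count_;
};
```

A producer would call `try_acquire()` and enqueue only on success (calling `release()` again if the enqueue itself fails); the consumer calls `release()` after each dequeued element. The cap here is global rather than per producer.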

try_enqueue failed when size_approx() is already 0

There are N > 1 producers and one consumer. When shutting down the application, the last remaining producer sometimes loops infinitely while the consumer is still waiting for new items and channel.size_approx() also returns 0.

while (!channel.try_enqueue(producer_token, item))
{
  LOG(INFO) << "try_enqueue() failed, queue_size: " << channel.size_approx();
  usleep(wait_us);
}
// signal the consumer

Blocking Enqueue Methods (Feature Request)

Your blocking Concurrent Queue has wait and wait_for methods defined for dequeue. If there were analogous methods for enqueue in conditions where we don't want to allocate memory, that would be super useful for creating pipeline style code.

cast bug

~/felix>clang++ -std=c++11 -I. ccx.cpp
In file included from ccx.cpp:2:
./concurrentqueue.hpp:1508:78: error: reinterpret_cast from 'const char *' to 'int *'
      casts away qualifiers
  ...idx) const MOODYCAMEL_NOEXCEPT { return reinterpret_cast<T*>(elements) + static_c...
                                             ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ccx.cpp:3:28: note: in instantiation of member function 'moodycamel::ConcurrentQueue<int,
      moodycamel::ConcurrentQueueDefaultTraits>::Block::operator[]' requested here
template class moodycamel::ConcurrentQueue<int>;

This is a stupidity in C++ that you'll have to work around: reinterpret_cast isn't allowed to cast away const.

The fix is simple: use an ordinary C cast: (T*) instead.
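
As an alternative to the C-style cast, the same effect can be spelled with named casts. The sketch below uses a simplified stand-in for the queue's block type (`Block`, `construct` and `at` here are illustrative, not the library's definitions) to show two options inside a const member function: return a pointer to const, or strip the const explicitly with `const_cast` before reinterpreting.

```cpp
#include <cstddef>
#include <new>

// Simplified stand-in for a block of raw element storage.
template <typename T, std::size_t N>
struct Block {
    alignas(T) char elements[sizeof(T) * N];

    // Non-const access: `elements` decays to 'char*', so reinterpret_cast suffices.
    T* operator[](std::size_t idx) { return reinterpret_cast<T*>(elements) + idx; }

    // Const access, option 1: keep the const and hand out a pointer to const.
    const T* at(std::size_t idx) const
    {
        return reinterpret_cast<const T*>(elements) + idx;
    }

    // Const access, option 2: `elements` is 'const char*' here, so remove the
    // const explicitly first; reinterpret_cast alone is rejected by the compiler.
    T* operator[](std::size_t idx) const
    {
        return reinterpret_cast<T*>(const_cast<char*>(elements)) + idx;
    }

    // Constructs an element in the raw storage with placement new.
    void construct(std::size_t idx, const T& value) { new ((*this)[idx]) T(value); }
};
```

Option 2 behaves like the C cast `(T*)` in this situation, but makes the removal of const visible at the call site.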

CAS Memory Order in Semaphore fails compile check GCC4.8

In blockingconcurrentqueue.h:291 you appear to have your memory orderings slightly wrong in the spin part of the semaphore. You pass relaxed as the success ordering but acquire as the failure ordering, so the code does not pass the sanity checks during compilation on GCC 4.8 (and, I assume, later). This is not valid C++11, as the success ordering should be at least as strong as the failure ordering.

if (oldCount < 0 && m_count.compare_exchange_strong(oldCount, oldCount + 1, std::memory_order_relaxed, std::memory_order_acquire))

The success barrier should be std::memory_order_acquire as well if I'm understanding the purpose of the loop correctly.
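
To illustrate the rule in isolation, here is a minimal sketch of a compare-exchange loop whose success ordering (acquire) is at least as strong as its failure ordering (relaxed). `try_decrement` is a hypothetical, simplified function and not the semaphore code from the header; which orderings the real semaphore needs depends on its design.

```cpp
#include <atomic>

// Hypothetical, simplified semaphore count: take one unit without blocking.
// Returns true if the count was positive and has been decremented.
inline bool try_decrement(std::atomic<int>& count)
{
    int old_count = count.load(std::memory_order_relaxed);
    while (old_count > 0) {
        // Success uses acquire, failure uses relaxed: the failure ordering is
        // no stronger than the success ordering, as C++11 requires.
        if (count.compare_exchange_weak(old_count, old_count - 1,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed)) {
            return true;
        }
        // On failure old_count holds the freshly observed value; the loop
        // condition re-checks it before trying again.
    }
    return false;
}
```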
