Code Monkey home page Code Monkey logo

threadpool's Introduction

ThreadPool

A simple C++11 Thread Pool implementation.

Basic usage:

// create thread pool with 4 worker threads
ThreadPool pool(4);

// enqueue and store future
auto result = pool.enqueue([](int answer) { return answer; }, 42);

// get result from future
std::cout << result.get() << std::endl;

threadpool's People

Contributors

progschj avatar vadz avatar wilx avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

threadpool's Issues

Template deduction fails with pointer-to-member functions

Code such as this will fail in template deduction:

auto member_task = TP.enqueue(&MyClass::member_func, this);

std::result_of uses std::declval to get the proper result.

Changing the declaration of ThreadPool::enqueue to:

template <class F, class... Args>
auto enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>;

allows the above to compile. Similarly change the typedef inside the definition.

If I felt I had more expertise in template metaprogramming, I would have submitted a patch. But I may have omitted std::forward in places where it matters.

using previous example code breaks

using the 'usage' code from your previous blog post doesn't compile:

#include "ThreadPool.h"
#include <stdio.h>
#include <iostream>

int main() {
    // create a thread pool of 4 worker threads
    ThreadPool pool(4);

    // queue a bunch of "work items"
    for(int i = 0;i<8;++i)
    {
        pool.enqueue([i]
        {
            std::cout << "hello " << i << std::endl;

            std::cout << "world " << i << std::endl;
        });
    }
}

errors out with:

main.cpp: In function 'int main()':
main.cpp:17:7: error: no matching function for call to 'ThreadPool::enqueue(main()::<lambda()>)'
main.cpp:17:7: note: candidate is:
In file included from main.cpp:1:0:
ThreadPool.h:117:15: note: template<class T, class F> Result<T> ThreadPool::enqueue(F)
ThreadPool.h:117:15: note:   template argument deduction/substitution failed:
main.cpp:17:7: note:   couldn't deduce template parameter 'T'
scons: *** [build/main.o] Error 1

why not fix the problem with condition variable to avoid dead-lock?

never mind... i did not fully understand condition variable implementation when open this issue...

previously, i did not realize that the outer mutex lock guarantees the atomic operation for condition
and within the implementation of inner waiting queue, inner mutex lock guarantees the atomic operation for the inner waiting queue

100% CPU utilization

Hello there,

At first, I'd like to thank you for your great work. I just imported the code in my project and it just working fine. :)

However, one thing that bothers me is that as soon I create the thread pool (and ofcourse adding some initial tasks in there), the CPU usage is getting increases over the time and eventually reach 100% (after 10sec), with considering the fact that there is no remaining tasks in queue.

Is it normal behavior ? or am I doing something wrong ? and If you have suggestion for me.

btw, my build environment is FreeBSD 10.2 amd64 RELEASE

Thank you,

Duplicate symbol

Hi,

If I try to include ThreadPool.h in in different source files in my project, the linker starts complaining about duplicate symbols. I'm using XCode 4.6.1 (clang 4.2).

Thanks.

Could change var name "task" at line 70 to "task_ptr"?

First time,I read this code,I have mistaken task is packaged_task at this line auto task = std::make_shared< std::packaged_task<return_type()> >(std::bind(std::forward(f), std::forward(args)...));.

And I worry about whether this line task = std::move(this->tasks.front()); exception safe.At here task is a callable type.I spend many time on reading this line ,tasks.emplace(task{ (*task)(); }); then I found at here task is shared_ptr.

This is a suggestion.
Thanks everyone.

Deadlock spotted!

basically I was just profiling to see how many empty jobs can be scheduled every frame to get idea of average overhead.. and it happened that the queue incurs in deadlocks! The following code reproduce the error on my machine (you just need to play with number of threads in pool and initial recursive depth, the deadlock will necessarily occurs first or later.)

In the few cases in wich following code didn't deadlocked it required ~200 ms (2x2Ghz cores) so 5000 jobs per second is actual throughput if you remove the deadlock.

void recursiveWork(ThreadPool & pool, int depth){
if (depth<=0)
return;
auto result = pool.enqueue( {return 1;}); //empty job.. apparently the formatting remove some parentheses
//here, I'm just passing a lambda that returns 1 by the way
recursiveWork(pool, depth-1);
result.get();
}

int main(){
ThreadPool pool(4);
for (int i= 0; i<1000; i++){
recursiveWork(pool,4);
}
return 0;
}

How to pass multi parameters to pool.enqueue?

Hello, I looked example.cpp and found that the example pass i to pool.enqueue. But in fact, I would like to use this pool to run a function with multi parameters. The code seems a little confusing to me, so l'd like to ask how to pass multi parameters to pool.enqueue?

Errors on VS2012

I have a bunch of errors using visual studio 2012.

e.g. Error 1 error C2332: 'class' : missing tag name

Can anyone help?

threadpool for member function

hi I wonder how can i use the threadpool for member funtion in class and my code like:

UAVProcessThreadPool threadPool(2);
std::vector<std::future> errReturn;
for(const auto iter:feature)
{
errReturn.emplace_back(threadPool.UAVProcess_Enterqueue(
[](FeatureParam param){UAVProcessFeatExtractEach(param);}
,&iter.second
));
}

the code is in member funtion and function UAVProcessFeatExtractEach is also a member funtion, could you please show me a sample of how to use the threadpool in condition like this?

Race Condition in ThreadPool Constructor

There's a race condition for the shared resource task in the ThreadPool constructor. Move the variable definition outside of the infinite loop and add the C++11 thread_local storage duration specifier:

thread_local std::function<void()> task;

Should ThreadPool provide a function which waits until all thread finish?

Dear developer:
I want to add shutdown function that waits until all worker finish. Because I found ThreadPool will deconstructing directly if process exit. I think user have right to choose terminate ThreadPool or wait for finishing.

Are you agree with me? If you do I can submit a PR. Have a good day!

Windows support

Hey.

So I see that this library uses C++11 heavily. How much is this supported on Windows? I want to use this for a cross-platform project.

Kind regards,
Ingwie

android and ios support?

I tried to compile this using android NDK r10 with no luck. There seems to be something wrong with std::future, have you tried or plan to use it on mobile platform?

enqueue: provide variadic function

For now we're able to enqueue e.g. lambdas, or use std::bind to curry functions taking various arguments.

It would be reasonable to provide a variadic enqueue instead, taking various arguments and simply passing them on, e.g.

template<typename Function, typename... Args>
auto enqueue(Function&& f, Args&&... args) -> std::future<decltype(std::forward<F>(f)(std::forward<Args>(args)...))>;

The behavior should be similar to e.g. std::thread's interface.

Simplified enqueue

I've simplified the enqueue method, now the additional classes are not needed and the container is std::deque<std::function<void()>>:

template<class Result, class Callable>
std::future<Result> ThreadPoolManager::enqueue(Callable callable) {
  /**
   * Create task, we use RAII here and copy-capture lambda later as this std::packed_task has to be
   * deleted after it was executed.
   */
  auto task = std::make_shared<std::packaged_task<Result()>>(callable);

  /** Get the result of the task */
  std::future<Result> result = task->get_future();

  {
    /** Lock the queue */
    std::unique_lock<Mutex> lock(mutex_);

    /** Push new task to the queue */
    tasks_.push_back([=] {
      /** Execute task */
      (*task)();
    });
  }
  /** Notify one worker that there is a work to do */
  conditional_variable_.notify_one();

  return result;
}

VS2013: Running the example causes std::bad_function_call

I compiled the example.cpp using VisualStudio 2013. Running it caused a series of:

First-chance exception at 0x761C4118 in TEST.exe: Microsoft C++ exception: std::bad_function_call at memory location 0x0143F4BC.

during enqueuing tasks on a ThreadPool. Do you know what can cause the problem?

Race condition: ThreadPool deadlocks on destruction

Have a look at the worker loop:

ThreadPool/ThreadPool.h

Lines 41 to 50 in 9a42ec1

for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;

and the destructor of the thread pool:

ThreadPool/ThreadPool.h

Lines 87 to 96 in 9a42ec1

inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers)
worker.join();
}

There's a race condition. Let's say that 4 worker threads wait on the lock while destructor fires. The destructor unlocks the lock, and calls condition.notify_all(); but if not all workers managed to reach this->condition.wait(...) then not all will be notified. After some time they finally reach this->condition.wait(...) and wait forever and the destructor hangs forever on worker.join();. Unless ThreadPool::enqueue is fired in parallel (forcing notification) but running methods while object is being destroyed is Undefined Behaviour.

Add a new branch for C++ 17.

Notable changes:

  1. std::result_of has been deprecated in C++ 17. Use std::invoke_result instead.
  2. We should use std::invoke instead of write f(args...) directly.
  3. We could use initializer in lambda's capture list in C++ 14, so instead of make_shared<std::packaged_task> and copying it into the lambda we can write [task = std::move(task)].

How to get the thread id in 'task' function?

Hi,
Thanks for your powerful thread pool. However, when I use it, I would like to get the logic thread id (0, 1, 2, 3) when I invoke the task function. I am wondering how to enable this function in current implementation?
Look forward to your reply.

Race Condition in ThreadPool Destructor

There's a race condition for the shared resource stop in the ThreadPool destructor. Change the definition of lock to be as follows:

std::lock_guard<std::mutex> lock(this->queue_mutex);

enqueue: capture by universal reference

Is there a reason the enqueue function doesn't capture the type-deduced function as universal reference? E.g.:

template<typename Function>
auto enqueue(Function&& f) -> std::future<decltype(std::forward<F>(f)())>;

This would allows us to std::forward(f) later on, preserving the rvalue/lvalue type.

This should handle situations nicely where we enqueue

  • rvalues (e.g. anonymous lambdas)
  • lvalues (e.g. named functors)

queue<packaged_task> has memory leaks?

When I enqueue some task:

		for (int i = 0; i < 10000; i++)
		{
			pool.enqueue_task([i] {
				cout << i << endl;
			});
		}

After all the task ENDED:

snipaste_2018-12-11_11-35-45

The std::packaged_task don't seem to be released, and they will be released after ThreadPool destructed. The same to queue.

main thread block on notify_one():sorry I didn't notice the former issue.

When you change the example.cpp as follows(1.more than 1000 loop times and 2.delete cout in work function):

int main()
{
    m::MThreadPool pool(4);
    std::vector< std::future<int> > results;
    for(int i = 0; i < 10000; ++i) {
        results.emplace_back(
            pool.enqueue([i] {
                return i*i;
            })
        );
    }
    for(auto && result: results)
        result.get();
    
    return 0;
}

The program is easy to block.
I add some output in enqueue function and thread.
This is the output when the program block.
all
We can see, when notify_one() was call, two threads were blocking on condition.wait() and one thread was in calculating and one thread was waiting for mtx.lock().

I can't understand why the main thread will block.

*edit* Nvm, my own fault

std::shared_ptr<ThreadPool> _pool;
_pool = std::make_shared<ThreadPool>(4);

I'm new to c++ and haven't quite figured out what the issue with trying to do it like this is yet. The threads do manage to get into the block for

if (this->stop && this->tasks.empty())
    return;

But creating the threadpool this way leaves them waiting or deadlocked afterwards.

Will continue trying to figure it out tomorrow (good learning experience for myself)

edit nvm my own fault in my test project, works fine lol

Wait for all workers to be done

Is there a way to do that? I can only see the method to add a function to the queue. But I can not see a function to wait for when all threads are done working.

In my scenario, it will be that the main application sends a number of tasks to the queue and in the meantime processes and prepares new tasks to be put into the list. However, if all tasks are processed and handed to the queue, how do I wait for all tasks to be done?

perfect thread pool

if one thread cost many many time , other thread would be blocked.
all thread can not execute on same time .

threadpool.h should be modified. insert "lock.unlock()" before line 55.

bye the way, i have a question. why do you add lock in the line 99 when destroy threadpool?
because this lock. application can not exit when destroy threadpool.

condition.wait deadlock

Hello,

Sometimes, after a worker has begun waiting, the other threads cannot lock the mutex anymore.
I don't get why...
This occurs very rarely, roughly once every 30000 call to enqueue().
It is however a serious issue for my use case.

To track the issue, I have added some prints in enqueue() and the worker :

Prints in enqueue :

std::future<return_type> res = task->get_future();
cout << "enqueue waiting for lock" << endl;
{
    cout << "enqueue locked" << endl;
    std::unique_lock<std::mutex> lock(queue_mutex);

    // don't allow enqueueing after stopping the pool
    if(stop)
        throw std::runtime_error("enqueue on stopped ThreadPool");

    tasks.emplace([task](){ (*task)(); });
    cout << "enqueue will unlock" << endl;
}
cout << "enqueue will notify" << endl;
condition.notify_one();
cout << "enqueue notified" << endl;

Prints in the worker lambda :

                std::function<void()> task;
                cout << "worker "<<i<<" waiting for lock" <<endl;
                {
                    std::unique_lock<std::mutex> lock(this->queue_mutex);
                    cout << "worker "<<i<<" waiting" << endl;
                    this->condition.wait(lock,
                        [this]{ return this->stop || !this->tasks.empty(); });
                    cout << "worker "<<i<<" awoken" << endl;
                    if(this->stop && this->tasks.empty())
                        return;
                    task = std::move(this->tasks.front());
                    this->tasks.pop();
                }
                cout << "worker "<<i<<" released"<<endl;
                task();

When the deadlock happens I get :

enqueue waiting for lock
enqueue locked
enqueue will unlock
enqueue will notify
worker 5 awoken
worker 5 released
worker 3 waiting for lock
worker 3 waiting
enqueue notified
enqueue waiting for lock
enqueue locked
enqueue will unlock
enqueue will notify
worker 6 waiting for lock
worker 6 waiting
worker 6 awoken
worker 6 released
worker 4 waiting for lock
worker 4 waiting
worker 5 waiting for lock
worker 6 waiting for lock

Two things I don't get

  • why the main thread hangs ? (I should get a "enqueue notified")
  • why is this deadlock happening ? (condition.wait should have been called and released the lock..)

I hope I don't give you too much of a headach
Cheers

crash issue

I have many locations to wait the result, so thousand futures are created, but the program throw an exception about the future creations.

The reason may be the inner std::async use the std::launch::async mode.

Add a variable to control the tasks size

A program may accepts many tasks at the same time, it will grow the memory to use. If the memory is taken a lot by the program, the system will kill the program.
May the library add a variable to control the max tasks can be added to the std::queue? Thanks.

  1. // the task queue
    
  2. std::queue< std::function<void()> > tasks;
    

Access violation reading location exception throw out

plantform:
VS2015

code:
#include <threadpool.h>
int main()
{
ThreadPool pool(254);
}

error:
Exception thrown at 0x73163218 (tmmon.dll) in xxx.exe: 0xC0000005: Access violation reading location 0x00D55180.
If there is a handler for this exception, the program may be safely continued.

How can I fix it? When I use try..catch to surround the ThreadPool, it does not work, the error also occurs.

Competition problem

  1. I create a thread pool thread 4.
  2. I have to run f(x) on 4 different threads.
  3. You can not recycle the thread or clean stack of the task?

Help me.

notify_one in ThreadPool::enqueue - valgrind error

valgrind drd error

==3727== Probably a race condition: condition variable 0x6127e30 has been signaled but the associated mutex 0x6127e08 is not locked by the signalling thread.
==3727== at 0x4C37B95: pthread_cond_signal@* (in /usr/lib/valgrind/vgpreload_drd-amd64-linux.so)
==3727== by 0x534DCF8: std::condition_variable::notify_one() (in /usr/lib/x86_64-linux-gnu/libstdc++.so.6.0.21)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.