
thread-pool's Introduction

I am a theoretical, mathematical, and computational physicist, and an Assistant Professor of Physics at Brock University. My research focuses on the nature of time and causality in general relativity and quantum mechanics, as well as symbolic and high-performance scientific computing. For more information, please see my personal website.

thread-pool's People

Contributors

bshoshany


thread-pool's Issues

It would be better to provide some unit tests.

As far as I know, there are 2 benefits to providing unit tests:

  1. Confidence: with all the use cases tested, the maintainer can be more confident in the code.
  2. Convenience: users can refer to the test cases while using the code.

Small issue with latest release

So I was extending the fresh thread-pool release with a priority_queue. While doing so I ran into multiple issues:

  • std::vector<std::atomic<bool>> seems to be invalid code.
    I read that std::atomic is neither copyable nor movable, while std::vector requires at least one of the two.
  • Running static analysis (cppcheck) told me that flag1 == flag2 compares the same value.
    Not sure if cppcheck is right here, but I think a compiler might optimize this away...
    Possible solution: declaring bool flag; as volatile.

I really like how simple the Library is 😃

My wip code: https://github.com/Green-Sky/thread-pool/commits/priority_queue
Right now my code consistently fails the "task monitoring" test. I suspect the first issue to be the cause.
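
For the first point, one possible workaround is sketched below. It assumes the number of flags is fixed once the pool is created; make_flags is a hypothetical helper, not something taken from the library. A heap-allocated array of std::atomic<bool> never copies or moves its elements, which is what std::vector operations such as push_back or resize would need to do.

```cpp
#include <atomic>
#include <cstddef>
#include <memory>

// Hypothetical helper: a fixed-size array of atomic flags, all set to false.
// The elements are never copied or moved individually, so the fact that
// std::atomic<bool> is non-copyable and non-movable is not a problem.
std::unique_ptr<std::atomic<bool>[]> make_flags(std::size_t count)
{
    auto flags = std::make_unique<std::atomic<bool>[]>(count);
    for (std::size_t i = 0; i < count; ++i)
        flags[i].store(false); // explicit initialization for every element
    return flags;
}
```

The unique_ptr itself can still be moved, so a class holding it stays movable even though the individual flags are not.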

Is there any way to get and pass thread IDs or pointers to tasks?

I divide data by the number of threads. Each thread will update its divided data. All will merge at the end. By doing that I can avoid using locks. However I need to know which thread is used to run a task. It could be whatever such as thread IDs or thread pointers. Thanks

Suggestion: Consider using vector of threads instead of smart pointer to dynamically allocated array of threads.

Current memory allocation for threads:

    thread_pool(const ui32& _thread_count = std::thread::hardware_concurrency())
            :thread_count(_thread_count ? _thread_count : std::thread::hardware_concurrency()),
             threads(new std::thread[_thread_count ? _thread_count : std::thread::hardware_concurrency()])
    {
        create_threads();
    }

Suggestion:

// Class member declaration
std::vector<std::thread> threads;

// Constructor
    thread_pool(const ui32& _thread_count = std::thread::hardware_concurrency())
            : threads(_thread_count, {&thread_pool::worker, this})
    { }

Then the number of threads is also available from threads.size(), and RAII takes care of the vector's destruction.
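
One caveat with the suggestion as written: the fill constructor of std::vector copies its value argument, and std::thread is move-only, so that initializer would likely be rejected by the compiler. A minimal sketch of the same idea that constructs each thread in place is shown below. The class worker_group and its members are hypothetical stand-ins for the pool, reduced to thread ownership only.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for the pool, reduced to thread ownership only.
class worker_group
{
public:
    explicit worker_group(std::size_t thread_count)
    {
        threads.reserve(thread_count);
        for (std::size_t i = 0; i < thread_count; ++i)
            threads.emplace_back(&worker_group::worker, this); // construct each thread in place
    }

    ~worker_group()
    {
        // std::thread's destructor calls std::terminate if the thread is still
        // joinable, so the vector alone is not enough: join explicitly first.
        for (std::thread& t : threads)
            t.join();
    }

    worker_group(const worker_group&) = delete;
    worker_group& operator=(const worker_group&) = delete;

    std::size_t size() const { return threads.size(); }
    std::size_t started() const { return started_count.load(); }

private:
    void worker() { ++started_count; } // placeholder for the real worker loop

    std::atomic<std::size_t> started_count{0};
    std::vector<std::thread> threads;
};
```

The vector releases its memory automatically, but joining the threads still has to happen in the destructor, much as the pool's own destroy_threads() presumably does today.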

[BUG] Multiple Thread Pools

Describe the bug

Instantiating multiple thread pools throughout a code base causes issues when attempting to perform operations on said thread pools.

Minimal working example

Just allocate two thread_pool objects on the stack in different objects (also on the stack) and try to use them.

Behavior

No crashes.

System information

  • CPU model, architecture, # of cores and threads: i7-10875H x 16
  • Operating system: Ubuntu 21.10
  • Name and version of C++ compiler: gcc 11.2
  • Full command used for compiling, including all compiler flags: Bazel's -c opt for C++20
  • Thread pool library version: Latest

(Please note that only the latest version of the thread pool library is supported.)

Additional information

Include any additional information here.

[BUG]Problem with parallel for

Describe the bug

I found out about this library and I think it's great. I was reading the article and trying out the examples, and the example with the parallel loop failed to compile.

Minimal working example

First, I tried this code:

thread_pool pool(10);
size_t squares[100];
double a=1;
pool.parallelize_loop(0, 99, [&squares, &a](size_t i) { squares[i] = i * i* a; });

and got the compiler error

In file included from pool.cpp:1:
thread-pool/thread_pool.hpp: In instantiation of ‘void thread_pool::parallelize_loop(const T1&, const T2&, const F&, thread_pool::ui32) [with T1 = int; T2 = int; F = main()::<lambda(size_t)>; thread_pool::ui32 = long unsigned int]’:
pool.cpp:7:24:   required from here
thread-pool/thread_pool.hpp:155:31: error: no match for call to ‘(const main()::<lambda(size_t)>) (const T&, const T&)’
  155 |                           loop(start, end);
      |                           ~~~~^~~~~~~~~~~~
pool.cpp:7:32: note: candidate: ‘main()::<lambda(size_t)>’
    7 |   pool.parallelize_loop(0, 99, [&squares, &a](size_t i) { squares[i] = i * i* a; });
      |                                ^
pool.cpp:7:32: note:   candidate expects 1 argument, 2 provided

Then, I tried the example from the paper:

#include "thread_pool.hpp"
int main()
{
  thread_pool pool(10);
  size_t squares[100];
  pool.parallelize_loop(0, 99, [&squares](size_t i) { squares[i] = i * i; });
  std::cout << "16^2 = " << squares[16] << '\n';
  std::cout << "32^2 = " << squares[32] << '\n';
}

And got a similar error

In file included from pool.cpp:1:
thread-pool/thread_pool.hpp: In instantiation of ‘void thread_pool::parallelize_loop(const T1&, const T2&, const F&, thread_pool::ui32) [with T1 = int; T2 = int; F = main()::<lambda(size_t)>; thread_pool::ui32 = long unsigned int]’:
pool.cpp:6:24:   required from here
thread-pool/thread_pool.hpp:155:31: error: no match for call to ‘(const main()::<lambda(size_t)>) (const T&, const T&)’
  155 |                           loop(start, end);
      |                           ~~~~^~~~~~~~~~~~
pool.cpp:6:32: note: candidate: ‘main()::<lambda(size_t)>’
    6 |   pool.parallelize_loop(0, 99, [&squares](size_t i) { squares[i] = i * i; });
      |                                ^
pool.cpp:6:32: note:   candidate expects 1 argument, 2 provided

I compiled with the command

g++ -pthread -std=c++17 pool.cpp

System information

  • CPU model, architecture, # of cores and threads: intel core i5 11th gen, 8 cores.
  • Operating system: Pop os 20.10
  • Name and version of C++ compiler: GCC 11.2.0
  • Full command used for compiling, including all compiler flags: -pthread -std=c++17 pool.cpp
  • Thread pool library version: cloned from master branch on January 18.
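
The error output above shows the library invoking the callable as loop(start, end), which suggests that this version expects a function taking a half-open block of indices rather than a single index. The sketch below illustrates that calling convention with plain std::thread. The function for_each_block is a hypothetical stand-in, not the library's parallelize_loop, and compute_squares is only an illustration.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for parallelize_loop, written with plain std::thread:
// it splits [first, last) into blocks and calls loop(start, end) once per block.
template <typename F>
void for_each_block(std::size_t first, std::size_t last, std::size_t num_blocks, const F& loop)
{
    if (last <= first || num_blocks == 0) return;
    const std::size_t total = last - first;
    if (num_blocks > total) num_blocks = total;
    const std::size_t block_size = total / num_blocks;
    std::vector<std::thread> threads;
    threads.reserve(num_blocks);
    for (std::size_t b = 0; b < num_blocks; ++b)
    {
        const std::size_t start = first + b * block_size;
        const std::size_t end = (b == num_blocks - 1) ? last : start + block_size;
        threads.emplace_back([start, end, &loop] { loop(start, end); }); // indices copied per block
    }
    for (std::thread& t : threads) t.join();
}

// The loop body receives a half-open range and iterates over it itself.
std::vector<std::size_t> compute_squares(std::size_t count)
{
    std::vector<std::size_t> squares(count);
    for_each_block(0, count, 4, [&squares](std::size_t start, std::size_t end) {
        for (std::size_t i = start; i < end; ++i)
            squares[i] = i * i;
    });
    return squares;
}
```

Under that assumption, the lambda in the report would take two arguments and contain its own for loop over [start, end).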

Use lock-free queue as the task queue

In the current code, a normal queue protected by a mutex is used as the task queue. The cost of locking cannot be ignored when there are a lot of small tasks. Because of this overhead, the sleep duration of each worker cannot be made too short, and busy-polling is not practical either. I think a lock-free queue could help solve this problem.
The CTPL project also offers a version that uses a lock-free queue to improve performance.

Request URL changes if I execute it in thread_pool

I'm using your lib and restclient-cpp lib to compile get and post requests and execute them in parallel to the main runtime.

This is an example version of my code:

#include <cassert>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

#include "../restclient-cpp/restclient.h"
#include "../thread_pool.hpp"

enum RequestType {
  get, post, put, patch, del, head, options
};

struct Request {
  RequestType type = RequestType::get;
  std::string url;
  std::string body_type;
  std::string body;

  Request() = default;

  Request(RequestType type, std::string url): type(type), url(std::move(url)) {}

  [[nodiscard]] RestClient::Response execute() const {
    switch (type) {
      case get: { std::cout << " - Inside request.h get. URL:\n" << url << "\n"; return RestClient::get(url); }
      default: { assert(false); return {}; }
    }
  }
};

Request get_positions() {
  return {RequestType::get,
  "https://api.bybit.com/private/linear/position/list"};
}

void callback(const RestClient::Response& response) {
  std::cout << response.body << "\n";
}

int main(int argc, const char** argv) {
  RestClient::init();
  thread_pool pool;
  std::function<void(const RestClient::Response&)> on_response = [](const RestClient::Response& response) {
      callback(response);
    };
  Request request;
  request = get_positions();
  std::cout << "Request url:\n" << request.url << "\n";
  std::cout << "Pushed to threadpool\n";
  // request and on_response are captured by reference
  pool.push_task([&](){
    on_response(request.execute());
  });
}

If I call request.execute() outside of the thread pool, it executes normally, but if I call it inside push_task, the URL of the request gets modified. It looks as if the first few characters of the URL string are converted to raw bytes or something similar; I can't figure it out. This is the output:

Request url in main thread:
https://api.bybit.com/private/linear/position/list
Pushed to threadpool
Inside request.h get. URL:
t��t.com/private/linear/position/list

I asked this question in the restclient-cpp repo issues as well.

link errors with 3.2.0

The following two lines cause link errors (using Visual Studio + mingw64 or remote Ubuntu g++ 12) because of multiply defined symbols:

std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);
std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);

Direct compilation (Ubuntu g++ or MSVC) doesn't trigger linker problems.

However, changing these lines to

inline static std::ostream& (&endl)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::endl);
inline static std::ostream& (&flush)(std::ostream&) = static_cast<std::ostream& (&)(std::ostream&)>(std::flush);

seems more correct, doesn't hurt builds that already work, and fixes the linking error.

[REQ] Passing thread_pool as parameters?

Describe the new feature

Passing thread_pool as parameters in recursion.

Code example

For the example below, I want to parallelize the chain operation across multiple threads. Each "parent" chain object will generate two "children", next and update, until the quick element in each object is equal to 0. After next and update are produced, the parent can be destroyed. I want to put next and update into the thread pool, as I awkwardly described in ChainReact().

The basic process is like:
start --> start_n (from start.next()) and start_u (from start.update()), then throw them to the thread pool and destroy start.

start_n --> start_n_n and start_n_u, then throw them to thread pool and destroy start_n.
start_u --> start_u_n and start_u_u, then throw them to thread pool and destroy start_u.
...

I give a silly example below, but I can not compile it. Please feel free to modify it, or correct me if I misused the thread-pool.

#include <iostream>

#include "thread-pool/thread_pool.hpp"

using namespace std;

class chain {
public:
  chain(const int quick, const int slow) : quick{ quick }, slow{ slow } {}

  int get_quick() const { return quick; }
  int get_slow() const { return slow; }

  chain next() const {
    chain elem{ quick / 2, slow };
    return elem;
  }

  chain update() const {
    chain elem{ (quick + slow) / 2, slow > 0 ? slow - 1 : 0 };
    return elem;
  }

  void print() const {
    cout << "quick is: " << quick << "; slow is: " << slow << endl;
  }

private:
  const int quick, slow;
};


void ChainReact(const chain& start, const thread_pool& tpool) {
  if (start.get_quick() == 0) {
    cout << "End reaction." << endl;
  } else {
    chain nchain = start.next();
    tpool.push_task(ChainReact, nchain, tpool);

    chain uchain = start.update();
    tpool.push_task(ChainReact, uchain, tpool);
  }

  if (tpool.get_tasks_total() == 0) { return; }
}


int main() {

  thread_pool pool;

  cout << "#threads: " << pool.get_thread_count() << endl;

  chain start{ 4, 20 };
  start.next().print();
  start.update().print();

  ChainReact(start, pool);

  return 0;
}

Less copy in push_task?

    template <typename F>
    void push_task(const F &task)

Maybe it would be better to take a forwarding reference, or to move? If not, then why? :)
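
A minimal sketch of what the forwarding version could look like, assuming the tasks are still stored as std::function<void()> in a mutex-protected queue. The class task_queue and its run_one() member are hypothetical and exist only to make the example self-contained.

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical reduced queue, only to illustrate the signature change.
class task_queue
{
public:
    template <typename F>
    void push_task(F&& task)
    {
        // std::forward moves from rvalues (such as temporary lambdas) and
        // copies from lvalues, so the caller's object is copied at most once.
        std::function<void()> wrapped(std::forward<F>(task));
        const std::scoped_lock lock(queue_mutex);
        tasks.push(std::move(wrapped));
    }

    // Runs the oldest task, if any; returns false when the queue is empty.
    bool run_one()
    {
        std::function<void()> task;
        {
            const std::scoped_lock lock(queue_mutex);
            if (tasks.empty()) return false;
            task = std::move(tasks.front());
            tasks.pop();
        }
        task();
        return true;
    }

private:
    std::mutex queue_mutex;
    std::queue<std::function<void()>> tasks;
};
```

Since std::function requires a copy-constructible callable, forwarding alone does not make move-only tasks possible; it only avoids an extra copy when the argument is a temporary.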

Cannot be used inside the class

I found out that I can't call the thread pool inside a class. If I call it in the main function of an app (like your example) or in a DLL like this:

void sleep_half_second(const size_t& i, synced_stream* sync_out)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    sync_out->println("Task ", i, " done.");
}

BOOL APIENTRY DllMain( HMODULE hModule,
                       DWORD  ul_reason_for_call,
                       LPVOID lpReserved
                     )
{
    switch (ul_reason_for_call)
    {
    case DLL_PROCESS_ATTACH:
    case DLL_THREAD_ATTACH:
    case DLL_THREAD_DETACH:
    case DLL_PROCESS_DETACH:
        break;
    }
    thread_pool* pool;
    synced_stream sync_out;
    int i = 1;
    pool = new thread_pool(12);
    pool->push_task(sleep_half_second, i, &sync_out);

    return TRUE;
}

it works and doesn't generate error C2064. But if I use it inside a class, it still generates error C2064.

Please help me fix this, I need to call it inside the class

doubleRsi.zip
Thank you very much

Potential zero threads

explicit thread_pool(const concurrency_t thread_count_ = std::thread::hardware_concurrency()) : thread_count(thread_count_ ? thread_count_ : std::thread::hardware_concurrency()), threads(std::make_unique<std::thread[]>(thread_count_ ? thread_count_ : std::thread::hardware_concurrency()))

std::thread::hardware_concurrency() may return 0. So when constructed as thread_pool(), thread_count may be initialised to 0. thread_count(thread_count_ > 1 ? thread_count_ : 1) may be safer.
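
A slightly different fallback that keeps the hardware hint whenever it is available could be sketched as follows. The helper name determine_thread_count is hypothetical and not part of the library.

```cpp
#include <thread>

// Hypothetical helper: picks the requested count if it is nonzero, otherwise
// the hardware hint, and falls back to 1 if the hint is unavailable (0).
inline unsigned int determine_thread_count(unsigned int requested)
{
    if (requested > 0) return requested;
    const unsigned int hint = std::thread::hardware_concurrency();
    return hint > 0 ? hint : 1;
}
```

The constructor could then call this helper once and use the result for both thread_count and the array allocation, instead of repeating the conditional expression.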

Feedback: "Awesome library :D"

No problems with the library:

  • awesome performance (2x the performance of OpenMP in the VS Clang compiler)
  • simple to use, with a really nice manual
  • cross-platform compatibility

Thank you !

Task exceptions are not forwarded to caller

In case an exception is thrown inside a task, this exception leads to the direct termination of the application.
To fix that, the scheduler has to catch the exception in the sub-task and set the exception-context of the promise object itself.
For details, see cppreference: https://en.cppreference.com/w/cpp/thread/promise/set_exception

Example:

auto fut = pool.submit([](){throw std::runtime_error("error");});
try {
  fut.get();
} catch(std::runtime_error & e){
  std::cout << "catched error" << std::endl;
}

This application terminates with:

terminate called after throwing an instance of 'std::runtime_error'
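
The fix described above can be sketched as follows, using a plain std::thread in place of a pool worker. The names run_task and exception_reaches_caller are hypothetical; the only library-independent facts relied on are std::promise::set_exception and std::current_exception.

```cpp
#include <exception>
#include <future>
#include <stdexcept>
#include <thread>

// Worker-side wrapper: run the task and route the outcome through the promise.
template <typename F>
void run_task(std::promise<void>& promise, F& task)
{
    try
    {
        task();
        promise.set_value();
    }
    catch (...)
    {
        // Store the in-flight exception; future.get() rethrows it in the caller.
        promise.set_exception(std::current_exception());
    }
}

// Returns true if the exception thrown inside the task reached the caller.
bool exception_reaches_caller()
{
    std::promise<void> promise;
    std::future<void> future = promise.get_future();
    auto task = [] { throw std::runtime_error("error"); };
    std::thread worker([&promise, &task] { run_task(promise, task); });
    bool caught = false;
    try
    {
        future.get();
    }
    catch (const std::runtime_error&)
    {
        caught = true;
    }
    worker.join();
    return caught;
}
```

With a wrapper like this around the submitted function, the try/catch in the example above would see the std::runtime_error instead of the process terminating.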

Suggestion about wake-up mechanism of worker

Currently, the worker function's wake-up mechanism is a form of polling, which uses sleep_or_yield() to yield the CPU.

  void worker() {
    while (running) {
      std::function<void()> task;
      if (!paused && pop_task(task)) {
        task();
        tasks_total--;
      } else {
        sleep_or_yield();
      }
    }
  }

How about using a std::condition_variable wait/notify wake-up mechanism instead? Thanks.
Something like the code below.
The worker function is changed to wait for a notification:

  void worker() {
    while (running) {
      std::unique_lock lock(queue_mutex);
      tasks_cv.wait(lock, [this] {
        return !this->is_tasks_queue_empty() || !this->running();
      });
      std::function<void()> task;
      if (!paused_ && pop_task(task)) {
        task();
        tasks_total--;
      }
    }
  }

Workers are woken up by a condition variable notification when a task arrives, or when the thread pool is reset or shut down:

  template <typename F>
  void push_task(const F &task) {
    tasks_total++;
    {
      const std::scoped_lock lock(queue_mutex);
      tasks.push(std::function<void()>(task));
    }
    tasks_cv.notify_all();
  }
  void reset(const ui32 &_thread_count = std::thread::hardware_concurrency()) {
    bool was_paused = paused;
    paused = true;
    wait_for_tasks();
    running = false;
    tasks_cv.notify_all();
    destroy_threads();
    thread_count =
        _thread_count ? _thread_count : std::thread::hardware_concurrency();
    threads.reset(new std::thread[thread_count]);
    paused = was_paused;
    create_threads();
    running = true;
  }
  ~thread_pool() {
    wait_for_tasks();
    running = false;
    tasks_cv.notify_all();
    destroy_threads();
  }

std::move prevents copy elision of std::function

When using the lib, a compiler warning occurs because a temporary object is passed through std::move directly, rather than something like a unique/shared pointer.
Place of occurrence: thread_pool.hpp:131

clang warning:

warning: moving a temporary object prevents copy elision [-Wpessimizing-move]
            tasks.push(std::move(std::function<void()>(task)));

note: remove std::move call here
            tasks.push(std::move(std::function<void()>(task)));

Solution: Remove the std::move and push the object directly to the queue.

    template <typename F>
    void push_task(const F &task)
    {
        tasks_waiting++;
        {
            const std::scoped_lock lock(queue_mutex);
            tasks.push(std::function<void()>(task));
        }
    }

Migrate from std::function to std::packaged_task

I have recently used this library as inspiration for writing my own thread pool class and found some advantages to using std::packaged_task over std::function for task storage:

  • Able to capture and store move-only objects such as unique_ptr (usability boost).
  • Provide future return values natively, i.e. no need to create one explicitly via shared_ptr in the submit function (efficiency boost).
  • Automatically catches exceptions and passes to the future object (readability boost).

I am not sure if you have already considered this and chosen to avoid it. I would be happy to put together a merge request if you are willing to review it.
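
The first two points can be illustrated with a minimal sketch that uses a plain std::thread rather than the pool. The function run_move_only_task is hypothetical and only demonstrates the standard std::packaged_task behavior.

```cpp
#include <future>
#include <memory>
#include <thread>
#include <utility>

// Runs a task that owns a move-only std::unique_ptr and returns its result.
int run_move_only_task()
{
    auto value = std::make_unique<int>(42);
    // The lambda is move-only because of the captured unique_ptr;
    // std::packaged_task accepts it, whereas std::function would not.
    std::packaged_task<int()> task([v = std::move(value)] { return *v + 1; });
    std::future<int> result = task.get_future(); // future provided by the task itself
    std::thread worker(std::move(task));         // packaged_task is itself move-only
    worker.join();
    return result.get();
}
```

One design question this leaves open is storage: tasks with different return types are different packaged_task specializations, so a single queue would still need some type-erased, move-only wrapper around them.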

passing thread data structure

Hi, and thanks for this great work.
I have a structure for which I allocate memory once per thread, so that each thread has its own data structure to use while executing the function.
But I don't know how to pass each thread its own thread_data, because while the task is in the queue I don't know which thread will execute it.

struct thread_data
{
    /* some stuff */
};

void func(thread_data* data, int x)
{
	/* some stuff */
}

int main(int argc, char** argv)
{
	int num_thread = 4;

	thread_data* data = (thread_data*)malloc(num_thread * sizeof(thread_data)); // struct array for each thread
	if (data == NULL) return 1;

	thread_pool pool(num_thread);

	for (auto i = 1; i <= 100; i++)
		pool.push_task(func, &data[0], i); // !HERE! &data[0], &data[1], ... ?

	return 0;
}

If two or more threads use the same structure, the program crashes; each thread should have its own unique structure. How can I solve this problem?

wait_for_tasks() does not work

Without std::this_thread::sleep_for(std::chrono::microseconds(1000)) after push_task, the program quits immediately, while the tasks are not yet done.

    thread_pool pool(6);
    for (uint32_t j = 0; j < scene.height; ++j)
    {
        for (uint32_t i = 0; i < scene.width; ++i)
        {
            int offset = j * scene.width + i;
            if( offset == framebuffer.size() ) {
                continue;
            }

            // generate primary ray direction
            float x = (2 * (i + 0.5) / (float)scene.width - 1) * imageAspectRatio * scale;
            float y = (1 - 2 * (j + 0.5) / (float)scene.height) * scale;
            Vector3f dir = normalize(Vector3f(-x, y, 1));

            //std::cout << x << "," << y << std::endl ;

            pool.push_task([&] {
                for (int k = 0; k < spp; k++) {
                    framebuffer[offset] += weight*scene.castRay(Ray(eye_pos, dir));
                }
            });

            std::this_thread::sleep_for(std::chrono::microseconds(1000));
        }
    }

    //sleep(1000);
    pool.wait_for_tasks();

[Question] How to retrieve pointer function of functions available in thread pool list ?

Again a question, sorry :)

I am trying to develop a feature equivalent to parfeval using thread-pool ;)
https://pbs.twimg.com/media/FVeuU0gVsAIjxEL?format=jpg&name=small
https://pbs.twimg.com/media/FSN0xl_X0AEt9hJ?format=jpg&name=900x900

Currently I manage another list with all the function pointers submitted. But maintaining another list is difficult (synchronization).

How can I do this without maintaining another list of submitted functions?

The function f evaluates fptr(args):

threadpool pool;
pool.submit(f, fptr1, args)
pool.submit(f, fptr2, args)
pool.submit(f, fptr3, args)

I would like to have the list fptr1, ... fptr3 available in thread_pool, and also two lists with the running and queued functions:

std::vector<fptr> runningsFunctions = pool.getRunningThreads()
std::vector<fptr> queuedFunctions = pool.getQueuedThreads()
std::vector<fptr> poolFunctions = pool.getAllThreads()

Ideas are welcome (Thanks for your help)

[TEST] Problem with vectors

Hi,
Is it possible for tasks run through your library to append to a global vector, or to a vector passed by reference, and then add another job to the pool? Or is it possible to share a vector through global variables? I mainly mean "test 4". Perhaps there is another way to achieve the same thing, or a special way to handle vectors when multithreading. I have tried to use std::unique_lock to ensure proper access, but to no avail. Can you tell whether it's my fault? I tested this in Android Studio.

My code:

#include "BS_thread_pool.hpp"
#include <android/log.h>
#include <mutex>
#include <shared_mutex>

using namespace std;
BS::thread_pool pool;
vector<string> test_3;
vector<vector<string>> test_4;
std::shared_mutex door;
mutex m;

void test1(vector<std::vector<std::string>> test)
{
    std::vector<std::string> res;
    res.push_back("string");
    res.push_back("string2");
    test.push_back(res);
}


void test2(vector<std::vector<std::string>> &test)
{
    std::vector<std::string> res;
    res.push_back("string");
    res.push_back("string2");
    test.push_back(res);
}

void test3(std::string str)
{
    test_3.push_back(str);
}

void test4(std::string str)
{
    std::vector<std::string> res;
    res.push_back(str);
    res.push_back(str+"_2");
    std::unique_lock<std::shared_mutex> ul(door);
    test_4.push_back(res);
    pool.push_task(test4, str+"_test");
}


void run_test()
{
    // test 1
    // without reference
    // tasks don't crash but vector isn't updated (no reference)
//    vector<std::vector<std::string>> test;
//    for (int i = 0; i < 100; i++)
//    {
//        pool.push_task(test1, test);
//    }
//    pool.wait_for_tasks();

    //print results
//    for(auto &row_info :test)
//    {
//        for (auto &info :row_info)
//        {
//            __android_log_print(ANDROID_LOG_INFO, "Object: ", "%s", info.c_str());
//        }
//    }

    // test 2
    // with reference
    // don't compile
    // In template: type 'const std::vector<std::vector<std::basic_string<char>>>' does not provide a call operator
//    vector<std::vector<std::string>> test_2;
//    for (int i = 0; i < 100; i++)
//    {
//        pool.push_task(test2, &test_2);
//    }
//    pool.wait_for_tasks();


    // test 3
    // with global vector
    // work don't correctly
    // sometimes lost parameters
//    for (int i = 0; i < 100; i++)
//    {
//        pool.push_task(test3, to_string(i));
//    }
//    pool.wait_for_tasks();

    //Object:: 97
    //Object:: 98
    //Object:: 82
    //Object:: 99
    //Object::
    //Object::
    //Object:: 93
//    for(auto &row_info :test_3)
//    {
//        __android_log_print(ANDROID_LOG_INFO, "Object: ", "%s", row_info.c_str());
//    }

    // test 4
    // with vector of vector with shared mutex
    // compile but crash app without mutex, freeze when used mutex - no errors
    for (int i = 0; i < 100; i++)
    {
        pool.push_task(test4, to_string(i));
    }
    pool.wait_for_tasks();

    for(auto &row_info :test_4)
    {
        for (auto &info :row_info)
        {
            __android_log_print(ANDROID_LOG_INFO, "Object: ", "%s", info.c_str());
        }
    }

}

System information

  • Core i7 7700HQ
  • Operating system: Windows 10, Android Studio, NDK 21
  • CMAKE config:
    cmake_minimum_required(VERSION 3.18.1)
    set(CMAKE_CXX_STANDARD 17)
    set(CMAKE_CXX11_EXTENSION_COMPILE_OPTION "-std=c++17")
  • Thread pool library version:
    3.0

Update conan package

Thanks for the great package! Could you please update the conan package to 3.1.0? Thanks!

[REQ]Handle cases where hardware_concurrency() returns 0

std::thread::hardware_concurrency should only be treated as a hint

C++'s std::thread::hardware_concurrency() is allowed to return 0 if it cannot compute the value for you. From a cursory reading of the code: if this were to happen, you will have an empty thread pool. It may be better to use a value of 1 in that case, so that you can still submit work.

ERROR: term does not evaluate to a function taking 2 arguments

says at:

 template <typename T1, typename T2, typename F>
    void parallelize_loop(const T1 &first_index, const T2 &index_after_last, const F &loop, ui32 num_blocks = 0)
    {
        typedef std::common_type_t<T1, T2> T;
        T the_first_index = (T)first_index;
        T last_index = (T)index_after_last;
        if (the_first_index == last_index)
            return;
        if (last_index < the_first_index)
        {
            T temp = last_index;
            last_index = the_first_index;
            the_first_index = temp;
        }
        last_index--;
        if (num_blocks == 0)
            num_blocks = thread_count;
        ui64 total_size = (ui64)(last_index - the_first_index + 1);
        ui64 block_size = (ui64)(total_size / num_blocks);
        if (block_size == 0)
        {
            block_size = 1;
            num_blocks = (ui32)total_size > 1 ? (ui32)total_size : 1;
        }
        std::atomic<ui32> blocks_running = 0;
        for (ui32 t = 0; t < num_blocks; t++)
        {
            T start = ((T)(t * block_size) + the_first_index);
            T end = (t == num_blocks - 1) ? last_index + 1 : ((T)((t + 1) * block_size) + the_first_index);
            blocks_running++;
            push_task([start, end, &loop, &blocks_running]
                      {
                          loop(start, end); //<---------------------------------------ERROR HERE
                          blocks_running--;
                      });
        }
        while (blocks_running != 0)
        {
            sleep_or_yield();
        }
    }

The type of loop() is void(size_t, size_t), so T1 = size_t and T2 = size_t; I can't see what's wrong.

[BUG] VS 2022 Win32 crash (compiler bug ?)

Describe the bug

This simple code crashes with the VS 2022 Win32 target, but it works with the same compiler for x64.


Minimal working example

    thread_pool pool;

"A breakpoint statement (__debugbreak() statement or similar call) was executed in NelSon-gui.exe."

Behavior

The code works perfectly on the x64 target, but the same code crashes on the Win32 target.

System information

  • CPU model, architecture, # of cores and threads: AMD 4800H
  • Operating system: Windows 10
  • Name and version of C++ compiler: VS 2022 Version 17.2.1
  • Full command used for compiling, including all compiler flags:
/permissive- /MP /ifcOutput "Release\" /GS /GL /analyze- /W3 /Gy /Zc:wchar_t /I"D:\Developpements\Github\nelson\modules/os_functions/src/include" /I"D:\Developpements\Github\nelson\modules/dynamic_link/src/include" /I"D:\Developpements\Github\nelson\modules/characters_encoding/src/include" /I"D:\Developpements\Github\nelson\modules/i18n/src/include" /I"D:\Developpements\Github\nelson\modules/interpreter/src/include" /I"D:\Developpements\Github\nelson\modules/types/src/include" /I"D:\Developpements\Github\nelson\modules/api_nelson/src/include" /I"D:\Developpements\Github\nelson\modules/stream_manager/src/include" /I"D:\Developpements\Github\nelson\modules/error_manager/src/include" /I"D:\Developpements\Github\nelson\modules/nelson_manager/src/include" /I"D:\Developpements\Github\nelson\../NelSon-thirdparty-Win32/Boost" /Zi /Gm- /O2 /sdl /Fd"Release\vc143.pdb" /Zc:inline /fp:precise /D "WIN32" /D "NDEBUG" /D "_WINDOWS" /D "_USRDLL" /D "NLSOS_FUNCTIONS_EXPORTS" /D "_WINDLL" /D "_UNICODE" /D "UNICODE" /errorReport:prompt /WX- /Zc:forScope /Gd /Oy- /Oi /MD /openmp /std:c++17 /FC /Fa"Release\" /EHsc /nologo /Fo"Release\" /Fp"Release\libnlsOs_functions.pch" /diagnostics:column 
  • Thread pool library version: github master

(Please note that only the latest version of the thread pool library is supported.)

Additional information

Not sure, but it is probably more a compiler bug than a bug in the code :(
Reported for feedback and, eventually, a workaround.

How to build using Microsoft Visual Studio 2019 ?

I have not written any code yet. Simply, I want to (first) build and run your thread_pool_test.cpp file using MSVS 2019 (latest update, 16.11.9).

Below, I'm listing the first few compiler errors, and I also provide the compiler switches that are set (the C++17 standard is set).

Thanks in advance for help!

Manfred Sever.

The top of the compiler output (errors) is:
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(176,24): error C2039: 'scoped_lock': is not a member of 'std'
1>C:\Program Files (x86)\Microsoft Visual Studio\2019\Professional\VC\Tools\MSVC\14.29.30133\include\queue(22): message : see declaration of 'std'
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(172,10): message : This diagnostic occurred in the compiler generated function 'void thread_pool::push_task(const F &)'
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,20): error C2039: 'scoped_lock': is not a member of 'std'
1>C:\Program Files (x86)\Microsoft Visual Studio\2019\Professional\VC\Tools\MSVC\14.29.30133\include\queue(22): message : see declaration of 'std'
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,1): error C4430: missing type specifier - int assumed. Note: C++ does not support default-int
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,32): error C2146: syntax error: missing ';' before identifier 'lock'
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,32): error C2672: 'lock': no matching overloaded function found
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(78,1): error C2780: 'void std::lock(_Lock0 &,_Lock1 &,_LockN &...)': expects 3 arguments - 1 provided
1>C:\Program Files (x86)\Microsoft Visual Studio\2019\Professional\VC\Tools\MSVC\14.29.30133\include\mutex(427): message : see declaration of 'std::lock'
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,20): error C2039: 'scoped_lock': is not a member of 'std'
1>C:\Program Files (x86)\Microsoft Visual Studio\2019\Professional\VC\Tools\MSVC\14.29.30133\include\queue(22): message : see declaration of 'std'
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,1): error C4430: missing type specifier - int assumed. Note: C++ does not support default-int
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,32): error C2146: syntax error: missing ';' before identifier 'lock'
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,32): error C2672: 'lock': no matching overloaded function found
1>C:\Users\MSever\source\repos\testThreadPool\testThreadPool\thread_pool.hpp(356,1): error C2780: 'void std::lock(_Lock0 &,_Lock1 &,_LockN &...)': expects 3 arguments - 1 provided

The compiler options I have set are:
/JMC /permissive- /ifcOutput "x64\Debug" /GS /W4 /Zc:wchar_t /ZI /Gm- /O2 /sdl /Fd"x64\Debug\vc142.pdb" /Zc:inline /fp:precise /D "_DEBUG" /D "_CONSOLE" /D "_UNICODE" /D "UNICODE" /errorReport:prompt /WX- /Zc:forScope /RTC1 /Gd /MDd /std:c++17 /FC /Fa"x64\Debug" /EHsc /nologo /Fo"x64\Debug" /Fp"x64\Debug\testThreadPool.pch" /diagnostics:column

Passing reference to other threads

Line 108

            push_task([&start, &end, &loop, &blocks_running] {

passes references to start and end to other threads. These variables are then overwritten in the next iteration, and may even go out of scope before the task runs. This is undefined behaviour. Instead, each task should receive copies of these variables.
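A minimal sketch of the suggested fix, assuming plain std::thread in place of the pool's push_task so that it compiles on its own. The name sum_in_blocks and its parameters are hypothetical and are not part of the library. The point is only the capture list: start and end are copied into each task, while total is still captured by reference because it outlives every thread.

```cpp
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: sums the integers in [first, last] using num_blocks threads.
// Assumes first <= last.
long long sum_in_blocks(long long first, long long last, std::size_t num_blocks)
{
    if (num_blocks == 0)
        num_blocks = 1;
    std::atomic<long long> total{0};
    std::vector<std::thread> workers;
    const long long count = last - first + 1;
    const long long block_size = count / static_cast<long long>(num_blocks);
    for (std::size_t t = 0; t < num_blocks; ++t)
    {
        const long long start = first + static_cast<long long>(t) * block_size;
        const long long end = (t == num_blocks - 1) ? last : start + block_size - 1;
        // start and end are captured BY VALUE: each thread owns its own copies,
        // so the next iteration cannot change them underneath it.
        // total is captured by reference, which is safe here because every
        // thread is joined before total goes out of scope.
        workers.emplace_back([start, end, &total] {
            long long partial = 0;
            for (long long i = start; i <= end; ++i)
                partial += i;
            total += partial;
        });
    }
    for (std::thread& w : workers)
        w.join();
    return total.load();
}
```

Calling sum_in_blocks(1, 100, 4) splits the range into four blocks and adds up the partial sums.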

Compiling on Amazon Linux

Describe the bug

I am trying to compile my code which uses your thread_pool library. The code compiles and runs without any problems on my own PC (WSL2 and GCC). When I compile on Amazon Linux 2, I get the errors attached in the file, during compile time. The code and compile commands are exactly the same. Since your library is based on C++17, I wonder why there is a compile error. Any suggestions for the reasons for the errors?

Minimal working example

not yet.

Behavior

errors.txt

System information

  • CPU model, architecture, # of cores and threads: AWS EC2 t2.micro
  • Operating system: Amazon Linux 2
  • Name and version of C++ compiler: GCC
  • Full command used for compiling, including all compiler flags: -std=c++17 -march=native -O3 -pthread
  • Thread pool library version: 3.0.0

Additional information

same compile errors between versions 2 and 3.

[Question] Find Threads currently queued ?

More of a question than a request. Sorry if this is not the right place.

BS::thread_pool pool(2);
pool.push_task(task1);
pool.push_task(task2);
pool.push_task(task3);
pool.push_task(task4);
pool.push_task(task5);

2 tasks are running and 3 are queued.

How to know explicitly if a task is currently running or queued?

I would like to know at any instant which tasks are running or queued,
something like pool.isRunning(task5) and pool.isQueued(task5).
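One way to approximate this from user code, sketched under the assumption that the pool accepts any callable taking no arguments: wrap each task so that it records its own state in a shared atomic. The names task_state, task_status and make_tracked are hypothetical and are not part of the library.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <utility>

enum class task_state { queued, running, finished };

// Shared handle through which the caller can observe a task's state.
using task_status = std::shared_ptr<std::atomic<task_state>>;

// Hypothetical helper: returns a wrapped task plus a status handle.
// The status starts as queued and is updated by the wrapper itself.
std::pair<std::function<void()>, task_status> make_tracked(std::function<void()> task)
{
    auto status = std::make_shared<std::atomic<task_state>>(task_state::queued);
    std::function<void()> wrapped = [task = std::move(task), status] {
        status->store(task_state::running);
        task();
        status->store(task_state::finished);
    };
    return {std::move(wrapped), status};
}
```

The wrapped callable would be handed to pool.push_task, and the status handle kept by the caller; status->load() then reports queued until a worker actually starts the task.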

How to detect the number of physical cores?

My computer has 4 physical (real) cores and 4 virtual ones. The thread pool auto-detects this and initializes with 8 threads. However, my program runs some heavy computing tasks which are suitable to run on real cores only. With 8 threads it is significantly slower than with 4 threads.

My question: is there any way to detect the number of physical cores and set the thread pool to use only that many threads?
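The C++ standard library only exposes std::thread::hardware_concurrency(), which reports logical threads, so the sketch below does not detect physical cores. It only estimates them under the assumption that every core runs a fixed number of hardware threads (2 on a typical machine with simultaneous multithreading). The function estimate_physical_cores is hypothetical.

```cpp
#include <algorithm>
#include <thread>

// Hypothetical helper. threads_per_core is an ASSUMPTION about the machine
// (2 for typical 2-way SMT); it is not queried from the hardware.
unsigned int estimate_physical_cores(
    unsigned int logical_threads = std::thread::hardware_concurrency(),
    unsigned int threads_per_core = 2)
{
    if (threads_per_core == 0)
        threads_per_core = 1;
    // hardware_concurrency() may return 0 when the value is unknown,
    // so always report at least one core.
    return std::max(1u, logical_threads / threads_per_core);
}
```

The result can then be passed to the pool's constructor in place of the default thread count. A precise count needs an operating-system-specific query, which is outside the scope of this sketch.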

Avoiding multiple declaration error at compile time

First, thanks a lot for this library, it's really useful and easy to use!

My problem is that if I want to use the library in more than one .cpp file (i.e. I need to include the library's .hpp in more than one .cpp file), I inevitably get a multiple declaration error at compile time, because the .hpp defines the functions and everything else.

(If the library's .hpp is included in a header (foo.h), the error is repeated once for every file that includes foo.h.)

Maybe I am just not using the library correctly, but if not, it could be useful to have a .cpp/.h version that splits definition and declaration to avoid this problem.

Regards
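For reference, a header can carry definitions and still be included from several .cpp files, provided those definitions are inline. The sketch below is a generic illustration of that rule and makes no claim about how the library's header is written; all names in it are hypothetical.

```cpp
#ifndef COUNTER_HPP
#define COUNTER_HPP

// A non-member function defined in a header must be marked inline.
// Without it, every .cpp file that includes the header emits its own
// definition and the linker reports a multiple definition error.
inline int next_id()
{
    static int id = 0; // a single instance shared by all translation units
    return ++id;
}

// C++17 inline variable: the same rule applied to a global variable.
inline int default_thread_count = 4;

// Member functions defined inside the class body are implicitly inline,
// so a class like this is safe to include anywhere.
class counter
{
public:
    void bump() { ++value; }
    int get() const { return value; }

private:
    int value = 0;
};

#endif // COUNTER_HPP
```

If the error persists with a header written this way, the cause is more likely a non-inline definition in the user's own foo.h than in the included header.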

Add explicit to single-parameter constructors

Describe the new feature

Can we add explicit to single-parameter constructors to avoid unintended implicit conversions?

Code example

    explicit thread_pool(const ui32 &_thread_count = std::thread::hardware_concurrency())
        : thread_count(_thread_count ? _thread_count : std::thread::hardware_concurrency()), threads(new std::thread[thread_count]);
// ...
    explicit synced_stream(std::ostream &_out_stream = std::cout);
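The effect of the keyword can be checked at compile time. The two structs below are hypothetical stand-ins for the constructor in the request, not the library's classes: one allows an integer to convert silently into a pool, the other does not.

```cpp
#include <type_traits>

// Hypothetical stand-in WITHOUT explicit: an integer converts silently.
struct implicit_pool
{
    implicit_pool(unsigned int thread_count = 0) : count(thread_count) {}
    unsigned int count;
};

// Hypothetical stand-in WITH explicit: construction must be spelled out.
struct explicit_pool
{
    explicit explicit_pool(unsigned int thread_count = 0) : count(thread_count) {}
    unsigned int count;
};

// Accepts a pool by reference; with implicit_pool, threads_of(8) compiles
// and quietly constructs a temporary pool from the integer.
unsigned int threads_of(const implicit_pool& pool) { return pool.count; }

static_assert(std::is_convertible<unsigned int, implicit_pool>::value,
              "implicit conversion is allowed without explicit");
static_assert(!std::is_convertible<unsigned int, explicit_pool>::value,
              "implicit conversion is rejected with explicit");
static_assert(std::is_constructible<explicit_pool, unsigned int>::value,
              "direct construction still works with explicit");
```

With the explicit version, a call such as threads_of(8) would fail to compile, which is the accidental construction the request aims to prevent.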

Add Github CI running tests

A good practice is to run test on CI
each Platform officially supported could be tested.

No complaints, just a quality approach.
Thanks for this really good library

Compile error c2064

Hi, I got error C2064 when merging your file with my program.

void IndicatorDRSI::Refresh(const int limit, const  int newBars, const  int startbar)
{
	doubleRsiLine->RefeshDRsi( my_symbol, limit, newBars, startbar);
}
void IndicatorDRSI::RefreshRate(const int limit, const int newBars, const int startbar)
{
	auto x = &IndicatorDRSI::Refresh;
	thread_pool pool(12);
	pool.push_task(x, limit, newBars, startbar);
}

Error C2064: term does not evaluate to a function taking 3 arguments (doubleRsi, line 154).
Here is line 154:

  template <typename F, typename... A>
    void push_task(const F& task, const A &...args)
    {
        push_task([task, args...]
            { task(args...); });// line 154
    }	

I use Microsoft Visual Studio Community 2019. Version 16.9.4

Please help me,
Thank you very much
Jewel
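The error arises because a pointer to a member function cannot be called as task(args...); it also needs the object to call it on. A minimal sketch of the two usual ways around this follows. The names indicator and run_task are hypothetical: run_task is a stand-in that runs the task immediately, and it is not the library's push_task.

```cpp
#include <functional>
#include <utility>

// Hypothetical stand-in for the class in the report.
struct indicator
{
    int last_sum = 0;
    void refresh(int limit, int new_bars, int start_bar)
    {
        last_sum = limit + new_bars + start_bar;
    }
};

// Hypothetical stand-in for a task submission function.
// std::invoke (C++17) knows how to call member function pointers:
// the first argument after the pointer is the object to call it on.
template <typename F, typename... A>
void run_task(F&& task, A&&... args)
{
    std::invoke(std::forward<F>(task), std::forward<A>(args)...);
}

// Option 1: pass the member function pointer together with the object.
int refresh_via_invoke()
{
    indicator ind;
    run_task(&indicator::refresh, &ind, 1, 2, 3);
    return ind.last_sum;
}

// Option 2: wrap the call in a lambda that captures the object. This also
// works with a submission function that calls task(args...) directly,
// like the one quoted in the report.
int refresh_via_lambda()
{
    indicator ind;
    run_task([&ind] { ind.refresh(10, 20, 30); });
    return ind.last_sum;
}
```

Inside a member function such as RefreshRate, the lambda form would capture this instead of a named object.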

[REQ] Thread busy loop optimization

In the void worker() function, when no task is available, it just calls sleep_or_yield(), which means all threads are frequently awakened and then go back to sleep (about 1000 times per second). I think this is suboptimal. Why not put a thread to sleep using a condition variable?
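A minimal sketch of what the request describes, assuming a simple mutex-protected queue. The class cv_pool is hypothetical and is not the library's implementation. Idle workers block inside wait() and consume no CPU until push_task or the destructor notifies them.

```cpp
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool whose workers sleep on a condition variable.
class cv_pool
{
public:
    explicit cv_pool(std::size_t thread_count)
    {
        for (std::size_t i = 0; i < thread_count; ++i)
            threads.emplace_back([this] { worker(); });
    }

    // Drains the remaining tasks, then stops and joins all workers.
    ~cv_pool()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            running = false;
        }
        task_available.notify_all();
        for (std::thread& t : threads)
            t.join();
    }

    void push_task(std::function<void()> task)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            tasks.push(std::move(task));
        }
        task_available.notify_one(); // wake exactly one sleeping worker
    }

private:
    void worker()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex);
                // Sleeps until there is work or the pool is shutting down.
                task_available.wait(lock, [this] { return !tasks.empty() || !running; });
                if (tasks.empty())
                    return; // shutting down and nothing left to run
                task = std::move(tasks.front());
                tasks.pop();
            }
            task(); // run outside the lock so other workers can proceed
        }
    }

    std::mutex mutex;
    std::condition_variable task_available;
    std::queue<std::function<void()>> tasks;
    bool running = true;
    std::vector<std::thread> threads;
};
```

The trade-off is a small wake-up latency per task in exchange for idle threads that do not spin.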

Question on submitting tasks

Hi Barak,

Thanks for publishing this library. It is great and super simple to use.

I do have a question about submitting tasks. Do we have to use lambdas to submit?

Using the <thread> library, I can submit a member function along with parameters like this:

FyxSNS fyxSNS; // my object

std::thread worker(&FyxSNS::listTopics, fyxSNS);    // calls listTopics() on a copy of fyxSNS
worker.join();

Instead, using your library, I am submitting the same method as

thread_pool t_pool;

t_pool.push_task([&fyxSNS](){fyxSNS.listTopics();});
t_pool.wait_for_tasks();

Is there a way to submit tasks without using a lambda expression?

Thank you in advance. Loving the simplicity and power of your thread pool!
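One lambda-free option is std::bind, which packages a member function pointer and its object into a callable that takes no arguments. The sketch below assumes the submission function accepts any such callable. The names sns_client and run_now are hypothetical stand-ins for FyxSNS and for the pool's method.

```cpp
#include <functional>

// Hypothetical stand-in for the FyxSNS class in the question.
struct sns_client
{
    int calls = 0;
    void list_topics() { ++calls; }
};

// Hypothetical stand-in for a pool method that accepts a zero-argument task.
void run_now(const std::function<void()>& task) { task(); }

// Submits the member function twice without writing a lambda.
int submit_without_lambda()
{
    sns_client client;
    // Binding a pointer to the object: the call acts on client itself.
    run_now(std::bind(&sns_client::list_topics, &client));
    // Binding std::ref(client) has the same effect.
    run_now(std::bind(&sns_client::list_topics, std::ref(client)));
    // Binding client by value would copy it, and calls would be left unchanged.
    return client.calls;
}
```

As with the lambda that captures by reference, the bound object must stay alive until the task has run.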

Is it possible to parallelize nested loops?

        BS::thread_pool pool(10);
        pool.parallelize_loop(0, cwts.eventOfEvents.at(0).size(),
                              [&cwts,&hist](const int a, const int b)
                              {
                                  for (int t = 0; t < cwts.eventOfEvents.size(); ++t)
                                  {

                                      for (int i = a; i < b; ++i)
                                      {

                                          hist->Fill(i, t, cwts.eventOfEvents[t][i]);
                                      }
                                  }
                              })
            .wait();

Right now I'm doing this, which attempts to read the inner arrays in parallel. Is it possible to parallelize both loops?

Thanks!
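A common way to parallelize both loops at once is to flatten them into a single index range of rows * cols and recover the two indices inside the loop body. The sketch below assumes a rectangular, non-empty array. The function fill_block and the output vector are hypothetical, and writing to a distinct element per index stands in for hist->Fill, whose thread safety the question does not state.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper: processes the flattened indices [a, b) of a
// rows x cols array. Assumes data is non-empty and every row has the
// same non-zero length.
void fill_block(const std::vector<std::vector<double>>& data,
                std::vector<double>& out,
                std::size_t a, std::size_t b)
{
    const std::size_t cols = data.at(0).size();
    for (std::size_t k = a; k < b; ++k)
    {
        const std::size_t t = k / cols; // outer (row) index
        const std::size_t i = k % cols; // inner (column) index
        out[k] = data[t][i];            // each k touches a distinct element
    }
}
```

The pool would then be given the range from 0 to rows * cols, with a loop body that forwards its a and b to fill_block, so that blocks are split across both dimensions rather than only the inner one.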

[TEST] thread_pool_test.cpp

System information

  • CPU model, architecture, # of cores and threads:
    11th Gen Intel® Core™ i7-11390H @ 3.40GHz × 8

  • Operating system:
    Ubuntu 20.04.4 LTS

  • Name and version of C++ compiler:
    GCC 9.4.0

  • Full command used for compiling, including all compiler flags:
    gcc -Wall -Wpedantic -Wextra -Wconversion -Weffc++ thread_pool_test.cpp

  • Thread pool library version:
    latest

Log file

http://codepad.org/uwXjtRzH

Memory usage?

It seems nice. I noticed the use of std::shared_ptr. Can the library avoid memory allocation calls?

[Question] How to remove a queued thread not running

Assume 4 threads are running and 4 others are queued.
To stop the execution of a running thread, it is possible to check a variable inside the executed function.
But how can a queued thread that is not yet running be removed or cancelled?

My current workaround is to wait until that thread starts running and then stop/cancel it.
Any better approach would be appreciated :)
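A sketch of one user-side approach, assuming the pool accepts any callable taking no arguments: wrap each task with a shared cancellation flag that is checked at the moment a worker picks the task up. The names cancel_token and make_cancellable are hypothetical and are not part of the library.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <utility>

// Shared flag through which the caller can cancel a task that is still queued.
using cancel_token = std::shared_ptr<std::atomic<bool>>;

// Hypothetical helper: returns a wrapped task plus its cancellation flag.
std::pair<std::function<void()>, cancel_token> make_cancellable(std::function<void()> task)
{
    auto cancelled = std::make_shared<std::atomic<bool>>(false);
    std::function<void()> wrapped = [task = std::move(task), cancelled] {
        if (cancelled->load())
            return; // cancelled while still queued: skip the real work
        task();
    };
    return {std::move(wrapped), cancelled};
}
```

The task is not removed from the queue; it still occupies its slot until a worker reaches it, but it then returns immediately instead of running.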

use pool in class/struct

I'd like to use thread_pool as a static struct/class member, as in this test code:

//bs_thread_pool.cpp

#include <algorithm> // std::min, std::min_element, std::sort, std::unique
#include <atomic>    // std::atomic
#include <chrono>    // std::chrono
#include <cmath>     // std::abs, std::llround, std::round, std::sqrt
#include <ctime>     // std::localtime, std::strftime, std::time_t
#include <exception> // std::exception
#include <fstream>   // std::ofstream
#include <future>    // std::future
#include <iomanip>   // std::setprecision, std::setw
#include <ios>       // std::fixed
#include <iostream>  // std::cout
#include <limits>    // std::numeric_limits
#include <random>    // std::mt19937_64, std::random_device, std::uniform_int_distribution, std::uniform_real_distribution
#include <stdexcept> // std::runtime_error
#include <string>    // std::string, std::to_string
#include <thread>    // std::this_thread, std::thread
#include <utility>   // std::pair
#include <vector>    // std::begin, std::end, std::vector

// Include the header file for the thread pool library.
#include "BS_thread_pool.hpp"

struct test{
    static BS::thread_pool pool;
    static void testfunc(){
        int squares[100];
        pool.parallelize_loop(0, 100,
                            [&squares](const int a, const int b)
                            {
                                for (int i = a; i < b; ++i)
                                    squares[i] = i * i;
                            })
            .wait();
    }
};

int main(){
    test::testfunc();
    return 0;
}

Compiled with g++ bs_thread_pool.cpp -std=c++17 -pthread -o bs_thread_pool.cpp.o
I got errors like:

/tmp/cc6he5hB.o: In function `test::testfunc()':
bs_thread_pool.cpp:(.text._ZN4test8testfuncEv[_ZN4test8testfuncEv]+0x3f): undefined reference to `test::pool'
collect2: error: ld returned 1 exit status

The testfunc() in my code must be static, so I cannot make pool non-static. It seems that the declaration and instantiation of pool are bound. I think the error is independent of the OS and of the g++/clang++ version. Do you have any idea about this situation? Maybe I made some stupid mistake.
The thread_pool version is the latest, 3.0.0.
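The linker error is the standard one for a static data member that is declared inside the class but never defined. The sketch below shows the two usual fixes, with the hypothetical fake_pool standing in for BS::thread_pool so that the example compiles on its own.

```cpp
// Hypothetical stand-in for BS::thread_pool.
struct fake_pool
{
    int thread_count = 4;
};

// Fix 1: keep the in-class declaration and add one out-of-class definition.
struct test_a
{
    static fake_pool pool; // declaration only, no storage yet
    static int threads() { return pool.thread_count; }
};
fake_pool test_a::pool; // definition: must appear in exactly one .cpp file

// Fix 2 (C++17): an inline static member is declared and defined in one place.
struct test_b
{
    inline static fake_pool pool;
    static int threads() { return pool.thread_count; }
};
```

Applied to the code above, the first fix corresponds to adding a line of the form BS::thread_pool test::pool; after the struct.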

parallelize_loop contains std::cout

Problem: The parallelize_loop function of thread_pool contains a std::cout statement at line 106. This causes unwanted output on the console.

    void parallelize_loop(T first_index, T last_index, const F& loop, ui32 num_tasks = 0)
    {
        if (num_tasks==0)
            num_tasks = thread_count;
        if (last_index<first_index)
            std::swap(last_index, first_index);
        size_t total_size = last_index-first_index+1;
        size_t block_size = total_size/num_tasks;
        if (block_size==0) {
            block_size = 1;
            num_tasks = std::max((ui32) 1, (ui32) total_size);
        }
        std::atomic<ui32> blocks_running = 0;
        for (ui32 t = 0; t<num_tasks; t++) {
            T start = (T) (t*block_size+first_index);
            T end = (t==num_tasks-1) ? last_index : (T) ((t+1)*block_size+first_index-1);
            std::cout << start << '-' << end << '\n';  // < -- HERE IT IS :)
            blocks_running++;
            push_task([&start, &end, &loop, &blocks_running] {
                for (T i = start; i<=end; i++)
                    loop(i);
                blocks_running--;
            });
            while (blocks_running!=0) {
                std::this_thread::yield();
            }
        }
    }

[BUG] wait_for_tasks() Segmentation Faults in Conjunction with submit()

Describe the bug

Extended usage of the thread_pool library causes odd quirks. Particularly, the call wait_for_tasks() crashes when paired with the submit() and storing into a std::vector of std::future(s). I have not been able to replicate beyond the fact it occurs in the same test. I have tried replicating the issue to no avail with similar calls. Unless an edge case has been missed in my unit tests (likely, but probably not), the call itself crashes as the function passes through without issue with similar parameter specifications.

Minimal working example

Short of giving you my source code for my project, there is not much. Submit some tasks with the std::future(s) stuffed into a std::vector. Call wait_for_tasks(). Then I have a section of code for accessing the values in the futures.

Behavior

Conjoined usage of the wait_for_tasks() and submit().

System information

  • CPU model, architecture, # of cores and threads: i7-10875H x 16
  • Operating system: Ubuntu 21.10
  • Name and version of C++ compiler: gcc 11.2
  • Full command used for compiling, including all compiler flags: Bazel's -c opt and C++20 specified
  • Thread pool library version: latest

(Please note that only the latest version of the thread pool library is supported.)

Additional information

I did some digging into the source code. There are a lot of C-Style casts amongst other bits and pieces that lead to undefined behavior. At one point, I went so far as to strip out everything but the code itself to see what was going on (lots of noise otherwise). No luck.

My best guess is either a cast is failing (unlikely, but maybe) or a race condition exists within the logic of the submit() and/or wait_for_tasks().
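As a possible starting point for a minimal working example, the pattern described above can be sketched with std::async standing in for submit(). The function collect_squares is hypothetical, and the sketch does not reproduce the reported crash. It only shows the futures being stored in a std::vector and read exactly once each.

```cpp
#include <future>
#include <vector>

// Hypothetical example: launches n tasks, stores their futures in a
// vector, and collects the results.
std::vector<int> collect_squares(int n)
{
    std::vector<std::future<int>> futures;
    for (int i = 0; i < n; ++i)
        futures.push_back(std::async(std::launch::async, [i] { return i * i; }));

    std::vector<int> results;
    for (std::future<int>& f : futures)
    {
        // get() blocks until the result is ready, so no separate wait is
        // needed before reading. It may be called only once per future:
        // afterwards valid() is false and a second get() is undefined behaviour.
        results.push_back(f.get());
    }
    return results;
}
```

Checking valid() on each future before get() in the failing test would rule out one source of undefined behaviour.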
