Code Monkey home page Code Monkey logo

leopard's Introduction

C++20 GitHub tag (latest by date) Conan Center

ThreadPool Leopard

This is a light-weight C++ Thread Pool that allows running any callable similar to std::async interface. It has both global and local queues and employs a work-stealing algorithm.

ThreadPool API Reference

Member Function
Description
Parameters
Code Sample
Constructor Conditioner struct represents (is a wrapper around) a single executable function. See ThreadPool.h. It is used in ThreadPool to specify user custom initializers and cleanups per thread.

ThreadPool has only one constructor with all default values.
Conditioner(s) are a handy interface, if threads need to be initialized before doing anything. And/or they need a cleanup before exiting. For example, see Windows CoInitializeEx function in COM library. See conditioner_test() in thrpool_tester.cc file for code sample
thr_num: Number of initial threads -- defaulted to number of cores in the computer.
pre_conditioner: A function that will execute at the start of each thread in the pool once
post_conditioner: A function that will execute at the end of each thread in the pool once
https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L383
set_timeout() It enables/disables timeouts for threads. If timeout is enabled and a thread idles more than timeout period, it will terminate timeout_flag: Boolean flag to enable/disable timeout mechanism
timeout_time: time_t value to specify the timeout period in seconds -- defaulted to 30 minutes
https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L94
dispatch() It dispatches a task into the thread-pool queue. If a thread is available the task will run. If no thread is available, the task will be added to a queue until a thread becomes available. Dispatch interface is identical tostd::async() with one exception.
dispatch() returns a std::future of the type of your callable return type
immediately: A boolean flag, If true and no thread is available, a thread will be added to the pool and the task runs immediately.
routine: A callable reference
args ...: A variadic list of parameters matching your callable parameter list
https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L209
parallel_loop() It parallelizes a big class of problems very conveniently. It divides the data elements between begin and end to n blocks by dividing by number of capacity threads. It dispatches the n tasks.
parallel_loop() returns a std::vector of std::future corresponding to the above n tasks.
begin: An iterator to mark the beginning of the sequence. It can either be proper a iterator or integral type index
end: An iterator to mark the end of the sequence. It can either be proper a iterator or integral type index
routine: A reference to a callable
args ...: A variadic list of parameters matching your callable parameter list
https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L272
parallel_sort() It uses a parallel version of 3-way quick sort.
This version calls the quick sort with std::less as comparison functor
begin: An iterator to mark the beginning of the sequence.
end: An iterator to mark the end of the sequence
TH (template param):A threshold value below which a serialize sort will be used, defaulted to 500,000
https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L421
parallel_sort() It uses a parallel version of 3-way quick sort.
This version requires a comparison functor
begin: An iterator to mark the beginning of the sequence.
end: An iterator to mark the end of the sequence
compare: Comparison functor
TH (template param):A threshold value below which a serialize sort will be used, defaulted to 500,000
https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L421
attach() It attaches the current thread to the pool so that it may be used for executing submitted tasks. It blocks the calling thread until the pool is shutdown or the thread is timed-out. This is handy, if you already have thread(s), and want to repurpose them this_thr: The parameter is a rvalue reference to the std::thread instance of the calling thread https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L319
run_task() If the pool is not shutdown and there is a pending task in the queue, it runs it on the calling thread synchronously. It returns_true_, if a task was executed, otherwise false. The return value of the task could be obtained from the original future object when it was dispatched.
NOTE: A false return from run_task() does not necessarily mean there were no tasks in thread pool queue. It might be that run_task() just encountered one of the thread pool internal maintenance tasks which it ignored and returned false.
https://github.com/hosseinmoein/Leopard/blob/main/test/par_sort_tester.cc#L51
add_thread() At any point you can add/subtract threads to/from the pool thr_num: Number of threads to be added/subtracted. It can either be a positive or negative number https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L94
available_threads() At any point you can query the ThreadPool for available threads. https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L94
capacity_threads() At any point you can query the ThreadPool for capacity threads. https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L94
pending_tasks() At any point you can query the ThreadPool for number of tasks currently waiting in the_global_ queue https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L94
shutdown() At any point you can callshutdown() to signal the ThreadPool to terminate all threads after they are done running tasks. After shutdown, you cannot dispatch or add threads anymore -- exception will be thrown. https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L232
is_shutdown() It returns_true_ if the pool is shutdown, otherwise false https://github.com/hosseinmoein/Leopard/blob/main/test/thrpool_tester.cc#L94
Destructor It callsshutdown() and waits until all threads are done running tasks N/A

Code Sample

static void parallel_loop_test()  {

    std::cout << "Running parallel_loop_test() ..." << std::endl;

    constexpr std::size_t       n { 10003 };
    constexpr std::size_t       the_sum { (n * (n + 1)) / 2 };
    std::vector<std::size_t>    vec (n);

    std::iota(vec.begin(), vec.end(), 1);

    auto        func =
        [](const auto &begin, const auto &end) -> std::size_t  {
            std::size_t sum { 0 };

            for (auto iter = begin; iter != end; ++iter)
                sum += *iter;
            return (sum);
        };
    ThreadPool  thr_pool { 5 };  // Thread pool with 5 threads.
    // First we do it using iterators
    //
    auto        futs = thr_pool.parallel_loop(vec.cbegin(), vec.cend(), func);

    std::size_t result {0};

    for (auto &fut : futs)
        result += fut.get();
    assert(result == the_sum);

    // Now do the same thing, this time with integer indices
    //
    auto    func2 =
        [](auto begin, auto end, const auto &vec) -> std::size_t  {
            std::size_t sum { 0 };

            for (auto i = begin; i != end; ++i)
                sum += vec[i];
            return (sum);
        };
    auto    futs2 = thr_pool.parallel_loop(std::size_t(0), n, func2, vec);

    result = 0;
    for (auto &fut : futs2)
        result += fut.get();
    assert(result == the_sum);
}

// ----------------------------------------------------------------------------

struct   MyClass  {
    bool routine(std::size_t i)  {
        std::cout << "From routine() " << i << "\n";
        return (true);
    }
};

// ----------------------------------------------------------------------------

std::string
my_func(std::size_t i, double d, const std::string &s)  {
    std::cout << "my_func(): " << i << ", " << d << ", " << s << '\n';
    return (s + "_ABC");
}

// ----------------------------------------------------------------------------

int main (int, char *[])  {
    MyClass my_obj;

    // Start off with 5 threads in reserve in the pool
    // "timeout_flag" is true by default
    // "timeout_time" is set to half an hour for idle threads by default
    //
    ThreadPoolType  thr_pool (5);

    thr_pool.add_thread (2);  // Add 2 threads to the pool
    thr_pool.add_thread (-3); // Terminate 3 threads in the pool

    // Dispatch a member function
    // "immediately" flag is set to false
    //
    auto    my_class_fut = thr_pool.dispatch(false, &MyClass::routine, &my_obj, 5555);
    std::cout << "MyClass Future result: " << my_class_fut.get() << std::endl;

    // Dispatch a lambda
    // "immediately" flag is set to true
    //
    auto    my_lambda_fut = thr_pool.dispatch(true,
                                              [](std::size_t i) -> std::size_t { return (i * i); },
                                              10);
    std::cout << "Lambda result: " << my_lambda_fut.get() << std::endl;

    // Dispatch a function
    // "immediately" flag is set to false
    //
    const std::string   str = "XXXX";
    auto                my_func_fut = thr_pool.dispatch(false, my_func, 5555, 0.555, std::cref(str));
    std::cout << "my_func Future result: " << my_func_fut.get() << std::endl;

    std::cout << "Available threads: " << thr_pool.available_threads() << std::endl;
    std::cout << "Capacity threads: " << thr_pool.capacity_threads() << std::endl;

    // You don't have to call shutdown() necessarily, but you could.
    // It is called by the destructor.
    //
    thr_pool.shutdown();
    return (EXIT_SUCCESS);
}

leopard's People

Contributors

hosseinmoein avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.