Advanced Thread Management

Author: jay@thoughtmachina.com

Modern C++ makes it surprisingly easy to spawn thousands of threads, but managing them efficiently and safely is a different challenge entirely. Real applications need structured shutdown, cooperative cancellation, clean error propagation, and a way to package user code so workers can execute it uniformly. In this article, we build a compact yet fully featured thread pool from first principles, unfolding each component step by step: a common task interface, a packaged_task wrapper, future-based result retrieval, work distribution, and finally a cooperative interruption mechanism via interrupt_token.

A Simple Thread Pool

Before introducing interruption or futures, we start with the simplest building block: a thread-safe queue that supports blocking retrieval and clean shutdown. The queue stores pending work items in a std::deque<Task> and uses a mutex plus condition variable to synchronize access. Worker threads call get(), which sleeps until either a task becomes available or shutdown is requested. Once shutdown is triggered, get() returns nullptr, allowing workers to exit without races or dangling threads.


The Task Queue - Safe Communication Between Threads

The task queue is a classic producer-consumer structure. Producers submit tasks; worker threads consume them. Access is synchronized using a mutex and condition variable.

using Task = std::function<void()>;

class task_queue {
public:
    void submit(Task&& task) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (_shutdown_requested.load()) {
            throw std::runtime_error("Can not submit a task after shutdown");
        }
        _task_list.push_back(std::move(task));
        _cv.notify_all();
    }

    std::unique_ptr<Task> get() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] {
            return !_task_list.empty() || _shutdown_requested.load();
        });

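        // Drain any queued work first; signal exit only when shutdown
        // was requested and nothing is left to run.
        if (_task_list.empty()) {
            return nullptr;
        }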

        std::unique_ptr<Task> task =
            std::make_unique<Task>(std::move(_task_list.front()));
        _task_list.pop_front();
        return task;
    }

    void shutdown() {
        std::unique_lock<std::mutex> lock(_mutex);
        _shutdown_requested.store(true);
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::atomic<bool> _shutdown_requested = false;
    std::condition_variable _cv;
    std::deque<Task> _task_list;
};

How it works

The queue holds tasks in a std::deque<Task> and protects it with a mutex. Worker threads call get(), which blocks on a condition variable until either:

    • a task becomes available, or
    • shutdown has been requested.

When shutdown occurs, get() returns nullptr, signaling the workers to exit.

This cleanly stops the pool without race conditions or dangling threads.
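
As a minimal sketch of this wake-up behavior, the following reduces the queue to plain integers. The names mini_queue and demo_shutdown_wakes_consumer are hypothetical and not part of the pool, and the sketch assumes one particular policy: items already queued are handed out before get() reports shutdown.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>

// Hypothetical, simplified stand-in for task_queue that stores ints.
class mini_queue {
public:
    void submit(int value) {
        std::lock_guard<std::mutex> lock(_mutex);
        _items.push_back(value);
        _cv.notify_one();
    }

    // Blocks until an item is available or shutdown is requested.
    // Returns std::nullopt once shutdown was requested and nothing is left.
    std::optional<int> get() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return !_items.empty() || _shutdown; });
        if (_items.empty()) {
            return std::nullopt;
        }
        int value = _items.front();
        _items.pop_front();
        return value;
    }

    void shutdown() {
        std::lock_guard<std::mutex> lock(_mutex);
        _shutdown = true;
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::deque<int> _items;
    bool _shutdown = false;
};

// Starts one consumer, feeds it three items, then shuts the queue down.
// Returns the sum the consumer saw before it received the exit signal.
int demo_shutdown_wakes_consumer() {
    mini_queue queue;
    int sum = 0;
    std::thread consumer([&] {
        while (std::optional<int> item = queue.get()) {
            sum += *item;
        }
    });
    queue.submit(1);
    queue.submit(2);
    queue.submit(3);
    queue.shutdown();
    consumer.join(); // returns because shutdown() woke the consumer
    return sum;
}
```

Calling demo_shutdown_wakes_consumer() returns once the consumer has exited; without the shutdown() call, the join would block forever on an empty queue.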


The Thread Pool - Spawning Workers and Running Tasks

The thread_pool class launches a fixed number of worker threads. Each worker repeatedly fetches a task from the queue and executes it.

class thread_pool {
public:
    thread_pool(int count) : _count(count) {
        for (int i = 0; i < count; i++) {
            _threads.push_back(std::thread(&thread_pool::_worker_loop, this));
        }
    }

    template<class Callable, class ...Args>
    void submit(Callable&& callable, Args&& ...args) {
        Task function = std::bind(std::forward<Callable>(callable),
                                  std::forward<Args>(args)...);
        _queue.submit(std::move(function));
    }

    void shutdown() {
        _queue.shutdown();
    }

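    ~thread_pool() {
        // Destructors are implicitly noexcept, so rethrowing here would
        // terminate the program. Request shutdown (harmless if it was
        // already requested) so blocked workers wake up, then join them.
        _queue.shutdown();
        for (std::thread& t : _threads) {
            if (t.joinable()) {
                t.join();
            }
        }
    }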

private:
    void _worker_loop() {
        while (true) {
            std::unique_ptr<Task> task = _queue.get();
            if (task == nullptr) {
                break;
            }
            (*task)();
        }
    }

private:
    int _count;
    std::vector<std::thread> _threads;
    task_queue _queue;
};

How it works

This design ensures that tasks submitted before shutdown finish executing, while preventing new tasks from being added during shutdown.
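
One thing the worker loop does not handle is a task that throws: an exception escaping a thread's entry function ends the program through std::terminate. A possible guard, sketched here with a hypothetical helper named run_guarded (not part of the pool above), catches the exception at the call site and hands it back to the caller:

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>

using Task = std::function<void()>;

// Hypothetical helper: runs one task and reports whether it completed.
// A worker loop could call this instead of invoking the task directly,
// so that one failing task does not take the whole process down.
bool run_guarded(const Task& task, std::exception_ptr& error) {
    try {
        task();
        return true;
    } catch (...) {
        error = std::current_exception(); // keep it for later inspection
        return false;
    }
}
```

What to do with the captured exception (log it, count it, forward it to whoever submitted the task) is a design choice this sketch leaves open.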


Using the Thread Pool

Here is a minimal example of how it would be used:

int main() {
    thread_pool pool(4);
    for (int i = 0; i < 10; i++) {
        pool.submit([i] {
            std::cout << "Task " << i << " running\n";
        });
    }
    pool.shutdown();
}

Each submitted task is executed by the next available worker thread. Once shutdown is called, workers drain the queue and exit cleanly.

Waiting on Tasks

C++ provides tools like std::future, std::promise, and std::packaged_task for asynchronous programming. Understanding their internal mechanisms helps in building custom concurrency utilities. Below is a minimal implementation of a future/packaged_task system, coupled with a simple thread pool, showing how tasks can be queued, executed, and results safely retrieved.


task_state<T> - The Shared State

template <typename T> class task_state {
public:
    void post_completion(T&& result) {
        std::unique_lock<std::mutex> lock(_mutex);
        _completed = true;
        _result = std::move(result);
        _cv.notify_all();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return _completed; });
    }

    T& get_result() { return _result; }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _completed = false;
    T _result;
};

This class stores the result of a task and provides safe synchronization. post_completion sets the result and notifies all waiting threads. Consumers call wait() to block until the result is ready and then use get_result() to retrieve it.
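
The shared state above only carries a value. If error propagation is wanted as well, one option is to store a std::exception_ptr next to the result and rethrow it in the waiting thread. The class below is a hypothetical variant (task_state_with_error and post_error are names made up for this sketch), not the article's task_state:

```cpp
#include <cassert>
#include <condition_variable>
#include <exception>
#include <mutex>
#include <stdexcept>
#include <utility>

// Hypothetical variant of task_state<T> that can also carry a failure.
template <typename T> class task_state_with_error {
public:
    void post_completion(T&& result) {
        std::lock_guard<std::mutex> lock(_mutex);
        _result = std::move(result);
        _completed = true;
        _cv.notify_all();
    }

    // Called instead of post_completion when the task threw.
    void post_error(std::exception_ptr error) {
        std::lock_guard<std::mutex> lock(_mutex);
        _error = std::move(error);
        _completed = true;
        _cv.notify_all();
    }

    // Blocks until completion, then returns the value or rethrows.
    T& get() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return _completed; });
        if (_error) {
            std::rethrow_exception(_error);
        }
        return _result;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _completed = false;
    std::exception_ptr _error;
    T _result{};
};

// Posts a failure and reports whether get() rethrew it to the waiter.
bool demo_error_reaches_waiter() {
    task_state_with_error<int> state;
    try {
        throw std::runtime_error("task failed");
    } catch (...) {
        state.post_error(std::current_exception());
    }
    try {
        state.get();
    } catch (const std::runtime_error&) {
        return true;
    }
    return false;
}
```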


future<T> - The Public Handle

template <typename T> class future {
public:
    future(std::shared_ptr<task_state<T>> state) : _state(state) {}

    T& get() {
        _state.get()->wait();
        return _state.get()->get_result();
    }

private:
    std::shared_ptr<task_state<T>> _state;
};

A future is the handle through which users retrieve results. It does not perform computation but waits for the associated task_state to be completed.


template_task - Wrapping Callables

class task_base {
public:
    virtual ~task_base() = default;
    virtual void operator()() = 0;
};

template <class Callable, class... Args>
class template_task : public task_base {
public:
    using T = std::invoke_result_t<Callable, Args...>;
    template_task(Callable&& callable, Args&&... args)
        : _state(new task_state<T>()) {
        _function = std::bind(std::forward<Callable>(callable),
                              std::forward<Args>(args)...);
    }

    void operator()() override {
        _state.get()->post_completion(std::move(_function()));
    }

    future<T> get_future() { return future<T>(_state); }

    ~template_task() override = default;

private:
    std::function<T()> _function;
    std::shared_ptr<task_state<T>> _state;
};

template_task stores a callable and its arguments. When executed, it calls the function and posts the result to the shared state, enabling the future to access it.
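
std::bind is one way to package a callable with its arguments. An alternative, sketched here under the assumption that C++17 is available, is a lambda that owns decayed copies of the callable and its arguments and invokes them with std::apply. The helper name make_nullary and the add_ten functor are hypothetical:

```cpp
#include <cassert>
#include <tuple>
#include <utility>

// Hypothetical helper: packages a callable and its arguments into a
// nullary function object. Both are copied (or moved) into the closure,
// so the package owns its data even if the caller's variables go away.
template <class Callable, class... Args>
auto make_nullary(Callable&& callable, Args&&... args) {
    return [fn = std::forward<Callable>(callable),
            bound = std::make_tuple(std::forward<Args>(args)...)]() mutable {
        return std::apply(fn, bound); // calls fn(arg0, arg1, ...)
    };
}

// Small functor used only to exercise the helper.
struct add_ten {
    int operator()(int x) { return x + 10; }
};
```

For example, make_nullary(add_ten{}, 32) yields an object whose call operator takes no arguments and returns 42.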


task_queue - Holding Pending Work

using packaged_task = std::unique_ptr<task_base>;

class task_queue {
public:
    void submit(packaged_task&& task) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (_shutdown_requested.load()) {
            throw std::runtime_error("Can not submit a task after shutdown");
        }
        _task_list.push_back(std::move(task));
        _cv.notify_all();
    }

    packaged_task get() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] {
            return !_task_list.empty() || _shutdown_requested.load();
        });
        // Hand out queued work first; exit only once nothing is left.
        if (_task_list.empty()) return nullptr;
        packaged_task task = std::move(_task_list.front());
        _task_list.pop_front();
        return task;
    }

    void shutdown() {
        std::unique_lock<std::mutex> lock(_mutex);
        _shutdown_requested.store(true);
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::atomic<bool> _shutdown_requested = false;
    std::condition_variable _cv;
    std::deque<packaged_task> _task_list;
};

The task_queue allows threads to safely submit and retrieve tasks. The condition variable lets worker threads sleep until new work arrives. The shutdown mechanism ensures no tasks are added after the pool is being destroyed.


thread_pool - Running Tasks Concurrently

class thread_pool {
public:
    thread_pool(int count) : _count(count) {
        for (int i = 0; i < count; i++) {
            _threads.push_back(std::thread(&thread_pool::_worker_loop, this));
        }
    }

    template <class Callable, class... Args>
    auto submit(Callable&& callable, Args&&... args)
        -> future<std::invoke_result_t<Callable, Args...>> {
        using T = std::invoke_result_t<Callable, Args...>;
        auto fn_ptr = new template_task(std::forward<Callable>(callable),
                                        std::forward<Args>(args)...);
        future<T> fut = fn_ptr->get_future();
        packaged_task function = std::unique_ptr<task_base>(fn_ptr);
        _queue.submit(std::move(function));
        return fut;
    }

    void shutdown() { _queue.shutdown(); }

    ~thread_pool() {
        _queue.shutdown(); // wake blocked workers even if the caller forgot
        for (auto& t : _threads) {
            if (t.joinable()) t.join();
        }
    }

private:
    void _worker_loop() {
        while (true) {
            packaged_task task = _queue.get();
            if (!task) break; // Shutdown requested
            (*task)(); // Execute the task
        }
    }

private:
    int _count;
    std::vector<std::thread> _threads;
    task_queue _queue;
};

Explanation of Key Functions:

  1. Constructor (thread_pool(int count))
    • Spawns count threads.
    • Each thread immediately begins executing _worker_loop, waiting for tasks to arrive in the queue.
  2. submit
    • Wraps any callable into a template_task and retrieves its future.
    • The task is then submitted to the queue.
    • Returns the future so the caller can wait for the result asynchronously.
  3. _worker_loop
    • Each worker continuously fetches tasks from the queue.
    • If the queue is empty, the worker thread blocks on the condition variable.
    • On shutdown, get() returns nullptr, signaling the worker to exit the loop.
    • Otherwise, it executes the task by calling operator().
  4. shutdown and Destructor
    • Marks the queue as shutdown, waking up all waiting threads.
    • Destructor joins all threads to ensure graceful termination.
    • Any tasks submitted after shutdown will throw an exception.


Example Usage

class function {
public:
    int operator()(int x) { return x + 10; }
};

int main() {
    thread_pool pool(2);

    auto fut = pool.submit(function(), 32);

    std::cout << "Result: " << fut.get() << std::endl;

    pool.shutdown();
}

Reducing contention

Ok, now let us see how the thread pool performs under high contention:

struct number {
    int operator()() { return 1; }
};

int main() {

    int task_count_per_thread = 1000;
    int submission_thread_count = 1000;

    int count = 0;

    thread_pool pool(16);

    std::vector<std::thread> submission_threads;
    std::vector<int> local_results(submission_thread_count);
    for (int x = 0; x < submission_thread_count; x++) {
        std::thread t([&pool, task_count_per_thread, &local_results, x]() {
            int count = 0;
            std::deque<future<int>> futures;
            for (int i = 0; i < task_count_per_thread; i++) {
                future<int> res = pool.submit(number());
                futures.push_back(std::move(res));
            }
            for (future<int>& fut : futures) {
                count += fut.get();
            }
            local_results[x] = count;
        });
        submission_threads.push_back(std::move(t));
    }

    std::cout << "Done submitting tasks" << std::endl;
    for (std::thread& t : submission_threads) {
        t.join();
    }

    for (int x : local_results) {
        count += x;
    }

    std::cout << count << std::endl;
    pool.shutdown();
}

Let us compare the following 2 scenarios:

    int task_count_per_thread = 1000;
    int submission_thread_count = 2000;
jay@jayport:~/code/concurrency $ time ./x
Done submitting tasks
2000000

real    0m4.653s
user    0m5.570s
sys     0m30.235s

Now a configuration where we expect more contention:

    int task_count_per_thread = 2000;
    int submission_thread_count = 1000;
jay@jayport:~/code/concurrency $ time ./x
Done submitting tasks
2000000

real    0m6.064s
user    0m4.285s
sys     0m42.097s

Reducing Contention with Per-Worker Queues

In high-performance thread pools, contention on a single shared queue can become a bottleneck. Every submit() and get() operation requires locking the same mutex, which slows down task submission and execution when many threads are involved.

One way to reduce this contention is per-worker queues: each worker thread maintains its own private queue, and tasks are distributed across these queues. Workers first try to fetch tasks from their local queue. Only if their queue is empty do they access the central queue, reducing the frequency of global lock acquisition.

Here is how we implemented it in our thread pool:

std::vector<task_queue> _worker_queues;
std::atomic<size_t> _approx_submission_index{0}; // incremented by many submitters at once

template <class Callable, class... Args>
auto submit(Callable&& callable, Args&&... args)
    -> future<std::invoke_result_t<Callable, Args...>> {

    using T = std::invoke_result_t<Callable, Args...>;
    auto fn_ptr = new template_task(std::forward<Callable>(callable),
                                    std::forward<Args>(args)...);
    future<T> fut = fn_ptr->get_future();
    packaged_task function = std::unique_ptr<task_base>(fn_ptr);

    int wrap_index = _count + 1;
    int index = ++_approx_submission_index;

    if (index % wrap_index == 0) {
        _queue.submit(std::move(function)); // occasional fallback to central queue
    } else {
        index = (index - 1) % _count;
        _worker_queues[index].submit(std::move(function)); // local queue
    }
    return fut;
}

How it works:

  1. _worker_queues stores a queue for each worker thread.
  2. submit() distributes tasks in a round-robin manner, with most tasks going to worker-local queues.
  3. The central queue _queue is only used occasionally, preventing all threads from contending on a single lock.
  4. Worker threads check their local queue first, and only poll the central queue when idle.

This simple adjustment drastically reduces lock contention under heavy load, which is critical when thousands of tasks are submitted concurrently.
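
To make the routing arithmetic easier to follow, here is a small sketch that isolates it in a free function. The name route and the convention of returning -1 for the central queue are assumptions made for this illustration only:

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper mirroring the index arithmetic in submit().
// Returns -1 when the task goes to the central queue, otherwise the
// index of the worker-local queue that receives it.
int route(std::size_t submission_index, int worker_count) {
    int wrap_index = worker_count + 1;
    int index = static_cast<int>(submission_index);
    if (index % wrap_index == 0) {
        return -1; // central queue
    }
    return (index - 1) % worker_count;
}
```

With 4 workers, submission indices 1 through 10 map to queues 0, 1, 2, 3, central, 1, 2, 3, 0, central. Every fifth task goes to the central queue, and the rotation over local queues is close to, but not exactly, round-robin.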


Worker Loop with Timeout:

To support this design, workers fetch tasks from their local queue and only occasionally from the central queue, using a short timeout to avoid busy waiting:

void _worker_loop(int my_index) {
    auto& my_queue = _worker_queues[my_index];
    std::chrono::microseconds timeout(100);
    bool done = false;

    while (!done) {
        packaged_task task;
        if (my_queue.estimated_size() == 0) {
            task = _queue.get(timeout); // central queue
        } else {
            task = my_queue.get(timeout); // local queue
        }

        switch (task.get()->kind()) {
        case task_kind::normal:
            task.get()->operator()();
            break;
        case task_kind::exit:
            done = true;
            break;
        case task_kind::timeout:
        default:
            // nothing to do
            break;
        }
    }
}

By giving each worker its own queue and using timed polling for the central queue, threads spend less time contending for a single mutex. This reduces sys time spikes seen in highly concurrent submissions.
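
The timed get() relies on the predicate overload of std::condition_variable::wait_for, which returns the final value of the predicate. The sketch below shows that mechanism on a reduced queue of integers; timed_queue is a hypothetical name, and an empty std::optional stands in for the timeout task:

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical reduced queue of ints with a timed get().
class timed_queue {
public:
    void submit(int value) {
        std::lock_guard<std::mutex> lock(_mutex);
        _items.push_back(value);
        _cv.notify_one();
    }

    // Waits up to `timeout` for an item. An empty optional means
    // "nothing arrived in time, try again or look elsewhere".
    template <class Rep, class Period>
    std::optional<int> get(std::chrono::duration<Rep, Period> timeout) {
        std::unique_lock<std::mutex> lock(_mutex);
        bool ready = _cv.wait_for(lock, timeout,
                                  [this] { return !_items.empty(); });
        if (!ready) {
            return std::nullopt; // timed out with the queue still empty
        }
        int value = _items.front();
        _items.pop_front();
        return value;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    std::deque<int> _items;
};
```

A short timeout keeps an idle worker responsive to work arriving in other queues, at the cost of waking up periodically even when there is nothing to do.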

Here is the complete code:

template <typename T> class task_state {
public:
    void post_completion(T&& result) {
        std::unique_lock<std::mutex> lock(_mutex);
        _completed = true;
        _result = std::move(result);
        _cv.notify_all();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return _completed; });
    }

    T& get_result() { return _result; }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _completed = false;
    T _result;
};

template <typename T> class future {
public:
    future(std::shared_ptr<task_state<T>> state) : _state(state) {}

    T& get() {
        _state.get()->wait();
        return _state.get()->get_result();
    }

private:
    std::shared_ptr<task_state<T>> _state;
};

enum class task_kind { normal, exit, timeout };

class task_base {
public:
    virtual ~task_base() = default;
    virtual void operator()() = 0;
    virtual task_kind kind() = 0;
};

class exit_task : public task_base {
    void operator()() override {}
    task_kind kind() override { return task_kind::exit; }
};

class timeout_task : public task_base {
    void operator()() override {}
    task_kind kind() override { return task_kind::timeout; }
};

template <class Callable, class... Args>
class template_task : public task_base {
public:
    using T = std::invoke_result_t<Callable, Args...>;
    template_task(Callable&& callable, Args&&... args)
        : _state(new task_state<T>()) {
        _function = std::bind(std::forward<Callable>(callable),
                              std::forward<Args>(args)...);
    }

    void operator()() override {
        _state.get()->post_completion(std::move(_function()));
    }

    future<T> get_future() { return future<T>(_state); }

    ~template_task() override = default;

    task_kind kind() override { return task_kind::normal; }

private:
    std::function<T()> _function;
    std::shared_ptr<task_state<T>> _state;
};

using packaged_task = std::unique_ptr<task_base>;

class task_queue {

    packaged_task make_exit() { return std::make_unique<exit_task>(); }

    packaged_task make_timeout() { return std::make_unique<timeout_task>(); }

public:
    void submit(packaged_task&& task) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (_shutdown_requested.load()) {
            throw std::runtime_error("Can not submit a task after shutdown");
        }
        _task_list.push_back(std::move(task));
        _cv.notify_all();
    }

    packaged_task get() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] {
            return _task_list.size() > 0 || _shutdown_requested.load();
        });
        if (_shutdown_requested.load()) {
            return make_exit();
        }
        packaged_task task = std::move(_task_list.front());
        _task_list.pop_front();
        return task;
    }

    template <class Rep, class Period>
    packaged_task get(std::chrono::duration<Rep, Period> duration) {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait_for(lock, duration, [this] {
            return _task_list.size() > 0 || _shutdown_requested.load();
        });
        if (_shutdown_requested.load()) {
            return make_exit();
        }
        if (_task_list.size() == 0) {
            return make_timeout();
        }
        packaged_task task = std::move(_task_list.front());
        _task_list.pop_front();
        return task;
    }

    int estimated_size() { return _task_list.size(); }

    void shutdown() {
        std::unique_lock<std::mutex> lock(_mutex);
        _shutdown_requested.store(true);
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::atomic<bool> _shutdown_requested = false;
    std::condition_variable _cv;
    std::deque<packaged_task> _task_list;
};

class thread_pool {

public:
    thread_pool(int count) : _count(count), _worker_queues(count) {
        for (int i = 0; i < count; i++) {
            _threads.push_back(
                std::thread(&thread_pool::_worker_loop, this, i));
        }
    }

    template <class Callable, class... Args>
    auto submit(Callable&& callable, Args&&... args)
        -> future<std::invoke_result_t<Callable, Args...>> {
        using T = std::invoke_result_t<Callable, Args...>;
        auto fn_ptr = new template_task(std::forward<Callable>(callable),
                                        std::forward<Args>(args)...);
        future<T> fut = fn_ptr->get_future();
        packaged_task function = std::unique_ptr<task_base>(fn_ptr);

        int wrap_index = _count + 1;

        int index = ++_approx_submission_index;
        if (index % wrap_index == 0) {
            _queue.submit(std::move(function));
        } else {
            index = (index - 1) % _count;
            _worker_queues[index].submit(std::move(function));
        }
        return fut;
    }

    void shutdown() { _queue.shutdown(); }

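    ~thread_pool() {
        // Destructors are implicitly noexcept, so rethrowing here would
        // terminate the program. Request shutdown (harmless if it was
        // already requested) so idle workers wake up, then join them.
        _queue.shutdown();
        for (std::thread& t : _threads) {
            if (t.joinable()) {
                t.join();
            }
        }
    }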

private:
    void _worker_loop(int my_index) {
        auto& my_queue = _worker_queues[my_index];
        std::chrono::microseconds timeout(100);
        bool done = false;

        while (!done) {
            packaged_task task;
            if (my_queue.estimated_size() == 0) {
                task = _queue.get(timeout);
            } else {
                task = my_queue.get(timeout);
            }

            switch (task.get()->kind()) {
            case task_kind::normal:
                task.get()->operator()();
                break;
            case task_kind::exit:
                done = true;
                break;
            case task_kind::timeout:
            default:
                // do nothing
                break;
            }
        }
    }

private:
    int _count;
    std::vector<std::thread> _threads;
    task_queue _queue;
    std::vector<task_queue> _worker_queues;
    std::atomic<size_t> _approx_submission_index{0}; // incremented by many submitters at once
};

Let us compare the following 2 scenarios (after making per-worker queues):

    int task_count_per_thread = 1000;
    int submission_thread_count = 2000;
jay@jayport:~/code/concurrency $ time ./low_1k_2k 
Done submitting tasks
2000000

real    0m2.008s
user    0m8.101s
sys     0m6.845s

And the second scenario:

    int task_count_per_thread = 2000;
    int submission_thread_count = 1000;
jay@jayport:~/code/concurrency $ time ./low_2k_1k 
Done submitting tasks
2000000

real    0m2.088s
user    0m8.966s
sys     0m6.451s

Notice that the real time is now roughly 2 seconds, significantly faster than the previous implementation with a single global queue, which took 4–6 seconds. The sys time (time spent in the kernel handling locks) has dropped dramatically, from 30–42 seconds to under 7 seconds.

Stealing Work

Now, let us try to simulate an uneven work distribution in our worker-local queues (and let us have each task be non-trivial - aka sleep for 100 microseconds). Ok, we now have a recipe for a scenario where some workers have a lot of work while others have very little.

    template <class Callable, class... Args>
    auto submit(Callable&& callable, Args&&... args)
        -> future<std::invoke_result_t<Callable, Args...>> {
        using T = std::invoke_result_t<Callable, Args...>;
        auto fn_ptr = new template_task(std::forward<Callable>(callable),
                                        std::forward<Args>(args)...);
        future<T> fut = fn_ptr->get_future();
        packaged_task function = std::unique_ptr<task_base>(fn_ptr);

        int wrap_index = _count + 1;

        // what if we had an uneven work distribution?
        std::random_device rd;
        std::mt19937 gen(rd());
        std::poisson_distribution<> d(9);

        int index = d(gen);
        if (index % wrap_index == 0) {
            _queue.submit(std::move(function));
        } else {
            index = (index - 1) % _count;
            _worker_queues[index].submit(std::move(function));
        }
        return fut;
    }

Here, we have std::poisson_distribution<> to create a non-uniformly distributed set of numbers. We will also update our task to do a bit more work than simply returning a number.

struct number {
    int operator()() {
        std::this_thread::sleep_for(std::chrono::microseconds(100));
        return 1;
    }
};

And the runner code:

int main() {

    int task_count_per_thread = 200000;
    int submission_thread_count = 10;

    int count = 0;

    thread_pool pool(16);

    std::vector<std::thread> submission_threads;
    std::vector<int> local_results(submission_thread_count);
    for (int x = 0; x < submission_thread_count; x++) {
        std::thread t([&pool, task_count_per_thread, &local_results, x]() {
            int count = 0;
            std::deque<future<int>> futures;
            for (int i = 0; i < task_count_per_thread; i++) {
                future<int> res = pool.submit(number());
                futures.push_back(std::move(res));
            }
            for (future<int>& fut : futures) {
                count += fut.get();
            }
            local_results[x] = count;
        });
        submission_threads.push_back(std::move(t));
    }

    std::cout << "Done submitting tasks" << std::endl;
    for (std::thread& t : submission_threads) {
        t.join();
    }

    for (int x : local_results) {
        count += x;
    }

    pool.shutdown();

    std::cout << count << std::endl;
    std::cout << "Local Counts: ";
    for (int c : pool._local_count) {
        std::cout << c << ", ";
    }
    std::cout << std::endl;
}

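Here Local Counts are the number of tasks executed on each thread, read from a public per-worker counter (_local_count) that each worker increments after it runs a task.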

Let us see the results, shall we?

jay@jayport:~/code/concurrency $ time ./uneven 
Done submitting tasks
2000000
Local Counts: 14259, 15958, 32850, 69216, 121753, 182369, 234013, 262886, 263429, 237218, 195348, 144857, 100608, 64890, 38590, 21756, 

real    0m34.952s
user    0m14.940s
sys     0m13.846s

Clearly, the uneven distribution experiment produced exactly what we expected: task starvation on some workers and work piles on others.
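
The skew can be reproduced without any threads by feeding Poisson draws through the same routing arithmetic and counting where they land. The function simulate_routing below is a hypothetical experiment written for this purpose; the fixed seed is an assumption made only so that runs are repeatable:

```cpp
#include <cassert>
#include <random>
#include <vector>

// Hypothetical experiment: counts how many of `samples` submissions land
// on each worker-local queue when indices come from a Poisson(9) draw
// and are routed with the same arithmetic as submit(). The last slot of
// the returned vector counts tasks sent to the central queue.
std::vector<int> simulate_routing(int samples, int worker_count,
                                  unsigned seed) {
    std::mt19937 gen(seed);
    std::poisson_distribution<> d(9);
    std::vector<int> counts(worker_count + 1, 0);
    for (int i = 0; i < samples; i++) {
        int index = d(gen);
        if (index % (worker_count + 1) == 0) {
            counts[worker_count]++; // central queue
        } else {
            counts[(index - 1) % worker_count]++; // worker-local queue
        }
    }
    return counts;
}
```

With 16 workers, draws near the mean of 9 map to queues 7 and 8, which is why those workers end up with the largest share, while queue 0 is fed almost only by the rare draw of 1.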

So let us fix that. The classic solution is work stealing: If a worker runs out of tasks in its own local queue, it should start checking other queues - including the global one - and “steal” tasks when possible.

Here is the new worker loop:

void _worker_loop(int my_index) {
    auto& my_queue = _worker_queues[my_index];
    std::chrono::microseconds timeout(100);
    bool done = false;

    size_t index_ = my_index;
    auto index = [&index_, this]() -> size_t {
        return index_++ % (_count + 1);
    };

    while (!done) {
        packaged_task task;
        if (my_queue.estimated_size() == 0) {
            size_t queue_index = index();
            if (queue_index == static_cast<size_t>(_count)) {
                // Always poll the central queue, even when it looks
                // empty: it is also where the shutdown signal arrives.
                task = _queue.get(timeout);
            } else if (_worker_queues[queue_index].estimated_size() > 0) {
                task = _worker_queues[queue_index].get(timeout); // steal
            } else {
                continue; // nothing to steal here, probe the next queue
            }
        } else {
            task = my_queue.get(timeout);
        }

        switch (task.get()->kind()) {
        case task_kind::normal:
            task.get()->operator()();
            _local_count[my_index]++;
            break;
        case task_kind::exit:
            done = true;
            break;
        case task_kind::timeout:
        default:
            // do nothing
            break;
        }
    }
}

Each worker maintains its own local queue, and under normal conditions it simply pulls tasks from there. However, when its queue becomes empty, the worker does not just sit idle. Instead, it begins scanning the other worker queues in a round-robin pattern, incrementing an index that wraps around all local queues as well as the global queue. If it finds a worker queue with pending tasks, it steals one; if not, it checks the global queue. Only when all queues appear empty does it loop back and try again. This simple mechanism ensures that even when tasks are submitted unevenly or cluster heavily onto specific workers, idle threads can redistribute the load dynamically and keep overall throughput high.
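
The order in which an idle worker probes the queues follows directly from the wrapping index. The sketch below reproduces that order in isolation; probe_order is a hypothetical helper, and the value worker_count is used to stand for the central queue:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper reproducing the order in which a worker probes
// queues once its own queue is empty. Values 0..worker_count-1 name
// worker-local queues; the value worker_count stands for the central
// queue.
std::vector<std::size_t> probe_order(std::size_t my_index,
                                     std::size_t worker_count,
                                     std::size_t steps) {
    std::vector<std::size_t> order;
    std::size_t cursor = my_index; // the scan starts at the worker itself
    for (std::size_t i = 0; i < steps; i++) {
        order.push_back(cursor++ % (worker_count + 1));
    }
    return order;
}
```

For worker 2 in a pool of 4, the first six probes are 2, 3, 4 (central), 0, 1, 2. Because each worker starts at its own index, idle workers tend to begin their scans at different victims rather than all hitting queue 0 first.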

Ok, let us get to the numbers now:

jay@jayport:~/code/concurrency $ time ./steal 
Done submitting tasks
2000000
Local Counts: 124977, 125012, 125019, 125000, 124995, 125004, 125007, 124985, 124969, 125034, 124976, 125022, 125006, 124996, 125000, 124998, 

real    0m17.341s
user    0m16.549s
sys     0m6.114s

The difference between the two runs is striking. In the first run without work stealing, the task counts per thread were wildly uneven: some threads barely touched twenty thousand tasks, while others processed over a quarter million. This imbalance meant many threads were idle much of the time, stretching the total runtime to almost 35 seconds.

With work stealing enabled, the distribution flattened almost perfectly - every thread handled roughly 125,000 tasks. As a result, all cores stayed busy throughout the execution, and the runtime dropped to just over 17 seconds, cutting the total time by about half.

Interrupting tasks

Let us change the game a bit now. Suppose you have a bunch of potentially long-running tasks, but you want the ability to exit early when needed. Here is a concrete example: imagine you are building something like cybersole - a bot that tries to buy limited drops the moment they go live. Now say FIFA 2026 tickets are being released and you want to scoop up a batch so you can resell later. Your target is 100 tickets for the final match. Given how fast these things disappear, you spin up 200 bots and plan to cancel half of them the moment you hit your target. My job here is to provide a thread pool that can cancel some of these bot functions on demand.

Before we start talking about cancellation or early-exit behavior, we need a realistic workload to test against. To keep things simple, each “task” in this example is just a small function object that sleeps for a random amount of time. This lets us simulate unpredictable workloads without involving any real computation.

Here is the task object:

struct bot_doing_work {
    int operator()() {
        std::random_device rd;
        std::mt19937 gen(rd());
        std::uniform_int_distribution<> d(10, 1000);

        // Each task sleeps between ~100 ms and ~10 seconds total
        int sleep_count = d(gen);
        while (--sleep_count > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }

        // Return 1 on completion
        return sleep_count == 0;
    }
};

Every instance of bot_doing_work runs for a randomly chosen number of iterations, each iteration sleeping 10ms. A task that draws 10 finishes almost immediately; a task that draws 1000 takes around 10 seconds.

Driving the Thread Pool

The driver program spins up a pool with 10 worker threads, submits 10 of these randomized tasks, waits for all of them to complete, and prints some diagnostics:

int main() {

    int max_task_count = 10;
    int task_count = 5;

    thread_pool pool(10);

    std::atomic<int> count = 0;
    std::deque<future<int>> futures;

    // Submit 10 randomized tasks
    for (int i = 0; i < max_task_count; i++) {
        future<int> res = pool.submit(bot_doing_work());
        futures.push_back(std::move(res));
    }
    std::cout << "Done submitting tasks" << std::endl;

    // Gather results
    int result = 0;
    for (future<int>& fut : futures) {
        result += fut.get();
    }

    pool.shutdown();

    std::cout << "Completed: " << result << std::endl;

    std::cout << "Local Counts: ";
    for (int c : pool._local_count) {
        std::cout << c << ", ";
    }
    std::cout << std::endl;
}

This is the baseline before we add cancellation. Right now, all tasks run to completion, even if we hit our “100 tickets bought” target halfway through. Next, we will make the thread pool aware of cancellation so we can stop those extra bots early: we will run 10 bot functions, wait until 5 of them finish, and then cancel the rest. We will use a cooperative cancellation mechanism, where the author of a task adds cancellation checks inside their own function. Note that each bot_doing_work can take up to about 10 seconds, so the baseline run lasts as long as its slowest task.

jay@jayport:~/code/concurrency $ clang++ -std=c++20 advanced_thread_management/long_running_tasks.cpp -o l
jay@jayport:~/code/concurrency $ time ./l
Done submitting tasks
Completed: 10
Local Counts: 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 

real    0m8.501s
user    0m24.263s
sys     0m0.094s

Introducing interrupt_token: A Lightweight Cancellation Signal

To enable cooperative cancellation, we need a simple mechanism for signaling that a task should stop early. The thread pool cannot force a running task to halt - that would be unsafe - but it can notify the task that it should stop the next time it checks a flag.

That is where the interrupt_token comes in.

A Minimal Cancellation Primitive

The interrupt_token is nothing more than a small wrapper around an atomic boolean. Tasks receive a copy of this token, and can check it periodically to see whether they should exit early.

class interrupt_token {
public:
    interrupt_token() = default;

    // Returns true if cancellation has been requested
    bool operator()() { return _interrupted.load(); }

private:
    template <typename T> friend class task_state;

    // Called by the thread-pool internals to trigger cancellation
    void set_interrupted() { _interrupted.store(true); }

    std::atomic<bool> _interrupted{false};
};

How This Will Be Used

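The pool only sets the flag; the task decides when to look at it, typically once per loop iteration or between two expensive steps. This is intentional: cancellation is cooperative, not destructive. A task never gets killed while holding locks or in the middle of a critical operation. It only stops in places the programmer chooses.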
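
To see what stopping "only in places the programmer chooses" looks like in isolation, here is a minimal sketch that uses a bare std::atomic<bool> instead of the interrupt_token class. The names outcome, run_steps, and run_and_cancel_after are made up for this illustration and are not part of the thread pool.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical result type: how far the worker got and whether it finished.
struct outcome {
    int steps_done;
    bool completed;
};

// The worker checks the flag once per step, so it can only stop between
// steps, never in the middle of one.
outcome run_steps(const std::atomic<bool>& cancelled, int total_steps) {
    int done = 0;
    while (done < total_steps && !cancelled.load()) {
        // One unit of pretend work
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        ++done;
    }
    return {done, done == total_steps};
}

// Runs the worker on its own thread and requests cancellation after `delay`.
outcome run_and_cancel_after(int total_steps, std::chrono::milliseconds delay) {
    std::atomic<bool> cancelled{false};
    outcome result{0, false};
    std::thread worker([&] { result = run_steps(cancelled, total_steps); });
    std::this_thread::sleep_for(delay);
    cancelled.store(true);  // only a request; the worker decides when to look
    worker.join();          // after join, `result` is safe to read
    return result;
}
```

If the job is short compared to the delay, the request arrives after the work is done and completed stays true. If the job is long, the worker notices the flag at the next step boundary and returns with completed set to false.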

The task_state object

template <typename T> class task_state {
    template <class Callable, class... Args> friend class template_task;

public:
    void post_completion(T&& result) {
        std::unique_lock<std::mutex> lock(_mutex);
        _completed = true;
        _result = std::move(result);
        _cv.notify_all();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return _completed; });
    }

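    bool is_ready() {
        // _completed is written under _mutex, so read it under the same lock
        std::unique_lock<std::mutex> lock(_mutex);
        return _completed;
    }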

    void interrupt() {
        _interrupt_token.set_interrupted();
        wait();
    }

    T& get_result() { return _result; }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _completed = false;
    interrupt_token _interrupt_token;
    T _result;
};

The future<T> object

template <typename T> class future {
public:
    future(std::shared_ptr<task_state<T>> state) : _state(state) {}

    T& get() {
        _state.get()->wait();
        return _state.get()->get_result();
    }

    void interrupt() { _state.get()->interrupt(); }

    bool is_ready() { return _state.get()->is_ready(); }

private:
    std::shared_ptr<task_state<T>> _state;
};
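
For comparison, the standard library offers the same handshake through std::promise and std::future: set_value plays the role of post_completion, and get blocks the way our future<T>::get does. The sketch below is only an analogy and not how our pool is built; the helper names is_ready and compute_on_thread are assumptions for this example. Note that std::future has no interrupt(), which is the reason we roll our own type here.

```cpp
#include <chrono>
#include <future>
#include <thread>

// Non-blocking readiness check: a zero-length wait_for reports whether a
// value has already been stored in the shared state.
template <typename T>
bool is_ready(const std::future<T>& fut) {
    return fut.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}

// Hypothetical helper: computes a value on another thread and hands it back
// through the promise/future pair.
int compute_on_thread(int input) {
    std::promise<int> promise;
    std::future<int> fut = promise.get_future();

    std::thread producer([&promise, input] {
        promise.set_value(input * 2);  // counterpart of post_completion()
    });

    int result = fut.get();  // blocks until the producer has stored a value
    producer.join();
    return result;
}
```

The shared state behind the promise does the same job as task_state<T>: it holds the result and the synchronization needed to wait for it.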

Injecting the interrupt_token Into User Tasks

Now that we have a lightweight interrupt_token, the next step is to make sure every task submitted to the thread pool actually receives one. Tasks cannot magically know about cancellation - they must be explicitly given access to the token so they can check it and exit early when needed.

This is the job of template_task.

Wrapping User Callables

Whenever the user submits a function, lambda, or functor to the thread pool, we wrap it in a template_task. This wrapper does three things:

  1. Creates a task_state<T> object, which stores the interrupt token and future state.
  2. Binds the user callable so that the first argument becomes interrupt_token&.
  3. Stores the bound callable in a type-erased std::function.

Here is the implementation:

template <class Callable, class... Args>
class template_task : public task_base {
public:
    using T = std::invoke_result_t<Callable, interrupt_token&, Args...>;

    template_task(Callable&& callable, Args&&... args)
        : _state(new task_state<T>()) {

        _function = std::bind(
            std::forward<Callable>(callable),
            std::ref(_state.get()->_interrupt_token),    // injected here
            std::forward<Args>(args)...);
    }

    void operator()() override {
        _state.get()->post_completion(std::move(_function()));
    }

    future<T> get_future() { return future<T>(_state); }

    ~template_task() override = default;

    task_kind kind() override { return task_kind::normal; }

private:
    std::function<T()> _function;
    std::shared_ptr<task_state<T>> _state;
};

The Important Part: Injection

The key line is:

std::ref(_state.get()->_interrupt_token)

This is where the user's callable gets “rewired” so that the first argument is a reference to the task's own interrupt_token. From the user's perspective, the callable simply needs a signature like:

int my_task(interrupt_token& token) {
    while (!token()) {
        // do work...
    }
    return something;
}

They do not need to know anything about task_state, futures, or how the pool manages cancellation. The token just appears.
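
The effect of std::bind combined with std::ref can be tried out in isolation. The following sketch uses a simplified stand-in called demo_token (an assumption for this example, not the interrupt_token class above) and two hypothetical functions, count_to and bind_with_token.

```cpp
#include <atomic>
#include <functional>

// Simplified stand-in for interrupt_token, with a public flag for the demo.
struct demo_token {
    std::atomic<bool> flag{false};
    bool operator()() const { return flag.load(); }
};

// A user callable: the token comes first, the user's own argument second.
int count_to(demo_token& token, int limit) {
    int n = 0;
    while (!token() && n < limit) {
        ++n;
    }
    return n;
}

// Binds the token by reference and the limit by value, then erases the type
// into a zero-argument std::function, similar to what template_task stores.
std::function<int()> bind_with_token(demo_token& token, int limit) {
    return std::bind(count_to, std::ref(token), limit);
}
```

std::bind stores its arguments by value unless they are wrapped in std::ref. The wrapper is what keeps the bound callable looking at the same token that is flipped later; without it, std::bind would try to copy the token, which does not even compile here because std::atomic is not copyable.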

Cooperative Cancellation in Action

Now that tasks can receive an interrupt_token, we can implement cooperative cancellation: letting some tasks exit early once enough work has been completed.

Making Tasks Cancellation-Aware

The task itself is almost unchanged from before - the only difference is that it periodically checks the stop_token:

struct bot_doing_work {
    int operator()(interrupt_token& stop_token) {
        std::random_device rd;
        std::mt19937 gen(rd());
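        // Same range as before; a draw of 0 would end the loop at -1 and be
        // misreported as interrupted
        std::uniform_int_distribution<> d(10, 1000);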
        int sleep_count = d(gen);

        while (!stop_token() && --sleep_count > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }

        // Return 1 if the task completed fully, 0 if it was interrupted
        return sleep_count == 0;
    }
};

Each task runs a randomized number of iterations, checking the token at each step. If cancellation is requested, the task exits early.


Coordinating Task Completion

The main program submits multiple tasks to the pool, waits for a subset to complete, and then signals the rest to stop:

int main() {

    int max_task_count = 10;
    int task_count = 5;

    thread_pool pool(10);

    std::atomic<int> count = 0;
    std::deque<future<int>> futures;
    std::vector<std::thread> threads;

    // Submit every task first, so the deque is no longer modified once the
    // monitor threads start reading from it
    for (int i = 0; i < max_task_count; i++) {
        future<int> res = pool.submit(bot_doing_work());
        futures.push_back(std::move(res));
    }

    // One monitor per task: count the task if it finishes on its own,
    // interrupt it once enough other tasks have finished
    for (int i = 0; i < max_task_count; i++) {
        std::thread t([&futures, i, &count, task_count]() {
            while (true) {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
                if (futures[i].is_ready()) {
                    count.fetch_add(1);
                    return;
                }
                // >= rather than ==, in case two tasks finish back to back
                if (count.load() >= task_count) {
                    futures[i].interrupt();  // signal this task and wait for it
                    return;
                }
            }
        });
        threads.push_back(std::move(t));
    }

    std::cout << "Done submitting tasks" << std::endl;

    for (std::thread& t : threads) {
        t.join();
    }

    pool.shutdown();

    int result = 0;
    for (future<int>& fut : futures) {
        result += fut.get();
    }

    std::cout << "Completed: " << result << std::endl;

    std::cout << "Local Counts: ";
    for (int c : pool._local_count) {
        std::cout << c << ", ";
    }
    std::cout << std::endl;
}

How It Works

  1. Each task gets an interrupt_token injected by the template_task wrapper.
  2. A small monitoring thread per task checks if the task has completed.
  3. Once task_count tasks finish, the monitor threads call interrupt() on the remaining futures, setting the interrupt_token for each task.
  4. Tasks that see stop_token() exit early, freeing up threads for other work or preventing unnecessary computation.

This pattern demonstrates a cooperative cancellation loop, which is safer than forcibly killing threads and allows long-running tasks to exit cleanly when no longer needed.
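
The steps above can also be sketched without the pool at all. The version below is a simplified stand-in under stated assumptions: plain std::thread workers, one std::atomic<bool> per worker in place of interrupt_token, and fixed step counts instead of random ones so that the outcome is predictable. run_until_enough is a hypothetical helper name.

```cpp
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>

// Starts `total` workers. The first `quick` of them need only a few steps;
// the rest would run for a very long time unless they are cancelled.
// Returns how many workers finished all of their steps.
// Assumes 0 <= quick <= total.
int run_until_enough(int total, int quick) {
    std::vector<std::atomic<bool>> stop(total);  // one flag per worker
    for (auto& flag : stop) {
        flag.store(false);
    }
    std::atomic<int> completed{0};
    std::vector<std::thread> workers;

    for (int i = 0; i < total; i++) {
        workers.emplace_back([&stop, &completed, i, quick] {
            const int steps = (i < quick) ? 3 : 1000000;
            int done = 0;
            while (done < steps && !stop[i].load()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
                ++done;
            }
            if (done == steps) {
                completed.fetch_add(1);  // finished without being cancelled
            }
        });
    }

    // Wait until enough workers have finished on their own...
    while (completed.load() < quick) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    // ...then ask everyone else to stop and wait for them to notice
    for (auto& flag : stop) {
        flag.store(true);
    }
    for (std::thread& t : workers) {
        t.join();
    }
    return completed.load();
}
```

Because the slow workers check their flag on every step, they return within roughly one step of the request instead of running to the end.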

jay@jayport:~/code/concurrency $ clang++ -std=c++20 advanced_thread_management/interrupting_tasks.cpp -o i
jay@jayport:~/code/concurrency $ time ./i
Done submitting tasks
Completed: 5
Local Counts: 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 

real    0m5.207s
user    0m6.491s
sys     0m0.100s

There! Only 5 tasks (successfully) completed!!!

And we finally have a -

A low-contention, work-stealing, interruptible Thread Pool!

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <vector>

class interrupt_token {
public:
    interrupt_token() = default;

    bool operator()() { return _interrupted.load(); }

private:
    template <typename T> friend class task_state;
    void set_interrupted() { _interrupted.store(true); }
    std::atomic<bool> _interrupted{false};
};

template <typename T> class task_state {
    template <class Callable, class... Args> friend class template_task;

public:
    void post_completion(T&& result) {
        std::unique_lock<std::mutex> lock(_mutex);
        _completed = true;
        _result = std::move(result);
        _cv.notify_all();
    }

    void wait() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return _completed; });
    }

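    bool is_ready() {
        // _completed is written under _mutex, so read it under the same lock
        std::unique_lock<std::mutex> lock(_mutex);
        return _completed;
    }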

    void interrupt() {
        _interrupt_token.set_interrupted();
        wait();
    }

    T& get_result() { return _result; }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _completed = false;
    interrupt_token _interrupt_token;
    T _result;
};

template <typename T> class future {
public:
    future(std::shared_ptr<task_state<T>> state) : _state(state) {}

    T& get() {
        _state.get()->wait();
        return _state.get()->get_result();
    }

    void interrupt() { _state.get()->interrupt(); }

    bool is_ready() { return _state.get()->is_ready(); }

private:
    std::shared_ptr<task_state<T>> _state;
};

enum class task_kind { normal, exit, timeout };

class task_base {
public:
    virtual ~task_base() = default;
    virtual void operator()() = 0;
    virtual task_kind kind() = 0;
};

class exit_task : public task_base {
    void operator()() override {}
    task_kind kind() override { return task_kind::exit; }
};

class timeout_task : public task_base {
    void operator()() override {}
    task_kind kind() override { return task_kind::timeout; }
};

template <class Callable, class... Args>
class template_task : public task_base {
public:
    using T = std::invoke_result_t<Callable, interrupt_token&, Args...>;
    template_task(Callable&& callable, Args&&... args)
        : _state(new task_state<T>()) {
        _function = std::bind(std::forward<Callable>(callable),
                              std::ref(_state.get()->_interrupt_token),
                              std::forward<Args>(args)...);
    }

    void operator()() override {
        _state.get()->post_completion(std::move(_function()));
    }

    future<T> get_future() { return future<T>(_state); }

    ~template_task() override = default;

    task_kind kind() override { return task_kind::normal; }

private:
    std::function<T()> _function;
    std::shared_ptr<task_state<T>> _state;
};

using packaged_task = std::unique_ptr<task_base>;

class task_queue {

    packaged_task make_exit() { return std::make_unique<exit_task>(); }

    packaged_task make_timeout() { return std::make_unique<timeout_task>(); }

public:
    void submit(packaged_task&& task) {
        std::unique_lock<std::mutex> lock(_mutex);
        if (_shutdown_requested.load()) {
            throw std::runtime_error("Can not submit a task after shutdown");
        }
        _task_list.push_back(std::move(task));
        _cv.notify_one();
    }

    packaged_task get() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] {
            return _task_list.size() > 0 || _shutdown_requested.load();
        });
        if (_shutdown_requested.load()) {
            return make_exit();
        }
        packaged_task task = std::move(_task_list.front());
        _task_list.pop_front();
        return task;
    }

    template <class Rep, class Period>
    packaged_task get(std::chrono::duration<Rep, Period> duration) {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait_for(lock, duration, [this] {
            return _task_list.size() > 0 || _shutdown_requested.load();
        });
        if (_shutdown_requested.load()) {
            return make_exit();
        }
        if (_task_list.size() == 0) {
            return make_timeout();
        }
        packaged_task task = std::move(_task_list.front());
        _task_list.pop_front();
        return task;
    }

    int estimated_size() { return _task_list.size() || _shutdown_quick; }

    void shutdown() {
        std::unique_lock<std::mutex> lock(_mutex);
        _shutdown_requested.store(true);
        _shutdown_quick = true;
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::atomic<bool> _shutdown_requested = false;
    bool _shutdown_quick = false;
    std::condition_variable _cv;
    std::deque<packaged_task> _task_list;
};

class thread_pool {

public:
    thread_pool(int count)
        : _count(count), _worker_queues(count), _local_count(count) {
        for (int i = 0; i < count; i++) {
            _threads.push_back(
                std::thread(&thread_pool::_worker_loop, this, i));
            _local_count[i] = 0;
        }
    }

    template <class Callable, class... Args>
    auto submit(Callable&& callable, Args&&... args)
        -> future<std::invoke_result_t<Callable, interrupt_token&, Args...>> {
        using T = std::invoke_result_t<Callable, interrupt_token&, Args...>;
        auto fn_ptr = new template_task(std::forward<Callable>(callable),
                                        std::forward<Args>(args)...);
        future<T> fut = fn_ptr->get_future();
        packaged_task function = std::unique_ptr<task_base>(fn_ptr);

        int wrap_index = _count + 1;

        int index = ++_approx_submission_index;
        if (index % wrap_index == 0) {
            _queue.submit(std::move(function));
        } else {
            index = (index - 1) % _count;
            _worker_queues[index].submit(std::move(function));
        }
        return fut;
    }

    void shutdown() {
        _queue.shutdown();
        for (int i = 0; i < _count; i++) {
            _worker_queues[i].shutdown();
        }
    }

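    ~thread_pool() {
        try {
            // Request shutdown first so the joins below cannot block forever
            // when the caller forgot shutdown(); repeating it is harmless
            shutdown();
            for (std::thread& t : _threads) {
                if (t.joinable()) {
                    t.join();
                }
            }
        } catch (const std::exception& e) {
            // Destructors are implicitly noexcept, so rethrowing here would
            // call std::terminate; log the error instead
            std::cerr << e.what() << std::endl;
        }
    }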
    std::vector<int> _local_count;

private:
    void _worker_loop(int my_index) {
        auto& my_queue = _worker_queues[my_index];
        std::chrono::microseconds timeout(100);
        bool done = false;

        size_t index_ = my_index;
        auto index = [&index_, this]() -> size_t {
            return index_++ % (_count + 1);
        };

        while (!done) {
        start:
            packaged_task task;
            if (my_queue.estimated_size() == 0) {
                size_t queue_index = index();
                if (queue_index == _count) {
                    if (_queue.estimated_size() > 0) {
                        task = _queue.get(timeout);
                    } else {
                        goto start;
                    }
                } else {
                    if (_worker_queues[queue_index].estimated_size() > 0) {
                        task = _worker_queues[queue_index].get(timeout);
                    } else {
                        goto start;
                    }
                }
            } else {
                task = my_queue.get(timeout);
            }

            switch (task.get()->kind()) {
            case task_kind::normal:
                task.get()->operator()();
                _local_count[my_index]++;
                break;
            case task_kind::exit:
                done = true;
                break;
            case task_kind::timeout:
            default:
                // do nothing
                break;
            }
        }
    }

private:
    int _count;
    std::vector<std::thread> _threads;
    task_queue _queue;
    std::vector<task_queue> _worker_queues;
    size_t _approx_submission_index = 0;
};