Advanced Thread Management
Author: jay@thoughtmachina.com
Modern C++ makes it surprisingly easy to spawn thousands of threads, but managing them efficiently and safely is a different challenge entirely. Real applications need structured shutdown, cooperative cancellation, clean error propagation, and a way to package user code so workers can execute it uniformly. In this article, we build a compact yet fully featured thread pool from first principles, developing each component step by step: a common task interface, a packaged_task wrapper, future-based result retrieval, work distribution, and finally a cooperative interruption mechanism via interrupt_token.
A Simple Thread Pool
Before introducing interruption or futures, we start with the simplest building block: a thread-safe queue that supports blocking retrieval and clean shutdown. The queue stores pending work items in a std::deque<Task> and uses a mutex plus condition variable to synchronize access. Worker threads call get(), which sleeps until either a task becomes available or shutdown is requested. Once shutdown is triggered, get() returns nullptr, allowing workers to exit without races or dangling threads.
The Task Queue - Safe Communication Between Threads
The task queue is a classic producer-consumer structure. Producers submit tasks; worker threads consume them. Access is synchronized using a mutex and condition variable.
using Task = std::function<void()>;
class task_queue {
public:
void submit(Task&& task) {
std::unique_lock<std::mutex> lock(_mutex);
if (_shutdown_requested.load()) {
throw std::runtime_error("Can not submit a task after shutdown");
}
_task_list.push_back(std::move(task));
_cv.notify_all();
}
std::unique_ptr<Task> get() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] {
return !_task_list.empty() || _shutdown_requested.load();
});
if (_shutdown_requested.load()) {
return nullptr;
}
std::unique_ptr<Task> task =
std::make_unique<Task>(std::move(_task_list.front()));
_task_list.pop_front();
return task;
}
void shutdown() {
std::unique_lock<std::mutex> lock(_mutex);
_shutdown_requested.store(true);
_cv.notify_all();
}
private:
std::mutex _mutex;
std::atomic<bool> _shutdown_requested = false;
std::condition_variable _cv;
std::deque<Task> _task_list;
};
How it works
The queue holds tasks in a std::deque<Task> and protects it with a mutex. Worker threads call get(), which blocks on a condition variable until either:
- a task is available, or
- shutdown has been requested.
When shutdown occurs, get() returns nullptr, signaling the workers to exit.
This cleanly stops the pool without race conditions or dangling threads.
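To make this concrete, here is a minimal, hypothetical standalone use of task_queue with one producer and one consumer thread (assuming the class above is in scope). The short sleep only makes it likely, not guaranteed, that the task runs before the consumer observes the shutdown flag:
#include <chrono>
#include <iostream>
#include <thread>
int main() {
    task_queue q;
    std::thread consumer([&q] {
        while (auto task = q.get()) {  // get() returns nullptr once shutdown is observed
            (*task)();
        }
    });
    q.submit([] { std::cout << "hello from the queue\n"; });
    std::this_thread::sleep_for(std::chrono::milliseconds(50));  // give the consumer a chance to run it
    q.shutdown();  // wakes the consumer; get() now returns nullptr and the loop ends
    consumer.join();
}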
The Thread Pool - Spawning Workers and Running Tasks
The thread_pool class launches a fixed number of worker threads. Each worker repeatedly fetches a task from the queue and executes it.
class thread_pool {
public:
thread_pool(int count) : _count(count) {
for (int i = 0; i < count; i++) {
_threads.push_back(std::thread(&thread_pool::_worker_loop, this));
}
}
template<class Callable, class ...Args>
void submit(Callable&& callable, Args&& ...args) {
Task function = std::bind(std::forward<Callable>(callable),
std::forward<Args>(args)...);
_queue.submit(std::move(function));
}
void shutdown() {
_queue.shutdown();
}
~thread_pool() {
try {
for (std::thread& t : _threads) {
if (t.joinable()) {
t.join();
}
}
} catch (...) {
_queue.shutdown();
throw;
}
}
private:
void _worker_loop() {
while (true) {
std::unique_ptr<Task> task = _queue.get();
if (task == nullptr) {
break;
}
(*task)();
}
}
private:
int _count;
std::vector<std::thread> _threads;
task_queue _queue;
};
How it works
- The constructor creates count worker threads, each running _worker_loop().
- submit() binds the user's function and arguments into a Task and pushes it into the queue.
- Workers repeatedly call get():
  - If a task is available → run it.
  - If shutdown occurs → exit the loop.
- The destructor joins all threads to ensure clean shutdown.
This design prevents new tasks from being added once shutdown has been requested. Note, however, that get() honors the shutdown flag before checking the queue, so tasks still waiting in the queue when a worker observes the flag are never run; only tasks already executing are allowed to finish. If you want queued work to drain first, see the variant sketched below.
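Here is a hypothetical draining variant of get() that only honors shutdown once the backlog is empty, so everything submitted before shutdown() still runs:
std::unique_ptr<Task> get() {
    std::unique_lock<std::mutex> lock(_mutex);
    _cv.wait(lock, [this] {
        return !_task_list.empty() || _shutdown_requested.load();
    });
    if (_task_list.empty()) {
        // we only get here when shutdown was requested and no work is left
        return nullptr;
    }
    std::unique_ptr<Task> task =
        std::make_unique<Task>(std::move(_task_list.front()));
    _task_list.pop_front();
    return task;
}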
Using the Thread Pool
Here is a minimal example of how it would be used:
int main() {
thread_pool pool(4);
for (int i = 0; i < 10; i++) {
pool.submit([i] {
std::cout << "Task " << i << " running\n";
});
}
pool.shutdown();
}
Each submitted task is executed by the next available worker thread. Once shutdown is called, workers finish whatever they are currently running and then exit; as noted above, work still sitting in the queue at that point is not executed.
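One more note on submit(): because it goes through std::bind, callables that take arguments, and even member functions, can be submitted too. A small hypothetical sketch (worker_job is made up for illustration; as noted above, tasks still queued when shutdown is observed may not run):
#include <iostream>
struct worker_job {
    void run(int id) { std::cout << "job " << id << "\n"; }
};
int main() {
    thread_pool pool(2);
    worker_job job;  // must outlive the submitted task
    pool.submit([](int a, int b) { std::cout << a + b << "\n"; }, 2, 3);
    pool.submit(&worker_job::run, &job, 7);  // std::bind handles member functions as well
    pool.shutdown();
}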
Waiting on Tasks
C++ provides tools like std::future, std::promise, and std::packaged_task for asynchronous programming. Understanding their internal mechanisms helps in building custom concurrency utilities. Below is a minimal implementation of a future/packaged_task system, coupled with a simple thread pool, showing how tasks can be queued, executed, and results safely retrieved.
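Before rolling our own, here is a minimal sketch using only the standard facilities we are about to imitate, so the moving parts have familiar names:
#include <future>
#include <iostream>
#include <thread>
int main() {
    std::packaged_task<int(int)> task([](int x) { return x + 10; });
    std::future<int> fut = task.get_future();  // handle to the eventual result
    std::thread worker(std::move(task), 32);   // run the task on another thread
    std::cout << fut.get() << std::endl;       // blocks until the result is ready: 42
    worker.join();
}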
task_state<T> - The Shared State
template <typename T> class task_state {
public:
void post_completion(T&& result) {
std::unique_lock<std::mutex> lock(_mutex);
_completed = true;
_result = std::move(result);
_cv.notify_all();
}
void wait() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] { return _completed; });
}
T& get_result() { return _result; }
private:
std::mutex _mutex;
std::condition_variable _cv;
bool _completed = false;
T _result;
};
This class stores the result of a task and provides safe synchronization. post_completion sets the result and notifies all waiting threads. Consumers call wait() to block until the result is ready and then use get_result() to retrieve it.
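As a quick, hypothetical standalone illustration (in the pool, the state is owned through a shared_ptr by both the task and its future):
#include <iostream>
#include <thread>
int main() {
    task_state<int> state;
    std::thread producer([&state] { state.post_completion(42); });
    state.wait();                             // blocks until post_completion has run
    std::cout << state.get_result() << "\n";  // prints 42
    producer.join();
}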
future<T> - The Public Handle
template <typename T> class future {
public:
future(std::shared_ptr<task_state<T>> state) : _state(state) {}
T& get() {
_state.get()->wait();
return _state.get()->get_result();
}
private:
std::shared_ptr<task_state<T>> _state;
};
A future is the handle through which users retrieve results. It does not perform computation but waits for the associated task_state to be completed.
template_task - Wrapping Callables
class task_base {
public:
virtual ~task_base() = default;
virtual void operator()() = 0;
};
template <class Callable, class... Args>
class template_task : public task_base {
public:
using T = std::invoke_result_t<Callable, Args...>;
template_task(Callable&& callable, Args&&... args)
: _state(new task_state<T>()) {
_function = std::bind(std::forward<Callable>(callable),
std::forward<Args>(args)...);
}
void operator()() override {
_state.get()->post_completion(std::move(_function()));
}
future<T> get_future() { return future<T>(_state); }
~template_task() override = default;
private:
std::function<T()> _function;
std::shared_ptr<task_state<T>> _state;
};
template_task stores a callable and its arguments. When executed, it calls the function and posts the result to the shared state, enabling the future to access it.
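To see the wrapper in isolation, here is a small hypothetical sketch that runs a template_task synchronously, with no pool involved (class template argument deduction picks the lambda and argument types):
#include <iostream>
int main() {
    template_task task([](int x) { return x * 2; }, 21);
    future<int> fut = task.get_future();
    task();                          // runs the bound callable and posts the result
    std::cout << fut.get() << "\n";  // prints 42
}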
task_queue - Holding Pending Work
using packaged_task = std::unique_ptr<task_base>;
class task_queue {
public:
void submit(packaged_task&& task) {
std::unique_lock<std::mutex> lock(_mutex);
if (_shutdown_requested.load()) {
throw std::runtime_error("Can not submit a task after shutdown");
}
_task_list.push_back(std::move(task));
_cv.notify_all();
}
packaged_task get() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] {
return !_task_list.empty() || _shutdown_requested.load();
});
if (_shutdown_requested.load()) return nullptr;
packaged_task task = std::move(_task_list.front());
_task_list.pop_front();
return task;
}
void shutdown() {
std::unique_lock<std::mutex> lock(_mutex);
_shutdown_requested.store(true);
_cv.notify_all();
}
private:
std::mutex _mutex;
std::atomic<bool> _shutdown_requested = false;
std::condition_variable _cv;
std::deque<packaged_task> _task_list;
};
The task_queue allows threads to safely submit and retrieve tasks. The condition variable lets worker threads sleep until new work arrives. The shutdown mechanism ensures no new tasks can be added once the pool has started shutting down.
thread_pool - Running Tasks Concurrently
class thread_pool {
public:
thread_pool(int count) : _count(count) {
for (int i = 0; i < count; i++) {
_threads.push_back(std::thread(&thread_pool::_worker_loop, this));
}
}
template <class Callable, class... Args>
auto submit(Callable&& callable, Args&&... args)
-> future<std::invoke_result_t<Callable, Args...>> {
using T = std::invoke_result_t<Callable, Args...>;
auto fn_ptr = new template_task(std::forward<Callable>(callable),
std::forward<Args>(args)...);
future<T> fut = fn_ptr->get_future();
packaged_task function = std::unique_ptr<task_base>(fn_ptr);
_queue.submit(std::move(function));
return fut;
}
void shutdown() { _queue.shutdown(); }
~thread_pool() {
for (auto& t : _threads) {
if (t.joinable()) t.join();
}
}
private:
void _worker_loop() {
while (true) {
packaged_task task = _queue.get();
if (!task) break; // Shutdown requested
(*task)(); // Execute the task
}
}
private:
int _count;
std::vector<std::thread> _threads;
task_queue _queue;
};
Explanation of Key Functions:
- Constructor (thread_pool(int count))
  - Spawns count threads.
  - Each thread immediately begins executing _worker_loop, waiting for tasks to arrive in the queue.
- submit
  - Wraps any callable into a template_task and retrieves its future.
  - The task is then submitted to the queue.
  - Returns the future so the caller can wait for the result asynchronously.
- _worker_loop
  - Each worker continuously fetches tasks from the queue.
  - If the queue is empty, the worker thread blocks on the condition variable.
  - On shutdown, get() returns nullptr, signaling the worker to exit the loop.
  - Otherwise, it executes the task by calling operator().
- shutdown and Destructor
  - Marks the queue as shut down, waking up all waiting threads.
  - The destructor joins all threads to ensure graceful termination.
  - Any tasks submitted after shutdown will throw an exception.
Thread Pool Overview:
- Multiple threads execute tasks concurrently.
- Tasks can be submitted from any thread and are executed in the order they arrive.
- Each task has an associated future, allowing the caller to retrieve its result when ready.
- Synchronization is handled by mutexes and condition variables, ensuring thread safety.
Example Usage
class function {
public:
int operator()(int x) { return x + 10; }
};
int main() {
thread_pool pool(2);
auto fut = pool.submit(function(), 32);
std::cout << "Result: " << fut.get() << std::endl;
pool.shutdown();
}
Reducing contention
Ok, now let us see how the thread pool performs under high contention:
struct number {
int operator()() { return 1; }
};
int main() {
int task_count_per_thread = 1000;
int submission_thread_count = 1000;
int count = 0;
thread_pool pool(16);
std::vector<std::thread> submission_threads;
std::vector<int> local_results(submission_thread_count);
for (int x = 0; x < submission_thread_count; x++) {
std::thread t([&pool, task_count_per_thread, &local_results, x]() {
int count = 0;
std::deque<future<int>> futures;
for (int i = 0; i < task_count_per_thread; i++) {
future<int> res = pool.submit(number());
futures.push_back(std::move(res));
}
for (future<int>& fut : futures) {
count += fut.get();
}
local_results[x] = count;
});
submission_threads.push_back(std::move(t));
}
std::cout << "Done submitting tasks" << std::endl;
for (std::thread& t : submission_threads) {
t.join();
}
for (int x : local_results) {
count += x;
}
std::cout << count << std::endl;
pool.shutdown();
}
Let us compare the following 2 scenarios:
int task_count_per_thread = 1000;
int submission_thread_count = 2000;
jay@jayport:~/code/concurrency $ time ./x
Done submitting tasks
2000000
real 0m4.653s
user 0m5.570s
sys 0m30.235s
Now let us try a configuration that we expect to produce even more contention:
int task_count_per_thread = 2000;
int submission_thread_count = 1000;
jay@jayport:~/code/concurrency $ time ./x
Done submitting tasks
2000000
real 0m6.064s
user 0m4.285s
sys 0m42.097s
Reducing Contention with Per-Worker Queues
In high-performance thread pools, contention on a single shared queue can become a bottleneck. Every submit() and get() operation requires locking the same mutex, which slows down task submission and execution when many threads are involved.
One way to reduce this contention is per-worker queues: each worker thread maintains its own private queue, and tasks are distributed across these queues. Workers first try to fetch tasks from their local queue. Only if their queue is empty do they access the central queue, reducing the frequency of global lock acquisition.
Here is how we implemented it in our thread pool:
std::vector<task_queue> _worker_queues;
size_t _approx_submission_index = 0;
template <class Callable, class... Args>
auto submit(Callable&& callable, Args&&... args)
-> future<std::invoke_result_t<Callable, Args...>> {
using T = std::invoke_result_t<Callable, Args...>;
auto fn_ptr = new template_task(std::forward<Callable>(callable),
std::forward<Args>(args)...);
future<T> fut = fn_ptr->get_future();
packaged_task function = std::unique_ptr<task_base>(fn_ptr);
int wrap_index = _count + 1;
int index = ++_approx_submission_index;
if (index % wrap_index == 0) {
_queue.submit(std::move(function)); // occasional fallback to central queue
} else {
index = (index - 1) % _count;
_worker_queues[index].submit(std::move(function)); // local queue
}
return fut;
}
How it works:
- _worker_queues stores a queue for each worker thread.
- submit() distributes tasks in a round-robin manner, with most tasks going to worker-local queues. With 16 workers, for example, wrap_index is 17, so one submission in every 17 falls back to the central queue and the rest go to the worker queues in order.
- The central queue _queue is only used occasionally, preventing all threads from contending on a single lock.
- Worker threads check their local queue first, and only poll the central queue when idle.
This simple adjustment drastically reduces lock contention under heavy load, which is critical when thousands of tasks are submitted concurrently.
Worker Loop with Timeout:
To support this design, workers prefer their local queue and fall back to the central queue only when the local queue is empty. Both fetches use a short timeout, so a worker blocked on one queue does not sit there forever while work piles up on the other:
void _worker_loop(int my_index) {
auto& my_queue = _worker_queues[my_index];
std::chrono::microseconds timeout(100);
bool done = false;
while (!done) {
packaged_task task;
if (my_queue.estimated_size() == 0) {
task = _queue.get(timeout); // central queue
} else {
task = my_queue.get(timeout); // local queue
}
switch (task.get()->kind()) {
case task_kind::normal:
task.get()->operator()();
break;
case task_kind::exit:
done = true;
break;
case task_kind::timeout:
default:
// nothing to do
break;
}
}
}
By giving each worker its own queue and using timed polling for the central queue, threads spend less time contending for a single mutex. This reduces sys time spikes seen in highly concurrent submissions.
Here is the complete code:
template <typename T> class task_state {
public:
void post_completion(T&& result) {
std::unique_lock<std::mutex> lock(_mutex);
_completed = true;
_result = std::move(result);
_cv.notify_all();
}
void wait() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] { return _completed; });
}
T& get_result() { return _result; }
private:
std::mutex _mutex;
std::condition_variable _cv;
bool _completed = false;
T _result;
};
template <typename T> class future {
public:
future(std::shared_ptr<task_state<T>> state) : _state(state) {}
T& get() {
_state.get()->wait();
return _state.get()->get_result();
}
private:
std::shared_ptr<task_state<T>> _state;
};
enum class task_kind { normal, exit, timeout };
class task_base {
public:
virtual ~task_base() = default;
virtual void operator()() = 0;
virtual task_kind kind() = 0;
};
class exit_task : public task_base {
void operator()() override {}
task_kind kind() override { return task_kind::exit; }
};
class timeout_task : public task_base {
void operator()() override {}
task_kind kind() override { return task_kind::timeout; }
};
template <class Callable, class... Args>
class template_task : public task_base {
public:
using T = std::invoke_result_t<Callable, Args...>;
template_task(Callable&& callable, Args&&... args)
: _state(new task_state<T>()) {
_function = std::bind(std::forward<Callable>(callable),
std::forward<Args>(args)...);
}
void operator()() override {
_state.get()->post_completion(std::move(_function()));
}
future<T> get_future() { return future<T>(_state); }
~template_task() override = default;
task_kind kind() override { return task_kind::normal; }
private:
std::function<T()> _function;
std::shared_ptr<task_state<T>> _state;
};
using packaged_task = std::unique_ptr<task_base>;
class task_queue {
packaged_task make_exit() { return std::make_unique<exit_task>(); }
packaged_task make_timeout() { return std::make_unique<timeout_task>(); }
public:
void submit(packaged_task&& task) {
std::unique_lock<std::mutex> lock(_mutex);
if (_shutdown_requested.load()) {
throw std::runtime_error("Can not submit a task after shutdown");
}
_task_list.push_back(std::move(task));
_cv.notify_all();
}
packaged_task get() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] {
return _task_list.size() > 0 || _shutdown_requested.load();
});
if (_shutdown_requested.load()) {
return make_exit();
}
packaged_task task = std::move(_task_list.front());
_task_list.pop_front();
return task;
}
template <class Rep, class Period>
packaged_task get(std::chrono::duration<Rep, Period> duration) {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait_for(lock, duration, [this] {
return _task_list.size() > 0 || _shutdown_requested.load();
});
if (_shutdown_requested.load()) {
return make_exit();
}
if (_task_list.size() == 0) {
return make_timeout();
}
packaged_task task = std::move(_task_list.front());
_task_list.pop_front();
return task;
}
int estimated_size() { return _task_list.size(); }
void shutdown() {
std::unique_lock<std::mutex> lock(_mutex);
_shutdown_requested.store(true);
_cv.notify_all();
}
private:
std::mutex _mutex;
std::atomic<bool> _shutdown_requested = false;
std::condition_variable _cv;
std::deque<packaged_task> _task_list;
};
class thread_pool {
public:
thread_pool(int count) : _count(count), _worker_queues(count) {
for (int i = 0; i < count; i++) {
_threads.push_back(
std::thread(&thread_pool::_worker_loop, this, i));
}
}
template <class Callable, class... Args>
auto submit(Callable&& callable, Args&&... args)
-> future<std::invoke_result_t<Callable, Args...>> {
using T = std::invoke_result_t<Callable, Args...>;
auto fn_ptr = new template_task(std::forward<Callable>(callable),
std::forward<Args>(args)...);
future<T> fut = fn_ptr->get_future();
packaged_task function = std::unique_ptr<task_base>(fn_ptr);
int wrap_index = _count + 1;
int index = ++_approx_submission_index;
if (index % wrap_index == 0) {
_queue.submit(std::move(function));
} else {
index = (index - 1) % _count;
_worker_queues[index].submit(std::move(function));
}
return fut;
}
void shutdown() { _queue.shutdown(); }
~thread_pool() {
try {
for (std::thread& t : _threads) {
if (t.joinable()) {
t.join();
}
}
} catch (const std::exception& e) {
_queue.shutdown();
std::cerr << e.what() << std::endl;
throw;
}
}
private:
void _worker_loop(int my_index) {
auto& my_queue = _worker_queues[my_index];
std::chrono::microseconds timeout(100);
bool done = false;
while (!done) {
packaged_task task;
if (my_queue.estimated_size() == 0) {
task = _queue.get(timeout);
} else {
task = my_queue.get(timeout);
}
switch (task.get()->kind()) {
case task_kind::normal:
task.get()->operator()();
break;
case task_kind::exit:
done = true;
break;
case task_kind::timeout:
default:
// do nothing
break;
}
}
}
private:
int _count;
std::vector<std::thread> _threads;
task_queue _queue;
std::vector<task_queue> _worker_queues;
size_t _approx_submission_index = 0;
};
Let us compare the following 2 scenarios (after making per-worker queues):
int task_count_per_thread = 1000;
int submission_thread_count = 2000;
jay@jayport:~/code/concurrency $ time ./low_1k_2k
Done submitting tasks
2000000
real 0m2.008s
user 0m8.101s
sys 0m6.845s
And the second scenario:
int task_count_per_thread = 2000;
int submission_thread_count = 1000;
jay@jayport:~/code/concurrency $ time ./low_2k_1k
Done submitting tasks
2000000
real 0m2.088s
user 0m8.966s
sys 0m6.451s
Notice that the real time is now roughly 2 seconds, significantly faster than the previous implementation with a single global queue, which took 4–6 seconds. The sys time (time spent in the kernel handling locks) has dropped dramatically.
Stealing Work
Now, let us simulate an uneven work distribution across our worker-local queues (and let us make each task non-trivial: it will sleep for 100 microseconds). That gives us a recipe for a scenario where some workers end up with far more work than others.
template <class Callable, class... Args>
auto submit(Callable&& callable, Args&&... args)
-> future<std::invoke_result_t<Callable, Args...>> {
using T = std::invoke_result_t<Callable, Args...>;
auto fn_ptr = new template_task(std::forward<Callable>(callable),
std::forward<Args>(args)...);
future<T> fut = fn_ptr->get_future();
packaged_task function = std::unique_ptr<task_base>(fn_ptr);
int wrap_index = _count + 1;
// what if we had an uneven work distribution?
std::random_device rd;
std::mt19937 gen(rd());
std::poisson_distribution<> d(9);
int index = d(gen);
if (index % wrap_index == 0) {
_queue.submit(std::move(function));
} else {
index = (index - 1) % _count;
_worker_queues[index].submit(std::move(function));
}
return fut;
}
Here, we use std::poisson_distribution<> to generate a non-uniform spread of queue indices. We will also update our task to do a bit more work than simply returning a number.
struct number {
int operator()() {
std::this_thread::sleep_for(std::chrono::microseconds(100));
return 1;
}
};
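Before looking at the runner, it helps to see just how skewed std::poisson_distribution<>(9) is. This small standalone sketch prints a rough histogram of 100,000 samples; they cluster tightly around the mean of 9, which is why a handful of worker queues end up with most of the tasks:
#include <iostream>
#include <map>
#include <random>
#include <string>
int main() {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::poisson_distribution<> d(9);
    std::map<int, int> histogram;
    for (int i = 0; i < 100000; i++) {
        histogram[d(gen)]++;
    }
    for (auto [value, count] : histogram) {
        std::cout << value << ": " << std::string(count / 500, '*') << "\n";
    }
}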
And the runner code:
int main() {
int task_count_per_thread = 200000;
int submission_thread_count = 10;
int count = 0;
thread_pool pool(16);
std::vector<std::thread> submission_threads;
std::vector<int> local_results(submission_thread_count);
for (int x = 0; x < submission_thread_count; x++) {
std::thread t([&pool, task_count_per_thread, &local_results, x]() {
int count = 0;
std::deque<future<int>> futures;
for (int i = 0; i < task_count_per_thread; i++) {
future<int> res = pool.submit(number());
futures.push_back(std::move(res));
}
for (future<int>& fut : futures) {
count += fut.get();
}
local_results[x] = count;
});
submission_threads.push_back(std::move(t));
}
std::cout << "Done submitting tasks" << std::endl;
for (std::thread& t : submission_threads) {
t.join();
}
for (int x : local_results) {
count += x;
}
pool.shutdown();
std::cout << count << std::endl;
std::cout << "Local Counts: ";
for (int c : pool._local_count) {
std::cout << c << ", ";
}
std::cout << std::endl;
}
Here, Local Counts is the number of tasks executed by each worker thread (tracked in a _local_count vector on the pool, shown in the complete listing later).
Let us see the results, shall we?
jay@jayport:~/code/concurrency $ time ./uneven
Done submitting tasks
2000000
Local Counts: 14259, 15958, 32850, 69216, 121753, 182369, 234013, 262886, 263429, 237218, 195348, 144857, 100608, 64890, 38590, 21756,
real 0m34.952s
user 0m14.940s
sys 0m13.846s
Clearly, the uneven distribution experiment produced exactly what we expected: task starvation on some workers and work piling up on others.
So let us fix that. The classic solution is work stealing: If a worker runs out of tasks in its own local queue, it should start checking other queues - including the global one - and “steal” tasks when possible.
Here is the new worker loop:
void _worker_loop(int my_index) {
auto& my_queue = _worker_queues[my_index];
std::chrono::microseconds timeout(100);
bool done = false;
size_t index_ = my_index;
auto index = [&index_, this]() -> size_t {
return index_++ % (_count + 1);
};
while (!done) {
start:
packaged_task task;
if (my_queue.estimated_size() == 0) {
size_t queue_index = index();
if (queue_index == _count) {
if (_queue.estimated_size() > 0) {
task = _queue.get(timeout);
} else {
goto start;
}
} else {
if (_worker_queues[queue_index].estimated_size() > 0) {
task = _worker_queues[queue_index].get(timeout);
} else {
goto start;
}
}
} else {
task = my_queue.get(timeout);
}
switch (task.get()->kind()) {
case task_kind::normal:
task.get()->operator()();
_local_count[my_index]++;
break;
case task_kind::exit:
done = true;
break;
case task_kind::timeout:
default:
// do nothing
break;
}
}
}
Each worker maintains its own local queue, and under normal conditions it simply pulls tasks from there. However, when its queue becomes empty, the worker does not just sit idle. Instead, it begins scanning the other worker queues in a round-robin pattern, incrementing an index that wraps around all local queues as well as the global queue. If it finds a worker queue with pending tasks, it steals one; if not, it checks the global queue. Only when all queues appear empty does it loop back and try again. This simple mechanism ensures that even when tasks are submitted unevenly or cluster heavily onto specific workers, idle threads can redistribute the load dynamically and keep overall throughput high.
Ok, let us get to the numbers now:
jay@jayport:~/code/concurrency $ time ./steal
Done submitting tasks
2000000
Local Counts: 124977, 125012, 125019, 125000, 124995, 125004, 125007, 124985, 124969, 125034, 124976, 125022, 125006, 124996, 125000, 124998,
real 0m17.341s
user 0m16.549s
sys 0m6.114s
The difference between the two runs is striking. In the first run without work stealing, the task counts per thread were wildly uneven: some threads barely touched twenty thousand tasks, while others processed over a quarter million. This imbalance meant many threads were idle much of the time, stretching the total runtime to almost 35 seconds.
With work stealing enabled, the distribution flattened almost perfectly - every thread handled roughly 125,000 tasks. As a result, all cores stayed busy throughout the execution, and the runtime dropped to just over 17 seconds, cutting the total time by about half.
Interrupting tasks
Let us change the game a bit now. Suppose you have a bunch of potentially long-running tasks, but you want the ability to exit early when needed. Here is a concrete example: imagine you are building something like cybersole - a bot that tries to buy limited drops the moment they go live. Now say FIFA 2026 tickets are being released and you want to scoop up a batch so you can resell later. Your target is 100 tickets for the final match. Given how fast these things disappear, you spin up 200 bots and plan to cancel half of them the moment you hit your target. My job here is to provide a thread pool that can cancel some of these bots' functions on demand.
Before we start talking about cancellation or early-exit behavior, we need a realistic workload to test against. To keep things simple, each “task” in this example is just a small function object that sleeps for a random amount of time. This lets us simulate unpredictable workloads without involving any real computation.
Here is the task object:
struct bot_doing_work {
int operator()() {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> d(10, 1000);
// Each task sleeps between ~100 ms and ~10 seconds total
int sleep_count = d(gen);
while (--sleep_count > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Return 1 on completion
return sleep_count == 0;
}
};
Every instance of bot_doing_work runs for a randomly chosen number of iterations, each iteration sleeping 10ms. A task that draws 10 finishes almost immediately; a task that draws 1000 takes around 10 seconds.
Driving the Thread Pool
The driver program spins up a pool with 10 worker threads, submits 10 of these randomized tasks, waits for all of them to complete, and prints some diagnostics:
int main() {
int max_task_count = 10;
int task_count = 5;
thread_pool pool(10);
std::atomic<int> count = 0;
std::deque<future<int>> futures;
// Submit 10 randomized tasks
for (int i = 0; i < max_task_count; i++) {
future<int> res = pool.submit(bot_doing_work());
futures.push_back(std::move(res));
}
std::cout << "Done submitting tasks" << std::endl;
// Gather results
int result = 0;
for (future<int>& fut : futures) {
result += fut.get();
}
pool.shutdown();
std::cout << "Completed: " << result << std::endl;
std::cout << "Local Counts: ";
for (int c : pool._local_count) {
std::cout << c << ", ";
}
std::cout << std::endl;
}
This is the baseline before we add cancellation. Right now, all tasks run to completion - even if we hit our “100 tickets bought” target halfway through. Next, we will make the thread pool aware of cancellation so we can stop those extra bots early. Here, we will run 10 bot functions in the hope that 5 finish, at which point we can cancel the rest. We will use a cooperative cancellation mechanism, in which the author of the task adds explicit cancellation checks inside their function. Note that each bot_doing_work can spend up to about 10 seconds doing its thing.
jay@jayport:~/code/concurrency $ clang++ -std=c++20 advanced_thread_management/long_running_tasks.cpp -o l
jay@jayport:~/code/concurrency $ time ./l
Done submitting tasks
Completed: 10
Local Counts: 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
real 0m8.501s
user 0m24.263s
sys 0m0.094s
Introducing interrupt_token: A Lightweight Cancellation Signal
To enable cooperative cancellation, we need a simple mechanism for signaling that a task should stop early. The thread pool can not force a running task to halt - that would be unsafe - but it can notify the task that it should stop the next time it checks a flag.
That is where the interrupt_token comes in.
A Minimal Cancellation Primitive
The interrupt_token is nothing more than a small wrapper around an atomic boolean. Tasks receive a copy of this token, and can check it periodically to see whether they should exit early.
class interrupt_token {
public:
interrupt_token() = default;
// Returns true if cancellation has been requested
bool operator()() { return _interrupted.load(); }
private:
template <typename T> friend class task_state;
// Called by the thread-pool internals to trigger cancellation
void set_interrupted() { _interrupted.store(true); }
std::atomic<bool> _interrupted{false};
};
How This Will Be Used
- Every submitted task will receive an associated interrupt_token.
- The future can request cancellation via interrupt(), which calls set_interrupted() on the token.
- The task itself must periodically check token() and return early when it sees true.
This is intentional: cancellation is cooperative, not destructive. A task never gets killed while holding locks or in the middle of a critical operation. It only stops in places the programmer chooses.
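For reference, C++20 already ships an equivalent primitive: std::stop_token together with std::jthread follows the same cooperative pattern, which is a useful sanity check that our hand-rolled token has the right shape:
#include <chrono>
#include <iostream>
#include <stop_token>
#include <thread>
int main() {
    std::jthread worker([](std::stop_token st) {
        while (!st.stop_requested()) {  // cooperative check, like our token()
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        }
        std::cout << "worker exiting cooperatively\n";
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    worker.request_stop();  // analogous to set_interrupted()
}  // std::jthread joins automatically in its destructor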
The task_state object
template <typename T> class task_state {
template <class Callable, class... Args> friend class template_task;
public:
void post_completion(T&& result) {
std::unique_lock<std::mutex> lock(_mutex);
_completed = true;
_result = std::move(result);
_cv.notify_all();
}
void wait() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] { return _completed; });
}
bool is_ready() { return _completed; }
void interrupt() {
_interrupt_token.set_interrupted();
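// note: wait() below blocks the caller until the task observes the token and posts a result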
wait();
}
T& get_result() { return _result; }
private:
std::mutex _mutex;
std::condition_variable _cv;
bool _completed = false;
interrupt_token _interrupt_token;
T _result;
};
The future<T> object
template <typename T> class future {
public:
future(std::shared_ptr<task_state<T>> state) : _state(state) {}
T& get() {
_state.get()->wait();
return _state.get()->get_result();
}
void interrupt() { _state.get()->interrupt(); }
bool is_ready() { return _state.get()->is_ready(); }
private:
std::shared_ptr<task_state<T>> _state;
};
Injecting the interrupt_token Into User Tasks
Now that we have a lightweight interrupt_token, the next step is to make sure every task submitted to the thread pool actually receives one. Tasks cannot magically know about cancellation - they must be explicitly given access to the token so they can check it and exit early when needed.
This is the job of template_task.
Wrapping User Callables
Whenever the user submits a function, lambda, or functor to the thread pool, we wrap it in a template_task. This wrapper does three things:
- Creates a task_state<T> object, which stores the interrupt token and future state.
- Binds the user callable so that the first argument becomes interrupt_token&.
- Stores the bound callable in a type-erased std::function.
Here is the implementation:
template <class Callable, class... Args>
class template_task : public task_base {
public:
using T = std::invoke_result_t<Callable, interrupt_token&, Args...>;
template_task(Callable&& callable, Args&&... args)
: _state(new task_state<T>()) {
_function = std::bind(
std::forward<Callable>(callable),
std::ref(_state.get()->_interrupt_token), // injected here
std::forward<Args>(args)...);
}
void operator()() override {
_state.get()->post_completion(std::move(_function()));
}
future<T> get_future() { return future<T>(_state); }
~template_task() override = default;
task_kind kind() override { return task_kind::normal; }
private:
std::function<T()> _function;
std::shared_ptr<task_state<T>> _state;
};
The Important Part: Injection
The key line is:
std::ref(_state.get()->_interrupt_token)
This is where the user's callable gets “rewired” so that the first argument is a reference to the task's own interrupt_token. From the user's perspective, the callable simply needs to take a signature like:
int my_task(interrupt_token& token) {
while (!token()) {
// do work...
}
return something;
}
They do not need to know anything about task_state, futures, or how the pool manages cancellation. The token just appears.
Cooperative Cancellation in Action
Now that tasks can receive an interrupt_token, we can implement cooperative cancellation: letting some tasks exit early once enough work has been completed.
Making Tasks Cancellation-Aware
The task itself is almost unchanged from before - the only difference is that it periodically checks the stop_token:
struct bot_doing_work {
int operator()(interrupt_token& stop_token) {
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> d(0, 1000);
int sleep_count = d(gen);
while (!stop_token() && --sleep_count > 0) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
// Return 1 if the task completed fully, 0 if it was interrupted
return sleep_count == 0;
}
};
Each task runs a randomized number of iterations, checking the token at each step. If cancellation is requested, the task exits early.
Coordinating Task Completion
The main program submits multiple tasks to the pool, waits for a subset to complete, and then signals the rest to stop:
int main() {
int max_task_count = 10;
int task_count = 5;
thread_pool pool(10);
std::atomic<int> count = 0;
bool exit = false;
std::deque<future<int>> futures;
std::vector<std::thread> threads;
// Submit tasks
for (int i = 0; i < max_task_count; i++) {
future<int> res = pool.submit(bot_doing_work());
futures.push_back(std::move(res));
// Monitor completion and trigger cancellation when enough tasks finish
std::thread t([&futures, i, &exit, &count, task_count]() {
bool should_self_exit = false;
while (!exit && !should_self_exit) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
if (futures[i].is_ready()) {
count.fetch_add(1);
should_self_exit = true;
}
if (count.load() == task_count) {
futures[i].interrupt(); // signal remaining tasks
exit = true;
}
}
});
threads.push_back(std::move(t));
}
std::cout << "Done submitting tasks" << std::endl;
for (std::thread& t : threads) {
t.join();
}
pool.shutdown();
int result = 0;
for (future<int>& fut : futures) {
result += fut.get();
}
std::cout << "Completed: " << result << std::endl;
std::cout << "Local Counts: ";
for (int c : pool._local_count) {
std::cout << c << ", ";
}
std::cout << std::endl;
}
How It Works
- Each task gets an interrupt_token injected by the template_task wrapper.
- A small monitoring thread per task checks if the task has completed.
- Once task_count tasks finish, the monitor threads call interrupt() on the remaining futures, setting the interrupt_token for each task.
- Tasks that see stop_token() exit early, freeing up threads for other work or preventing unnecessary computation.
This pattern demonstrates a cooperative cancellation loop, which is safer than forcibly killing threads and allows long-running tasks to exit cleanly when no longer needed.
jay@jayport:~/code/concurrency $ clang++ -std=c++20 advanced_thread_management/interrupting_tasks.cpp -o i
jay@jayport:~/code/concurrency $ time ./i
Done submitting tasks
Completed: 5
Local Counts: 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
real 0m5.207s
user 0m6.491s
sys 0m0.100s
There! Only 5 tasks (successfully) completed!!!
And we finally have it -
A low-contention, work-stealing, interruptible Thread Pool!
#include <condition_variable>
#include <deque>
#include <exception>
#include <functional>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>
#include <type_traits>
#include <vector>
class interrupt_token {
public:
interrupt_token() = default;
bool operator()() { return _interrupted.load(); }
private:
template <typename T> friend class task_state;
void set_interrupted() { _interrupted.store(true); }
std::atomic<bool> _interrupted{false};
};
template <typename T> class task_state {
template <class Callable, class... Args> friend class template_task;
public:
void post_completion(T&& result) {
std::unique_lock<std::mutex> lock(_mutex);
_completed = true;
_result = std::move(result);
_cv.notify_all();
}
void wait() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] { return _completed; });
}
bool is_ready() { return _completed; }
void interrupt() {
_interrupt_token.set_interrupted();
wait();
}
T& get_result() { return _result; }
private:
std::mutex _mutex;
std::condition_variable _cv;
bool _completed = false;
interrupt_token _interrupt_token;
T _result;
};
template <typename T> class future {
public:
future(std::shared_ptr<task_state<T>> state) : _state(state) {}
T& get() {
_state.get()->wait();
return _state.get()->get_result();
}
void interrupt() { _state.get()->interrupt(); }
bool is_ready() { return _state.get()->is_ready(); }
private:
std::shared_ptr<task_state<T>> _state;
};
enum class task_kind { normal, exit, timeout };
class task_base {
public:
virtual ~task_base() = default;
virtual void operator()() = 0;
virtual task_kind kind() = 0;
};
class exit_task : public task_base {
void operator()() override {}
task_kind kind() override { return task_kind::exit; }
};
class timeout_task : public task_base {
void operator()() override {}
task_kind kind() override { return task_kind::timeout; }
};
template <class Callable, class... Args>
class template_task : public task_base {
public:
using T = std::invoke_result_t<Callable, interrupt_token&, Args...>;
template_task(Callable&& callable, Args&&... args)
: _state(new task_state<T>()) {
_function = std::bind(std::forward<Callable>(callable),
std::ref(_state.get()->_interrupt_token),
std::forward<Args>(args)...);
}
void operator()() override {
_state.get()->post_completion(std::move(_function()));
}
future<T> get_future() { return future<T>(_state); }
~template_task() override = default;
task_kind kind() override { return task_kind::normal; }
private:
std::function<T()> _function;
std::shared_ptr<task_state<T>> _state;
};
using packaged_task = std::unique_ptr<task_base>;
class task_queue {
packaged_task make_exit() { return std::make_unique<exit_task>(); }
packaged_task make_timeout() { return std::make_unique<timeout_task>(); }
public:
void submit(packaged_task&& task) {
std::unique_lock<std::mutex> lock(_mutex);
if (_shutdown_requested.load()) {
throw std::runtime_error("Can not submit a task after shutdown");
}
_task_list.push_back(std::move(task));
_cv.notify_one();
}
packaged_task get() {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait(lock, [this] {
return _task_list.size() > 0 || _shutdown_requested.load();
});
if (_shutdown_requested.load()) {
return make_exit();
}
packaged_task task = std::move(_task_list.front());
_task_list.pop_front();
return task;
}
template <class Rep, class Period>
packaged_task get(std::chrono::duration<Rep, Period> duration) {
std::unique_lock<std::mutex> lock(_mutex);
_cv.wait_for(lock, duration, [this] {
return _task_list.size() > 0 || _shutdown_requested.load();
});
if (_shutdown_requested.load()) {
return make_exit();
}
if (_task_list.size() == 0) {
return make_timeout();
}
packaged_task task = std::move(_task_list.front());
_task_list.pop_front();
return task;
}
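// non-zero once shutdown begins (via _shutdown_quick), so idle workers stop spinning and fetch the exit task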
int estimated_size() { return _task_list.size() || _shutdown_quick; }
void shutdown() {
std::unique_lock<std::mutex> lock(_mutex);
_shutdown_requested.store(true);
_shutdown_quick = true;
_cv.notify_all();
}
private:
std::mutex _mutex;
std::atomic<bool> _shutdown_requested = false;
bool _shutdown_quick = false;
std::condition_variable _cv;
std::deque<packaged_task> _task_list;
};
class thread_pool {
public:
thread_pool(int count)
: _count(count), _worker_queues(count), _local_count(count) {
for (int i = 0; i < count; i++) {
_threads.push_back(
std::thread(&thread_pool::_worker_loop, this, i));
_local_count[i] = 0;
}
}
template <class Callable, class... Args>
auto submit(Callable&& callable, Args&&... args)
-> future<std::invoke_result_t<Callable, interrupt_token&, Args...>> {
using T = std::invoke_result_t<Callable, interrupt_token&, Args...>;
auto fn_ptr = new template_task(std::forward<Callable>(callable),
std::forward<Args>(args)...);
future<T> fut = fn_ptr->get_future();
packaged_task function = std::unique_ptr<task_base>(fn_ptr);
int wrap_index = _count + 1;
int index = ++_approx_submission_index;
if (index % wrap_index == 0) {
_queue.submit(std::move(function));
} else {
index = (index - 1) % _count;
_worker_queues[index].submit(std::move(function));
}
return fut;
}
void shutdown() {
_queue.shutdown();
for (int i = 0; i < _count; i++) {
_worker_queues[i].shutdown();
}
}
~thread_pool() {
try {
for (std::thread& t : _threads) {
if (t.joinable()) {
t.join();
}
}
} catch (const std::exception& e) {
_queue.shutdown();
std::cerr << e.what() << std::endl;
throw;
}
}
std::vector<int> _local_count;
private:
void _worker_loop(int my_index) {
auto& my_queue = _worker_queues[my_index];
std::chrono::microseconds timeout(100);
bool done = false;
size_t index_ = my_index;
auto index = [&index_, this]() -> size_t {
return index_++ % (_count + 1);
};
while (!done) {
start:
packaged_task task;
if (my_queue.estimated_size() == 0) {
size_t queue_index = index();
if (queue_index == _count) {
if (_queue.estimated_size() > 0) {
task = _queue.get(timeout);
} else {
goto start;
}
} else {
if (_worker_queues[queue_index].estimated_size() > 0) {
task = _worker_queues[queue_index].get(timeout);
} else {
goto start;
}
}
} else {
task = my_queue.get(timeout);
}
switch (task.get()->kind()) {
case task_kind::normal:
task.get()->operator()();
_local_count[my_index]++;
break;
case task_kind::exit:
done = true;
break;
case task_kind::timeout:
default:
// do nothing
break;
}
}
}
private:
int _count;
std::vector<std::thread> _threads;
task_queue _queue;
std::vector<task_queue> _worker_queues;
size_t _approx_submission_index = 0;
};