Synchronizing Concurrent Operations

Author: jay@thoughtmachina.com

Welcome to the 2nd blog in the Systems Playground blog series!

Using Mutexes in C++

Mutexes are a fundamental synchronization mechanism in C++ that protect shared data from concurrent access. While a std::mutex can be locked and unlocked manually using its lock() and unlock() member functions, doing so is error-prone: every exit path, including those created by exceptions, must correctly release the lock. To avoid this risk, C++ relies on the RAII (Resource Acquisition Is Initialization) pattern through std::lock_guard. A std::lock_guard acquires ownership of a given mutex in its constructor and automatically releases it in its destructor, guaranteeing that the mutex is always unlocked safely and consistently.

#include <mutex>
#include <iostream>
#include <thread>

int value = 0;
std::mutex m;

class Function {
public:
    void operator()() {
        std::lock_guard<std::mutex> lock(m);
        value++;
        std::cout << value << std::endl;
    }
};

int main() {
    std::thread t1{Function()};
    std::thread t2{Function()};
    t1.join();
    t2.join();
}

Waiting for a condition with condition variables

Condition variables allow threads to wait efficiently for a specific condition to become true without busy-waiting or repeatedly polling shared data. In C++, a condition variable works together with a mutex: a thread locks the mutex, checks a predicate, and then sleeps until another thread signals that the condition may have changed. Using std::unique_lock and the wait() function ensures the mutex is released while waiting and automatically reacquired when the thread wakes. This pattern makes condition variables ideal for producer-consumer scenarios, where one thread waits for work while another notifies when data becomes available.

Building a Minimal Producer-Consumer System with Condition Variables

The Signal Class - A Simple Wait/Notify Mechanism

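#include <condition_variable>
#include <functional>
#include <mutex>

class Signal {
    std::mutex m;
    std::condition_variable cv;
public:
    // Blocks until predicate() returns true. The predicate is only evaluated
    // while m is held, and cv.wait() re-checks it after every wakeup.
    void wait(std::function<bool()> predicate) {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, predicate);
    }

    // Applies update() under the same mutex the waiters hold while checking
    // their predicates, then wakes every waiting thread.
    void notify(std::function<void()> update = []{}) {
        std::lock_guard<std::mutex> lock(m);
        update();
        cv.notify_all();
    }
};

The Signal class is a tiny synchronization helper built around std::condition_variable. It exposes two operations: wait(), which blocks until a user-provided predicate becomes true, and notify(), which optionally applies an update to the shared state and then wakes every waiting thread. Internally, wait() acquires the lock and suspends the thread until the predicate evaluates to true, and the predicate is only ever checked while the lock is held. Because notify() takes the same lock, a state change made through it is visible to the next predicate check, and a notification cannot slip in between a waiter's check and its sleep. This mirrors the core mechanics of condition variables in larger systems: the waiting thread sleeps without consuming CPU cycles, while the notifying thread signals state changes. By keeping the interface minimal (a predicate-based wait and a notify built on a single notify_all()), Signal forms a clean, reusable abstraction that makes producer-consumer coordination easy to express.


The Producer - Generating Work

#include <vector>

class Producer {
    std::vector<int>& _holder;
public:
    Producer(std::vector<int>& holder) : _holder(holder) {}
    void operator()(Signal& sig) {
        // push_back runs inside notify(), under the Signal's mutex, so it
        // cannot overlap with the consumer's _holder.size() check.
        sig.notify([this]{ _holder.push_back(2); });
    }
};

The producer's job is simple: modify shared state and notify anyone waiting for that state to change. It pushes an integer into the shared vector and wakes the waiters through the Signal object. The push is handed to notify() so that it runs while the Signal's mutex is held; calling push_back() first and notify() afterwards would let the push overlap with the consumer evaluating _holder.size() in its predicate, which is a data race. This pattern matches real producer logic in concurrent systems, where tasks are generated and made available for consumers.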


The Consumer - Waiting for Work

class Consumer {
    std::vector<int>& _holder;
public:
    Consumer(std::vector<int>& holder) : _holder(holder) {}
    void operator()(Signal& signal) {
        signal.wait([this]{return _holder.size() > 0;});
        std::cout << _holder.front() << std::endl;
    }
};

The consumer waits on the Signal object until the vector contains data. The lambda predicate is checked under the Signal's lock, and the thread sleeps efficiently until the producer notifies it. Once awakened, the consumer prints the first produced value. This matches the classic "wait for data, then process it" behavior found in real concurrent systems.


Wiring It All Together

int main() {
    std::vector<int> data;
    Producer p(data);
    Consumer c(data);
    Signal sig;
    std::thread t1(p, std::ref(sig));
    std::thread t2(c, std::ref(sig));
    t1.join();
    t2.join();
}

In main, both producer and consumer share the same vector and the same Signal object. Each runs in its own thread, allowing the producer to generate data independently of the consumer's waiting logic. When the producer pushes data and notifies, the consumer wakes up and prints the result; if the producer happens to finish first, the consumer's predicate is already true and it does not block at all. Though small, this example illustrates the same coordination pattern used in queues, thread pools, and message-passing systems: a blocking consumer synchronized with a producer through a condition variable.

Building a thread-safe queue with condition variables

The following threadsafe_queue template provides a minimal thread-safe FIFO queue using a mutex and a condition variable. The push() function locks the queue, moves the new value in, and notifies one waiting thread. The pop() and front() functions both acquire the lock and throw an exception if the queue is empty. The wait_and_get() function blocks until an element becomes available: it waits on the condition variable with a predicate, retrieves the front element, removes it, and returns it by value. The internal mutex ensures exclusive access to the underlying std::deque, and the condition variable allows consumers to sleep efficiently until data is pushed.

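#include <condition_variable>
#include <deque>
#include <mutex>
#include <stdexcept>

template<typename T>
class threadsafe_queue {
public:
    void push(T value) {
        std::lock_guard<std::mutex> lock(_mut);
        _queue.push_back(std::move(value));
        _cv.notify_one();  // wake one thread blocked in wait_and_get()
    }

    void pop() {
        std::lock_guard<std::mutex> lock(_mut);
        if (_queue.empty()) {
            throw std::runtime_error("Empty queue!");
        }
        _queue.pop_front();
    }

    // Returns a copy: a reference into _queue would outlive the lock and could
    // dangle as soon as another thread pops the element. Note that calling
    // front() and then pop() is still two separate critical sections;
    // wait_and_get() does both atomically.
    T front() {
        std::lock_guard<std::mutex> lock(_mut);
        if (_queue.empty()) {
            throw std::runtime_error("Empty queue!");
        }
        return _queue.front();
    }

    // Blocks until an element is available, then removes and returns the
    // oldest one (FIFO order).
    T wait_and_get() {
        std::unique_lock<std::mutex> lock(_mut);
        _cv.wait(lock, [this]{ return !_queue.empty(); });
        T value = std::move(_queue.front());
        _queue.pop_front();  // remove the element we just read, not the newest one
        return value;        // plain return allows copy elision
    }

private:
    std::mutex _mut;
    std::condition_variable _cv;
    std::deque<T> _queue;
};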

Waiting for one-off events with futures

If a thread needs to wait for a specific one-off event, it obtains a future representing that event. The thread can then periodically wait on the future for short periods of time to see if the event has occurred, performing some other task between checks. Alternatively, it can do another task until it needs the event to have happened before it can proceed, and then just wait for the future to become ready. A future may have data associated with it, or it may not. Once an event has happened (and the future has become ready), the future can't be reset.

1. Long-running calculations you don't need right away

Sometimes you have a computation that will take a while to finish, but you don't need its result immediately. Maybe it's something complex, like a calculation that (humorously) claims to find the answer to Life, the Universe, and Everything.


2. Why not just use std::thread?

You could launch a new thread for this work.

But std::thread has a limitation: it doesn't give you a built-in way to return a value from that thread. You have to manage shared variables, synchronization, and lifetime manually.


3. This is where std::async helps

The std::async function template (from <future>) solves this cleanly.

Instead of giving you a std::thread, it returns a std::future that will eventually contain the result of the asynchronous computation.


4. Getting the result when you actually need it

When you're ready to use the result, you call:

future.get();

If the calculation is still running, get() will block until the value is ready.

Otherwise, it returns immediately with the computed result.

Writing our own async function and future object

future_state<T> - The Async Engine

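#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

template<typename T> class future;  // forward declaration, needed for the friend below

template<typename T>
class future_state {
    friend class future<T>;
protected:
    // Stores the callable and immediately starts running it on a new thread.
    future_state(std::function<T()> function)
        : _function(std::move(function)),
          // _thread is declared last, so every member the worker touches
          // is already constructed when it starts running.
          _thread([this]{
              T temp = _function();  // an exception here would call std::terminate
              std::lock_guard<std::mutex> lock(_mutex);
              _result = std::move(temp);
              _completed = true;
              _cv.notify_all();
          }) {}

public:
    // Joining instead of detaching guarantees the worker has finished with
    // this object before its members are destroyed.
    ~future_state() {
        if (_thread.joinable()) {
            _thread.join();
        }
    }

    // Blocks until the worker has stored the result. The result is moved
    // out, so (like std::future::get()) this is meant to be called once.
    T get() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this]{ return _completed; });
        return std::move(_result);
    }

private:
    std::function<T()> _function;
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _completed = false;
    T _result{};          // requires T to be default-constructible (and not void)
    std::thread _thread;  // keep last: it starts running in the constructor
};

The future_state<T> class is responsible for executing the user-provided function asynchronously and storing its result. When constructed, it takes a std::function<T()>, immediately launches a new thread, and executes the callable inside that thread. After computing the result, the worker locks a mutex, stores the value, marks the computation as complete, and notifies any waiting threads via a condition variable. The get() method blocks until the result has been produced, ensuring safe, synchronized access. The destructor joins the worker thread rather than detaching it: a detached thread that captures this can outlive the object, for example when the future is destroyed before the work finishes, or even right after get() returns while the worker is still inside notify_all(), and it would then touch freed memory. Blocking in the destructor is also what a future returned by std::async does when the task runs on its own thread. This internal engine mirrors the shared state behind std::future and std::promise, encapsulating both thread execution and synchronization.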


future<T> - The Public Handle

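#include <functional>
#include <memory>

template<typename T>
class future {
public:
    // Takes ownership of the callable; creating the state starts the work.
    explicit future(std::function<T()>&& function)
        : _state(new future_state<T>(std::move(function))) {}

    // Blocks until the result is ready; like std::future::get(), call it once.
    T get() {
        return _state->get();
    }

private:
    // future_state's constructor is protected, so std::make_unique cannot
    // create it; the friend declaration lets future use new directly.
    std::unique_ptr<future_state<T>> _state;
};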

The future<T> class is the lightweight public interface that users interact with. It simply owns a future_state<T> and exposes a single meaningful operation: get(). When a future is created, it forwards the packaged callable to future_state, which immediately starts running it in a background thread. The future itself does not perform any synchronization or computation; instead, it delegates all heavy lifting to its internal state. This minimalistic design offers a clean interface identical in spirit to std::future, allowing callers to retrieve the eventual result without needing to understand the underlying execution model.


async - Launching Work

template<class Callable, class ...Args>
auto async(Callable&& callable, Args&& ...args)
    -> future<std::invoke_result_t<Callable, Args...>>
{
    using T = std::invoke_result_t<Callable, Args...>;
    auto function = std::bind(std::forward<Callable>(callable),
                              std::forward<Args>(args)...);
    return future<T>(std::move(function));
}

The custom async function is a simplified re-creation of std::async, providing type deduction and a convenient interface for launching asynchronous tasks. It determines the return type using std::invoke_result_t, binds the callable and its arguments into a zero-argument std::function<T()>, and constructs a future<T> from this packaged function. Creating the future starts the background thread inside future_state, so calling async() is all the user needs to do to start asynchronous work. A call such as auto f = async(fn, 1, 2); therefore gives f the type future<return_type_of_fn> without requiring any template arguments at the call site.

Building Our Own packaged_task and a (better) future

Below is a complete, minimal implementation of a future/packaged_task pairing, modeled after the standard library's std::future, std::promise, and std::packaged_task. It shows how to synchronize asynchronous results manually, package work, and retrieve results safely.


task_state<T> - Shared State Between Task and Future

template<class return_type>
class task_state {

public:
    void post_completion(return_type&& result) {
        std::unique_lock<std::mutex> lock(_mutex);
        _completed = true;
        _result = std::move(result);
        _cv.notify_all();
    }

    void wait_to_completion() {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this]{return _completed;});
    }

    return_type& get() {
        return _result;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _completed = false;
    return_type _result;
};

The task_state<T> class is the shared communication channel between a packaged_task and the corresponding future. It stores the eventual result of the computation along with synchronization primitives that allow threads to wait safely until that result is ready. When a task finishes executing, it calls post_completion(), which stores the result, marks the task as completed, and wakes up any threads waiting on the condition variable. The future holds a shared_ptr to this state and calls wait_to_completion() to block until the computation is done, then get() to retrieve the stored value. get() itself takes no lock; that is safe only because it is called after wait_to_completion() has observed _completed under the mutex, and the result is never written again. This design mirrors the hidden shared state used in the standard library to coordinate asynchronous operations.


future<T> - A Handle for the Result

template<class return_type>
class future {
public:
    explicit future(std::shared_ptr<task_state<return_type>> state) : _state(std::move(state)) {}

    // Blocks until the task has posted its result, then returns a reference
    // to the value stored in the shared state.
    return_type& get() {
        _state->wait_to_completion();
        return _state->get();
    }

private:
    std::shared_ptr<task_state<return_type>> _state;
};

The future<T> type acts as the public-facing handle to an asynchronous result. Rather than launching computation itself, the future simply holds a shared_ptr to the internally managed task_state<T>. Calling get() blocks until the result becomes available, ensuring safe and deterministic access to the task's output. Since both future and packaged_task share the same state, they can coordinate without explicit locking or manual thread management from the user. This structure is conceptually close to what std::future provides in the standard library. One difference: because get() returns a reference and leaves the stored value in place, it can be called repeatedly, which makes it behave more like std::shared_future (std::future::get() may be called only once).


packaged_task - Wrapping Work and Producing a Future

template<class Callable, class ...Args>
class packaged_task {
    using return_type = std::invoke_result_t<Callable, Args...>;
public:
    packaged_task(Callable&& callable_)
        : _callable(std::move(callable_)),
          _state(std::make_shared<task_state<return_type>>()) {}

    void operator()(Args... args_) {
        return_type result =
            std::invoke(_callable, std::forward<Args>(args_)...);
        _state->post_completion(std::move(result));
    }

    future<return_type> get_future() {
        return future<return_type>(_state);
    }

private:
    Callable _callable;
    std::shared_ptr<task_state<return_type>> _state;
};

A packaged_task bundles together two things: a callable object and a mechanism for storing its eventual result. It exposes a function-call operator that takes whatever arguments the task expects and invokes the callable using std::invoke. Once the work is completed, the task publishes the result to the shared task_state, making it available to any associated future. The get_future() method constructs and returns a future that shares access to this state. This mirrors the behavior of std::packaged_task, allowing work to be executed in one place while results are retrieved in another.


Example: Using Our Packaged Task and Future

class function {
public:
    int operator()(int x) {
        return x + 5;
    }
};

int main() {
    packaged_task<function, int> task{function()};
    future<int> result = task.get_future();  // shares task's state
    task(4);                                 // runs the callable: 4 + 5
    std::cout << result.get() << std::endl;  // prints 9
}

In this example, a function object is wrapped inside a packaged_task, parameterized with a single int argument. Calling task(4) executes the callable and stores the result in the shared state. The future blocks until the computation completes and then returns the result, which is printed to the console. This demonstrates the core workflow: encapsulating a computation, running it, and retrieving the result in a synchronized and type-safe way.