cpp11 library concurrency

Save to:
Instapaper Pocket Readability

C++11 Standard Library Extensions — Concurrency

Threads

A thread is a representation of an execution/computation in a program. In C++11, as in much modern computing, a thread can – and usually does – share an address space with other threads. In this, it differs from a process, which generally does not directly share data with other processes. C++ has had a host of threads implementations for a variety of hardware and operating systems in the past, what’s new is a standard-library threads library.

Many thick books and tens of thousands of papers have been written about concurrency, parallelism, and threading, this FAQ entry barely scratches the surface. It is hard to think clearly about concurrency. If you want to do concurrent programming, at least read a book. Do not rely just on a manual, a standard, or a FAQ.

A thread is launched by constructing a std::thread with a function or a function object (incl. a lambda):

    #include <thread>

    void f();

    struct F {
        void operator()();
    };

    int main()
    {
        std::thread t1{f};     // f() executes in separate thread
        std::thread t2{F()};   // F()() executes in separate thread
    }

Unfortunately, this is unlikely to give any useful results – whatever f() and F() might do. The snag is that the program may terminate before or after t1 executes f() and before or after t2 executes F(). We need to wait for the two tasks to complete:

    int main()
    {
        std::thread t1{f};  // f() executes in separate thread
        std::thread t2{F()};    // F()() executes in separate thread

        t1.join();  // wait for t1
        t2.join();  // wait for t2
    }

The join()s ensure that we don’t terminate until the threads have completed. To “join” means to wait for the thread to terminate.

Typically, we’d like to pass some arguments to the task to be executed (here we are calling something executed by a thread a “task”). For example:

    void f(vector<double>&);

    struct F {
        vector<double>& v;
        F(vector<double>& vv) :v{vv} { }
        void operator()();
    };

    int main()
    {
        std::thread t1{std::bind(f,some_vec)};  // f(some_vec) executes in separate thread
        std::thread t2{F(some_vec)};        // F(some_vec)() executes in separate thread

        t1.join();
        t2.join();
    }

Basically, the standard library bind makes a function object of its arguments.

In general, we’d also like to get a result back from an executed task. With plain tasks, there is no notion of a return value; std::future is the correct default choice for that. Alternatively, we can pass an argument to a task telling it where to put its result: For example:

    void f(vector<double>&, double* res);   // place result in res

    struct F {
        vector<double>& v;
        double* res;
        F(vector<double>& vv, double* p) :v{vv}, res{p} { }
        void operator()();  // place result in res
    };

    int main()
    {
        double res1;
        double res2;

        std::thread t1{std::bind(f,some_vec,&res1)};    // f(some_vec,&res1) executes in separate thread
        std::thread t2{F(some_vec,&res2)};      // F(some_vec,&res2)() executes in separate thread

        t1.join();
        t2.join();

        std::cout << res1 << ' ' << res2 << '\n';
    }

But what about errors? What if a task throws an exception? If a task throws an exception and doesn’t catch it itself std::terminate() is called. That typically means that the program finishes. We usually try rather hard to avoid that. A std::future can transmit an exception to the parent/calling thread; that’s one reason to like futures. Otherwise, return some sort of error code.

When a thread goes out of scope the program is terminate()d unless its task has completed. That’s obviously to be avoided.

There is no way to request a thread to terminate (i.e. request that it exit as a soon as possible and as gracefully as possible) or to force a thread to terminate (i.e. kill it). We are left with the options of

  • designing our own cooperative “interruption mechanism” (with a piece of shared data that a caller thread can set for a called thread to check, and quickly and gracefully exit when it is set),
  • “going native” by using thread::native_handle() to gain access to the operating system’s notion of a thread,
  • kill the process (std::quick_exit()),
  • kill the program (std::terminate()).

This was all the committee could agree upon. In particular, representatives from POSIX were vehemently against any form of “thread cancellation” however much C++’s model of resources rely on destructors. There is no perfect solution for every system and every possible application.

The basic problem with threads is data races; that is, two threads running in a single address space can independently access an object in ways that cause undefined results. If one (or both) writes to the object and the other (or both) reads the object they have a “race” for who gets its operation(s) done first. The results are not just undefined; they are usually completely unpredictable. Consequently, C++11 provides some rules/guarantees for the programmer to avoid data races:

  • A C++ standard library function shall not directly or indirectly access objects accessible by threads other than the current thread unless the objects are accessed directly or indirectly via the function’s arguments, including this.
  • A C++ standard library function shall not directly or indirectly modify objects accessible by threads other than the current thread unless the objects are accessed directly or indirectly via the function’s nonconst arguments, including this.
  • C++ standard library implementations are required to avoid data races when different elements in the same sequence are modified concurrently.

Concurrent access to a stream object, stream buffer object, or C Library stream by multiple threads may result in a data race unless otherwise specified. So don’t share an output stream between two threads unless you somehow control the access to it.

You can:

See also:

Mutual exclusion

A mutex is a primitive object used for controlling access in a multi-threaded system. The most basic use is:

    std::mutex m;
    int sh; // shared data
    // ...
    m.lock();
    // manipulate shared data:
    sh+=1;
    m.unlock();

Only one thread at a time can be in the region of code between the lock() and the unlock() (often called a critical region). If a second thread tries m.lock() while a first thread is executing in that region, that second thread is blocked until the first executes the m.unlock(). This is simple. What is not simple is to use mutexes in a way that doesn’t cause serious problems: What if a thread “forgets” to unlock()? What if a thread tries to lock() the same mutex twice? What if a thread waits a very long time before doing an unlock()? What if a thread needs to lock() two mutexes to do its job? The complete answers fill books. Here (and in the Locks section) are just the raw basics.

In addition to lock(), a mutex has a try_lock() operation which can be used to try to get into the critical region without the risk of getting blocked:

    std::mutex m;
    int sh; // shared data
    // ...
    if (m.try_lock()) {
        // manipulate shared data:
        sh+=1;
        m.unlock();
    else {
        // maybe do something else
    }

A recursive_mutex is a mutex that can be acquired more than once by a thread:

    std::recursive_mutex m;
    int sh; // shared data
    // ...
    void f(int i)
    {
        // ...
        m.lock();
        // manipulate shared data:
        sh+=1;
        if (--i>0) f(i);
        m.unlock();
        // ...
    }

Here, we have been blatant and let f() call itself. Typically, the code is more subtle. The recursive call will be indirect along the line of f() calls g() that calls h() that calls f().

What if you need to acquire a mutex within the next ten seconds? The timed_mutex class is offered for that. Its operations are specialized versions of try_lock() with an associated time limit:

    std::timed_mutex m;
    int sh; // shared data
    // ...
    if (m.try_lock_for(std::chrono::seconds(10))) {
        // manipulate shared data:
        sh+=1;
        m.unlock();
    }
    else {
        // we didn't get the mutex; do something else
    }

The try_lock_for() takes a relative time, a duration as its argument. If instead you want to wait until a fixed point in time, a time_point you can use try_lock_until():

    std::timed_mutex m;
    int sh; // shared data
    // ...
    if (m.try_lock_until(midnight)) {
        // manipulate shared data:
        sh+=1;
        m.unlock();
    }
    else {
        // we didn't get the mutex; do something else
    }

The midnight is a feeble joke: For a mechanism as low level as mutexes, the timescale is more likely to be milliseconds than hours.

There is of course also a recursive_timed_mutex.

A mutex is considered a resource (as it is typically used to represent a real resource) and must be visible to at least two threads to be useful. Consequently, it cannot be copied or moved (you couldn’t just make another copy of a hardware input register).

It can be surprisingly difficult to get the lock()s and unlock()s to match. Think of complicated control structures, errors, and exceptions. If you have a choice, use locks to manage your mutexes; that will save you and your users a lot of sleep.

See also:

Locks

A lock is an object that can hold a reference to a mutex and may unlock() the mutex during the lock’s destruction (such as when leaving block scope). A thread may use a lock to aid in managing mutex ownership in an exception safe manner. In other words, a lock implements Resource Acquisition Is Initialization for mutual exclusion. For example:

    std::mutex m;
    int sh; // shared data
    // ...
    void f()
    {
        // ...
        std::unique_lock lck(m);
        // manipulate shared data: lock will be released even if this code throws an exception
        sh+=1;
    }

A lock can be moved (the purpose of a lock is to represent local ownership of a non-local resource), but not copied (which copy would own the resource/mutex?).

This straightforward picture of a lock is clouded by unique_lock having facilities to do just about everything a mutex can, but safer. For example, we can use a unique_lock to do try_lock:

    std::mutex m;
    int sh; // shared data
    // ...
    void f()
    {
        // ...
        std::unique_lock lck(m,std::defer_lock);    // make a lock, but don't acquire the mutex
        // ...
        if (lck.try_lock()) {
            // manipulate shared data:
            sh+=1;
        }
        else {
            // maybe do something else
        }
    }

Similarly, unique_lock supports try_lock_for() and try_lock_until(). What you get from using a lock rather than the mutex directly is exception handling and protection against forgetting to unlock(). In concurrent programming, we need all the help we can get.

What if we need two resources represented by two mutexes? The naive way is to acquire the mutexes in order:

    std::mutex m1;
    std::mutex m2;
    int sh1;    // shared data
    int sh2
    // ...
    void f()
    {
        // ...
        std::unique_lock lck1(m1);
        std::unique_lock lck2(m2);
        // manipulate shared data:
        sh1+=sh2;
    }

This has the potentially deadly flaw that some other thread could try to acquire m1 and m2 in the opposite order so that each had one of the locks needed to proceed and would wait forever for the second (that’s one classic form of deadlock). With many locks in a system, that’s a real danger. Consequently, the standard locks provide two functions for (safely) trying to acquire two or more locks:

    void f()
    {
        // ...
        std::unique_lock lck1(m1,std::defer_lock);  // make locks but don't yet try to acquire the mutexes
        std::unique_lock lck2(m2,std::defer_lock);
        std::unique_lock lck3(m3,std::defer_lock);
        lock(lck1,lck2,lck3);
        // manipulate shared data
    }

Obviously, the implementation of lock() has to be carefully crafted to avoid deadlock. In essence, it will do the equivalent to careful use of try_lock()s. If lock() fails to acquire all locks it will throw an exception. Actually, lock() can take any argument with lock(), try_lock(), and unlock() member functions (e.g. a mutex), so we can’t be specific about which exception lock() might throw; that depends on its arguments.

If you prefer to use try_lock()s yourself, there is an equivalent to lock() to help:

    void f()
    {
        // ...
        std::unique_lock lck1(m1,std::defer_lock);  // make locks but don't yet try to acquire the mutexes
        std::unique_lock lck2(m2,std::defer_lock);
        std::unique_lock lck3(m3,std::defer_lock);
        int x;
        if ((x = try_lock(lck1,lck2,lck3))==-1) {   // welcome to C land
            // manipulate shared data
        }
        else {
            // x holds the index of a mutex we could not acquire
            // e.g. if lck2.try_lock() failed x==1
        }
    }

See also:

  • Standard: 30.4.3 Locks [thread.lock]

Condition variables

Condition variables provide synchronization primitives used to block a thread until notified by some other thread that some condition is met or until a system time is reached.

See also:

  • Standard: 30.5 Condition variables [thread.condition]

Atomics

To be written.

In the meantime, see:

Futures and promises

Concurrent programming can be hard, especially if you try to be clever with threads, and locks. It is harder still if you must use condition variables or use atomics (for lock-free programming). C++11 offers future and promise for returning a value from a task spawned on a separate thread, and packaged_task to help launch tasks. The important point about future and promise is that they enable a transfer of a value between two tasks without explicit use of a lock; “the system” implements the transfer efficiently. The basic idea is simple: When a task wants to return a value to the thread that launched it, it puts the value into a promise. Somehow, the implementation makes that value appear in the future attached to the promise. The caller (typically the launcher of the task) can then read the value. For added simplicity, see async().

The standard provides three kinds of futures, future for most simple uses, and shared_future and atomic_future for some trickier cases. Here, we’ll just present future because it’s the simplest and does all we need done. If we have a future<X> called f, we can get() a value of type X from it:

    X v = f.get();  // if necessary wait for the value to get computed

If the value isn’t there yet, our thread is blocked until it arrives. If the value couldn’t be computed and the task threw an exception, calling get() will rethrow that exception to the code calling get().

We might not want to wait for a result, so we can ask the future if a result has arrived:

    if (f.wait_for(0)) {    // there is a value to get()
        // do something
    }
    else {
        // do something else
    }

However, the main purpose of future is to provide that simple get().

The main purpose of promise is to provide a simple set() to match future’s get(). The names future and promise are historical; they are also a fertile source of puns.

If you have a promise and need to send a result of type X (back) to a future, there are basically two things you can do: pass a value and pass an exception:

    try {
        X res;
        // compute a value for res
        p.set_value(res);
    }
    catch (...) {   // oops: couldn't compute res
        p.set_exception(std::current_exception());
    }

So far so good, but how do I get a matching future/promise pair, one in my thread and one in some other thread? Well, since futures and promises can be moved (not copied) around there is a wide variety of possibilities. The most obvious idea is for whoever wants a task done to create a thread and give the promise to it while keeping the corresponding future as the place for the result. Using async() is the most extreme/elegant variant of the latter technique.

The packaged_task type is provided to simplify launching a thread to execute a task. In particular, it takes care of setting up a future connected to a promise and to provides the wrapper code to put the return value or exception from the task into the promise. For example:

    double comp(vector<double>& v)
    {
        // package the tasks:
        // (the task here is the standard accumulate() for an array of doubles):
        packaged_task<double(double*,double*,double)> pt0{std::accumulate<double*,double*,double>};
        packaged_task<double(double*,double*,double)> pt1{std::accumulate<double*,double*,double>};

        auto f0 = pt0.get_future(); // get hold of the futures
        auto f1 = pt1.get_future();

        pt0(&v[0],&v[v.size()/2],0);    // start the threads
        pt1(&v[v.size()/2],&v[size()],0);

        return f0.get()+f1.get();   // get the results
    }

See also:

async

Here is an example of a way for the programmer to rise above the messy threads-plus-lock level of concurrent programming:

    template<class T, class V> struct Accum  {  // simple accumulator function object
        T* b;
        T* e;
        V val;
        Accum(T* bb, T* ee, const V& v) : b{bb}, e{ee}, val{vv} {}
        V operator() () { return std::accumulate(b,e,val); }
    };

    double comp(vector<double>& v)
        // spawn many tasks if v is large enough
    {
        if (v.size()<10000) return std::accumulate(v.begin(),v.end(),0.0);

        auto f0 {async(Accum{&v[0],&v[v.size()/4],0.0})};
        auto f1 {async(Accum{&v[v.size()/4],&v[v.size()/2],0.0})};
        auto f2 {async(Accum{&v[v.size()/2],&v[v.size()*3/4],0.0})};
        auto f3 {async(Accum{&v[v.size()*3/4],&v[v.size()],0.0})};

        return f0.get()+f1.get()+f2.get()+f3.get();
    }

This is a very simple-minded use of concurrency (note the “magic number”), but note the absence of explicit threads, locks, buffers, etc. The type of the f-variables are determined by the return type of the standard-library function async() which is a future. If necessary, get() on a future waits for a thread to finish. Here, it is async()’s job to spawn threads as needed and the future’s job to join() the threads appropriately. “Simple” is the most important aspect of the async()/future design; futures can also be used with threads in general, but don’t even think of using async() to launch tasks that do I/O, manipulate mutexes, or in other ways interact with other tasks. The idea behind async() is the same as the idea behind the range-for statement: Provide a simple way to handle the simplest, rather common, case and leave the more complex examples to the fully general mechanism.

An async() can be requested to launch in a new thread, in any thread but the caller’s, or to launch in a different thread only if async() “thinks” that it is a good idea. The latter is the simplest from the user’s perspective and potentially the most efficient (for simple tasks only).

The committee is actively working on providing a much more general form of executors (execution agents) that will subsume std::async. In the meantime, std::async is the simple facility included in the standard.

See also:

Abandoning a process

To be written.

In the meantime, see: