C++ Latches and Barriers

ISO/IEC JTC1 SC22 WG21 N3600 - 2013-03-16

Alasdair Mackintosh

Introduction
Solution
    Latch Operations
    Barrier Operations
    Sample Usage
Synopsis

Introduction

Certain idioms that are commonly used in concurrent programming are missing from the standard libraries. Although many of these can be relatively straightforward to implement, we believe it is more efficient to have a standard version.

In addition, although some idioms can be provided using mutexes, higher performance can often be obtained with atomic operations and lock-free algorithms. However, these algorithms are more complex to write, and are prone to error.

Other standard concurrency idioms may have difficult corner cases, and can be hard to implement correctly. For these reasons, we believe that it is valuable to provide these in the standard library.

Solution

We propose a set of commonly-used concurrency classes, some of which may be implemented using efficient lock-free algorithms where appropriate. This paper describes the latch and barrier classes.

Latches are a thread co-ordination mechanism that allows one or more threads to block until an operation is completed. An individual latch is a single-use object; once the operation has been completed, it cannot be re-used.

Barriers are a thread co-ordination mechanism that allows multiple threads to block until an operation is completed. Unlike a latch, a barrier is re-usable; once the operation has been completed, the threads can re-use the same barrier. It is thus useful for managing repeated tasks handled by multiple threads.

A reference implementation of these two classes has been written.

Latch Operations

A latch maintains an internal counter that is initialized when the latch is created. One or more threads may block waiting until the counter is decremented to 0.

constructor( size_t );

The parameter is the initial value of the internal counter.

destructor( );

Destroys the latch. If the latch is destroyed while other threads are in wait(), or are invoking count_down(), the behaviour is undefined.

void count_down( );

Decrements the internal count by 1, and returns. If the count reaches 0, any threads blocked in wait() will be released.

Throws std::logic_error if the internal count is already 0.

void wait( );

Blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down(). If the count is already 0, this is a no-op.

bool try_wait( );

Returns true if the internal count has been decremented to 0 by one or more other threads calling count_down(), and false otherwise. Does not block the calling thread.

void count_down_and_wait( );

Decrements the internal count by 1. If the resulting count is not 0, blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down().

There are no copy or assignment operations.
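
The operations above can be illustrated with a minimal mutex-based sketch. This is not the reference implementation mentioned earlier; the class name sketch_latch and the helper latch_sketch_demo are hypothetical, and throwing from count_down_and_wait() when the count is already 0 is our assumption (the text only specifies this for count_down()).

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical mutex-based sketch of the latch operations described above.
class sketch_latch {
 public:
  explicit sketch_latch(std::size_t count) : count_(count) {}
  sketch_latch(const sketch_latch&) = delete;             // no copy
  sketch_latch& operator=(const sketch_latch&) = delete;  // no assignment

  void count_down() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (count_ == 0) throw std::logic_error("latch count is already 0");
    if (--count_ == 0) cv_.notify_all();  // release threads blocked in wait()
  }

  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ == 0; });  // no-op if already 0
  }

  bool try_wait() {
    std::lock_guard<std::mutex> lock(mutex_);
    return count_ == 0;  // never blocks on the count
  }

  void count_down_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    // Assumption: treat an already-open latch like count_down() does.
    if (count_ == 0) throw std::logic_error("latch count is already 0");
    if (--count_ == 0) {
      cv_.notify_all();
    } else {
      cv_.wait(lock, [this] { return count_ == 0; });
    }
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t count_;
};

// Exercises each operation once; returns true if the sketch behaved as
// the descriptions above require.
bool latch_sketch_demo() {
  sketch_latch done(3);
  assert(!done.try_wait());  // count is still 3
  std::vector<std::thread> workers;
  for (int i = 0; i < 2; ++i) {
    workers.emplace_back([&done] { done.count_down(); });
  }
  done.count_down_and_wait();  // third decrement, then wait for 0
  for (auto& t : workers) t.join();
  bool threw = false;
  try {
    done.count_down();  // count is already 0
  } catch (const std::logic_error&) {
    threw = true;
  }
  return done.try_wait() && threw;
}
```

A condition variable keeps the sketch short; an implementation that aims for the lock-free performance discussed in the introduction would be structured differently.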

Memory Ordering

All calls to count_down() synchronize with any thread returning from wait(). All calls to count_down() synchronize with any thread that gets a true value from try_wait().
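
One way an atomic-based implementation might provide this guarantee is to decrement with release ordering and to test with acquire ordering. The sketch below is an assumption on our part, not the reference implementation: spin_latch and publish_through_latch are hypothetical names, wait() spins rather than blocks, and the std::logic_error check is omitted for brevity.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical lock-free sketch showing only the memory-ordering aspect.
class spin_latch {
 public:
  explicit spin_latch(std::size_t count) : count_(count) {}

  // Release ordering publishes every write the caller made beforehand.
  // Successive fetch_sub calls form a release sequence, so the final
  // acquire load synchronizes with all of them.
  void count_down() { count_.fetch_sub(1, std::memory_order_release); }

  // Acquire ordering: a true result makes those writes visible here.
  bool try_wait() const {
    return count_.load(std::memory_order_acquire) == 0;
  }

  // Spins with yield(); a production latch would block instead.
  void wait() const {
    while (!try_wait()) std::this_thread::yield();
  }

 private:
  std::atomic<std::size_t> count_;
};

// Returns the value the waiting thread observes in a plain variable
// that another thread wrote before calling count_down().
int publish_through_latch() {
  spin_latch ready(1);
  int payload = 0;  // deliberately not atomic
  std::thread producer([&] {
    payload = 42;        // sequenced before count_down()
    ready.count_down();
  });
  ready.wait();          // synchronizes with the count_down() above
  int seen = payload;    // no data race: the write happens before this read
  producer.join();
  assert(ready.try_wait());
  return seen;
}
```

Here publish_through_latch() returns 42: the write to payload happens before the read because count_down() synchronizes with the return from wait().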

Barrier Operations

A barrier maintains an internal thread counter that is initialized when the barrier is created. Threads may decrement the counter and then block waiting until the specified number of threads are blocked. All threads will then be woken up, and the barrier will reset. In addition, there is a mechanism to change the thread count value after the count reaches 0.

constructor( size_t );

The parameter is the initial value of the internal thread counter.

Throws std::invalid_argument if the specified count is 0.

constructor( size_t, std::function<void()> );

The parameters are the initial value of the internal thread counter, and a function that will be invoked when the counter reaches 0.

Throws std::invalid_argument if the specified count is 0.

destructor( );

Destroys the barrier. If the barrier is destroyed while other threads are in count_down_and_wait(), the behaviour is undefined.

void count_down_and_wait( );

Decrements the internal thread count by 1. If the resulting count is not 0, blocks the calling thread until the internal count is decremented to 0 by one or more other threads calling count_down_and_wait().

Before any threads are released, the completion function registered in the constructor will be invoked (if specified and non-NULL). Note that the completion function may be invoked in the context of one of the blocked threads. When the completion function returns, the internal thread count will be reset to its initial value, and all blocked threads will be unblocked.

Note that it is safe for a thread to re-enter count_down_and_wait() immediately. It is not necessary to ensure that all blocked threads have exited count_down_and_wait() before one thread re-enters it.

void reset( size_t );

Resets the barrier with a new value for the initial thread count. This method may only be invoked when there are no other threads currently inside the count_down_and_wait() method. It may also be invoked from within the registered completion function.

Once reset() is called, the barrier will automatically reset itself to the new thread count as soon as the internal count reaches 0 and all blocked threads are released.

void reset( std::function<void()> );

Resets the barrier with the new completion function. The next time the internal thread count reaches 0, this function will be invoked. This method may only be invoked when there are no other threads currently inside the count_down_and_wait() method. It may also be invoked from within the registered completion function.

If NULL is passed in, no function will be invoked when the count reaches 0.

There are no copy or assignment operations.

Note that the barrier does not have separate count_down() and wait() methods. Every thread that counts down will then block until all threads have counted down. Hence only the count_down_and_wait() method is supported.
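
The barrier operations can be sketched with a mutex, a condition variable and a generation counter. This is an illustration under our own assumptions, not the reference implementation: sketch_barrier and count_completed_phases are hypothetical names, the two constructors are merged through a default argument, and a new count passed to reset() takes effect the next time the count reaches 0.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch of the barrier described above.
class sketch_barrier {
 public:
  explicit sketch_barrier(std::size_t num_threads,
                          std::function<void()> completion = nullptr)
      : initial_(num_threads), count_(num_threads),
        completion_(std::move(completion)) {
    if (num_threads == 0) throw std::invalid_argument("thread count is 0");
  }
  sketch_barrier(const sketch_barrier&) = delete;
  sketch_barrier& operator=(const sketch_barrier&) = delete;

  void count_down_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    assert(count_ > 0);
    const std::size_t my_generation = generation_;
    if (--count_ == 0) {
      // Last arrival: run the completion function in this thread. The
      // lock is dropped so that the function may call reset(); every
      // other participant is blocked below, so nothing else can enter.
      std::function<void()> f = completion_;
      lock.unlock();
      if (f) f();
      lock.lock();
      count_ = initial_;  // picks up any value set by reset()
      ++generation_;      // lets waiters tell this cycle from the next
      cv_.notify_all();
    } else {
      cv_.wait(lock, [&] { return generation_ != my_generation; });
    }
  }

  // Both overloads take effect when the current cycle completes.
  void reset(std::size_t num_threads) {
    std::lock_guard<std::mutex> lock(mutex_);
    initial_ = num_threads;
  }
  void reset(std::function<void()> completion) {
    std::lock_guard<std::mutex> lock(mutex_);
    completion_ = std::move(completion);
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t initial_;
  std::size_t count_;
  std::size_t generation_ = 0;
  std::function<void()> completion_;
};

// Runs `threads` workers through `phases` barrier cycles and returns how
// many times the completion function was invoked.
int count_completed_phases(std::size_t threads, int phases) {
  int completed = 0;  // only touched inside the completion function
  sketch_barrier barrier(threads, [&completed] { ++completed; });
  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < threads; ++i) {
    workers.emplace_back([&barrier, phases] {
      for (int p = 0; p < phases; ++p) barrier.count_down_and_wait();
    });
  }
  for (auto& t : workers) t.join();
  return completed;
}
```

The generation counter is what makes immediate re-entry safe in this sketch: a thread that loops straight back into count_down_and_wait() sees the new generation and the reset count, while slower threads from the previous cycle are still waking up.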

Memory Ordering

For threads X and Y that call count_down_and_wait(), the call to count_down_and_wait() in X synchronizes with the return from count_down_and_wait() in Y.

Sample Usage

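Sample use cases for the latch include:

    Setting multiple threads to perform a task, and then waiting until all threads have reached a common point.
    Creating multiple threads, which wait for a signal before advancing beyond a common point.

An example of the first use case would be as follows: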

  void DoWork(threadpool* pool) {
    latch completion_latch(NTASKS);
    for (int i = 0; i < NTASKS; ++i) {
      pool->add_task([&] {
        // perform work
        ...
        completion_latch.count_down();
      });
    }
    // Block until work is done
    completion_latch.wait();
  }
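
For readers who wish to run the first use case, the following self-contained sketch replaces the thread pool with plain std::thread objects and the proposed latch with a small stand-in. The names mini_latch and sum_with_latch are hypothetical, and the "work" is simply writing one number per task.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the proposed latch (count_down and wait only).
class mini_latch {
 public:
  explicit mini_latch(std::size_t count) : count_(count) {}
  void count_down() {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(count_ > 0);
    if (--count_ == 0) cv_.notify_all();
  }
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ == 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t count_;
};

// Starts `ntasks` tasks, waits on the latch, and returns the sum of the
// task results (task i produces i + 1).
int sum_with_latch(std::size_t ntasks) {
  mini_latch completion_latch(ntasks);
  std::vector<int> results(ntasks, 0);  // one slot per task, never shared
  std::vector<std::thread> tasks;
  for (std::size_t i = 0; i < ntasks; ++i) {
    tasks.emplace_back([&, i] {
      results[i] = static_cast<int>(i) + 1;  // perform work
      completion_latch.count_down();
    });
  }
  completion_latch.wait();  // block until work is done
  int total = 0;
  for (int r : results) total += r;  // every result is visible after wait()
  // Join so that the latch outlives any thread still inside count_down().
  for (auto& t : tasks) t.join();
  return total;
}
```

The results are read before the threads are joined, which shows that the latch alone, rather than join(), is what makes the reads safe.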

An example of the second use case is shown below. We need to load data and then process it using a number of threads. Loading the data is I/O bound, whereas starting threads and creating data structures is CPU bound. By running these in parallel, throughput can be increased.

  void DoWork() {
    latch start_latch(1);
    vector<thread*> workers;
    for (int i = 0; i < NTHREADS; ++i) {
      workers.push_back(new thread([&] {
        // Initialize data structures. This is CPU bound.
        ...
        start_latch.wait();
        // perform work
        ...
      }));
    }
    // Load input data. This is I/O bound.
    ...
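    // Threads can now start processing
    start_latch.count_down();
    // Wait for the workers, so that start_latch is not destroyed while
    // threads are still in wait()
    for (thread* worker : workers) {
      worker->join();
      delete worker;
    }
  }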
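
The second use case can be run with the sketch below, again using a small stand-in for the proposed latch. The names mini_latch and run_after_start_signal are hypothetical, and assigning a constant stands in for the I/O-bound load.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the proposed latch (count_down and wait only).
class mini_latch {
 public:
  explicit mini_latch(std::size_t count) : count_(count) {}
  void count_down() {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(count_ > 0);
    if (--count_ == 0) cv_.notify_all();
  }
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return count_ == 0; });
  }

 private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::size_t count_;
};

// Starts `nthreads` workers that wait for a start signal, then each
// multiplies the loaded input by (index + 1). Returns the sum of outputs.
int run_after_start_signal(std::size_t nthreads) {
  mini_latch start_latch(1);
  int input = 0;  // written by this thread before the signal
  std::vector<int> outputs(nthreads, 0);
  std::vector<std::thread> workers;
  for (std::size_t i = 0; i < nthreads; ++i) {
    workers.emplace_back([&, i] {
      // Initialize data structures here (CPU bound), then wait.
      start_latch.wait();
      outputs[i] = input * (static_cast<int>(i) + 1);  // perform work
    });
  }
  input = 7;                 // stand-in for loading input data
  start_latch.count_down();  // threads can now start processing
  for (auto& t : workers) t.join();
  int total = 0;
  for (int v : outputs) total += v;
  return total;
}
```

With three workers the function returns 7 * (1 + 2 + 3) = 42, since every worker reads input only after the latch has opened.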

The barrier can be used to co-ordinate a set of threads carrying out a repeated task. The number of threads can be adjusted dynamically to respond to changing requirements.

In the example below, a number of threads are performing a multi-stage task. Some tasks may require fewer steps than others, meaning that some threads may finish before others. We reduce the number of threads waiting on the barrier when this happens.

  void DoWork(Tasks& tasks, size_t n_threads) {
    atomic<size_t> current_threads(n_threads);
    vector<thread*> workers;

    // Create a barrier, and set a lambda that will be invoked every time the
    // internal count reaches 0. If one or more active threads have completed,
    // reduce the number of threads.
    barrier task_barrier(n_threads);
    task_barrier.reset([&] {
      task_barrier.reset(current_threads.load());
    });

    for (size_t i = 0; i < n_threads; ++i) {
      workers.push_back(new thread([&] {
        bool active = true;
        while (active) {
          Task task = tasks.get();
          // perform task
          ...
          if (finished(task)) {
            current_threads--;
            active = false;
          }
          task_barrier.count_down_and_wait();
        }
      }));
    }

    // Read each stage of the task until all stages are complete.
    while (!finished()) {
      GetNextStage(tasks);
    }

    // Wait for the workers before task_barrier goes out of scope.
    for (thread* worker : workers) {
      worker->join();
      delete worker;
    }
  }

Synopsis

The synopsis is as follows.


class latch {
 public:
  explicit latch(size_t count);
  ~latch();

  void count_down();

  void wait();

  bool try_wait();

  void count_down_and_wait();
};

class barrier {
 public:
  explicit barrier(size_t num_threads) throw (std::invalid_argument);
  explicit barrier(size_t num_threads, std::function<void()> f) throw (std::invalid_argument);
  ~barrier();

  void count_down_and_wait();

  void reset(size_t num_threads);
  void reset(std::function<void()> f);
};