C++ Latches and Barriers

ISO/IEC JTC1 SC22 WG21 N3998 - 2014-05-21

Alasdair Mackintosh, [email protected], [email protected]

Olivier Giroux, [email protected], [email protected]

C++ Latches and Barriers

Revision History

Introduction

Solution

Concepts

ArriveAndWaitable

Latch

Barrier

Classes

Header std::latch Synopsis

Memory Ordering

Class std::barrier

Memory Ordering

Class std::notifying_barrier

Memory Ordering

Notes

Use of a scoped guard to manage latches and barriers

Sample Usage

Alternative Solutions

Synopsis

Revision History

N3666

2013-04-18

Initial Version

N3817

2013-10-11

Clarify destructor behaviour. Add comment on templatised completion functions.

N3885

2013-01-21

Add Alternative Solutions section. (Not formally published)

N3998

2014-05-21

Add Concepts, simplify latch and barrier, add notifying_barrier

Introduction

Certain idioms that are commonly used in concurrent programming are missing from the standard libraries. Although many of these can be relatively straightforward to implement, we believe it is more efficient to have a standard version.

In addition, although some idioms can be provided using mutexes, higher performance can often be obtained with atomic operations and lock-free algorithms. However, these algorithms are more complex to write, and are prone to error.

Other standard concurrency idioms may have difficult corner cases, and can be hard to implement correctly. For these reasons, we believe that it is valuable to provide these in the standard library.

Solution

We propose a set of commonly-used concurrency classes, some of which may be implemented using efficient lock-free algorithms where appropriate. This paper describes various concepts related to thread co-ordination, and defines the latch, barrier and notifying_barrier classes.

Latches are a thread coordination mechanism that allows one or more threads to block until an operation is completed. An individual latch is a single-use object; once the operation has been completed, it cannot be reused.

Barriers are a thread coordination mechanism that allows multiple threads to block until an operation is completed. Unlike a latch, a barrier is re-usable; once the operation has been completed, the threads can re-use the same barrier. It is thus useful for managing repeated tasks, or phases of a larger task, that are handled by multiple threads.

Notifying Barriers allow additional behaviour to be defined when an operation has completed.

A reference implementation of these classes has been written.

Concepts

In the section below, a synchronization point represents a point at which a thread may block until a given synchronization condition has been reached or at which it may notify other threads that a synchronization condition has been achieved.

We define the following concepts:

ArriveAndWaitable

Provides:

arrive_and_wait() - Allows a single thread to indicate that it has arrived at a synchronization point. The thread will block until the synchronization condition has been reached. May only be called once by a given thread.

Latch

Provides ArriveAndWaitable, plus:

wait() - The calling thread will block until the synchronization condition has been reached.

arrive() - Allows a single thread to indicate that it has arrived at the synchronization point. Does not block. May only be called once by a given thread.

count_down(N) - decrements by N the internal counter that determines when the synchronization condition has been reached. May be called more than once by a given thread.

Barrier

Provides ArriveAndWaitable, plus:

arrive_and_wait() - Allows a single thread to indicate that it has arrived at a synchronization point. The thread will block until the synchronization condition has been reached. May be called repeatedly by a given thread.

arrive_and_drop() - Allows a single thread to indicate that it has arrived at a synchronization point. The thread will not block. Once a thread returns from this function, it shall not invoke other methods on the barrier (except the destructor, if otherwise valid).

Classes

Header std::latch Synopsis

Provides the Latch concept.

A latch maintains an internal counter that is initialized when the latch is created. The synchronization condition is reached when the counter is decremented to 0. Threads may block at a synchronization point waiting for the condition to be reached. When the condition is reached, any such blocked threads will be released.

latch( int C );

Requires: C shall be >= zero.

Effects: initializes the latch with a count of C.  [Note: If C is zero, the synchronization condition has been reached. - End note]

Synchronization: None

~latch( );

Requires: No threads are blocked at the synchronization point. Note that the latch may be destroyed if threads have not yet returned from wait() or arrive_and_wait() provided that the condition has been reached.

void arrive( );

Effects: Decrements the internal count by 1. If the count reaches 0 the synchronization condition is reached. If called more than once by a given thread the behaviour is undefined.

Throws: std::logic_error if the internal count would be decremented below 0.

Synchronization: Synchronizes with calls unblocked as a result of this call and try_wait calls on the same latch that return true as a result.

void arrive_and_wait( );

Effects: Decrements the internal count by 1. If the count reaches 0 the synchronization condition is reached. Otherwise blocks at the synchronization point until the synchronization condition is reached. If called more than once by a given thread the behaviour is undefined.

Throws: std::logic_error if the internal count would be decremented below 0.

Synchronization: Synchronizes with calls unblocked as a result of this call and try_wait calls on the same latch that return true as a result.

void count_down( int N );

Effects: Decrements the internal count by N. If the count reaches 0, the synchronization condition is reached. May be called by any thread. Does not block.

Throws: std::logic_error if the internal count would be decremented below 0.

Synchronization: Synchronizes with calls unblocked as a result of this call and try_wait calls on the same latch that return true as a result.

void wait( );

Effects: Blocks the calling thread at the synchronization point until the synchronization condition is reached. If the condition has already been reached, the thread does not block.

bool try_wait( );

Returns: true if the synchronization condition has been reached, and false otherwise. Does not block.

latch(const latch&) = delete;
latch& operator=(const latch&) = delete;

Memory Ordering

All calls to count_down(), arrive(), and arrive_and_wait() synchronize with any calls to wait() or arrive_and_wait() that complete as a result. All calls to count_down(), arrive(), or arrive_and_wait() synchronize with any call to try_wait() that returns true as a result.
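The latch semantics above can be illustrated with a minimal sketch built on a mutex and a condition variable. The names sketch_latch and run_latch_demo are hypothetical and are not part of the proposal, and this is not the reference implementation. The mutex supplies the synchronization described under Memory Ordering. The sketch does not attempt the relaxed destructor guarantee, so the demo joins all threads before the latch is destroyed.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <vector>

// Hypothetical sketch of the proposed latch; not the reference implementation.
class sketch_latch {
 public:
  explicit sketch_latch(int count) : count_(count) {}  // Requires: count >= 0
  sketch_latch(const sketch_latch&) = delete;
  sketch_latch& operator=(const sketch_latch&) = delete;

  // Decrements the counter by n without blocking.
  void count_down(int n) {
    std::lock_guard<std::mutex> lock(mutex_);
    decrement(n);
  }

  // Decrements the counter by 1 without blocking.
  void arrive() { count_down(1); }

  // Decrements the counter by 1, then blocks until it reaches 0.
  void arrive_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    decrement(1);
    released_.wait(lock, [this] { return count_ == 0; });
  }

  // Blocks until the counter reaches 0; returns at once if it already has.
  void wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    released_.wait(lock, [this] { return count_ == 0; });
  }

  bool try_wait() {
    std::lock_guard<std::mutex> lock(mutex_);
    return count_ == 0;
  }

 private:
  // Caller must hold mutex_.
  void decrement(int n) {
    if (n > count_) throw std::logic_error("latch decremented below 0");
    count_ -= n;
    if (count_ == 0) released_.notify_all();
  }

  std::mutex mutex_;
  std::condition_variable released_;
  int count_;
};

// Each of n threads writes its own slot, meets the others at the latch, and
// then sums every slot. Returns how many threads observed the complete sum.
int run_latch_demo(int n) {
  sketch_latch rendezvous(n);
  std::vector<int> slots(n, 0);
  std::vector<int> seen(n, 0);
  std::vector<std::thread> threads;
  for (int i = 0; i < n; ++i) {
    threads.emplace_back([&, i] {
      slots[i] = i + 1;
      rendezvous.arrive_and_wait();
      int sum = 0;
      for (int s : slots) sum += s;
      seen[i] = sum;
    });
  }
  for (auto& t : threads) t.join();
  int complete = 0;
  for (int s : seen) complete += (s == n * (n + 1) / 2) ? 1 : 0;
  return complete;
}
```

Because every write to slots happens before the corresponding arrival, each thread released from arrive_and_wait() is expected to see all of the other threads' writes.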

Header std::barrier synopsis

Provides the Barrier concept.

A barrier is created with an initial value representing the number of threads that can arrive at the synchronization point. When that many threads have arrived, the synchronization condition is reached and the threads are released. The barrier will then reset, and may be reused for a new cycle, in which the same set of threads may arrive again at the synchronization point. The same set of threads shall arrive at the barrier in each cycle, otherwise the behaviour is undefined.

barrier( int C );

Requires: C shall be >= zero. [Note: If C is zero, the synchronization condition is considered to have already been reached. In that case, the barrier may only be destroyed. - End note]

Effects: initializes the barrier with the number of participating threads C.

~barrier( );

Requires: No threads are blocked at the synchronization point.

Effects: destroys the barrier

void arrive_and_wait( );

Effects: Blocks at the synchronization point until the synchronization condition is reached. When all threads (as determined by the initial thread count parameter to the constructor) have arrived, the synchronization condition is reached and all threads are released. The barrier may then be re-used for another cycle.

[Note: it is safe for a thread to re-enter arrive_and_wait() immediately. It is not necessary to ensure that all blocked threads have exited arrive_and_wait() before one thread re-enters it. -- End note]

Synchronization: Synchronizes with calls unblocked as a result of this call.

void arrive_and_drop( );

Effects: Signals that this thread has arrived at the synchronization point. May block until the synchronization condition is reached. When the barrier resets, the current thread is removed from the set of participating threads.

If the barrier was created with an initial count of N, and all N threads call arrive_and_drop(), any further operations on the barrier are undefined, apart from calling the destructor.

Calling arrive_and_drop() modifies the requirement that the same set of threads must arrive in each cycle. The thread that drops is no longer considered part of this set. If a thread that has called arrive_and_drop() calls another method on the same barrier, other than the destructor, the results are undefined.

Synchronization: Synchronizes with calls unblocked as a result of this call.

barrier(const barrier&) = delete;
barrier& operator=(const barrier&) = delete;

Memory Ordering

All calls to arrive_and_wait() or arrive_and_drop() synchronize with any calls to arrive_and_wait() that complete as a result.
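One way to obtain the re-entry property described above is to tag each cycle with a generation number, so that a thread blocked in one cycle waits only for that cycle to end. The following is a minimal sketch under that assumption. The names sketch_barrier and run_barrier_demo are hypothetical, the sketch uses a mutex and a condition variable rather than a lock-free algorithm, and here arrive_and_drop() never blocks.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical sketch of the proposed barrier; not the reference implementation.
class sketch_barrier {
 public:
  explicit sketch_barrier(int num_threads) : expected_(num_threads) {}
  sketch_barrier(const sketch_barrier&) = delete;
  sketch_barrier& operator=(const sketch_barrier&) = delete;

  // Blocks until every expected thread has arrived in the current cycle.
  void arrive_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    const unsigned long my_generation = generation_;
    if (++arrived_ == expected_) {
      finish_cycle();
    } else {
      cycle_done_.wait(lock, [&] { return generation_ != my_generation; });
    }
  }

  // Counts as an arrival in this cycle; the caller leaves the set of
  // participating threads when the barrier resets.
  void arrive_and_drop() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++dropped_;
    if (++arrived_ == expected_) finish_cycle();
  }

 private:
  // Caller must hold mutex_. Resets the state and releases waiting threads.
  void finish_cycle() {
    expected_ -= dropped_;
    dropped_ = 0;
    arrived_ = 0;
    ++generation_;
    cycle_done_.notify_all();
  }

  std::mutex mutex_;
  std::condition_variable cycle_done_;
  int expected_;
  int arrived_ = 0;
  int dropped_ = 0;
  unsigned long generation_ = 0;
};

// Three threads run three phases; thread 0 drops out during the first phase.
// Returns true if every thread saw the expected arrival count after each phase.
bool run_barrier_demo() {
  const int kThreads = 3;
  const int kPhases = 3;
  sketch_barrier barrier(kThreads);
  std::atomic<int> hits[kPhases];
  for (auto& h : hits) h.store(0);
  std::atomic<int> mismatches(0);
  std::vector<std::thread> threads;
  for (int t = 0; t < kThreads; ++t) {
    threads.emplace_back([&, t] {
      for (int phase = 0; phase < kPhases; ++phase) {
        ++hits[phase];
        if (t == 0) {
          barrier.arrive_and_drop();  // still counts as an arrival in phase 0
          return;
        }
        barrier.arrive_and_wait();
        const int expected = (phase == 0) ? kThreads : kThreads - 1;
        if (hits[phase].load() != expected) ++mismatches;
      }
    });
  }
  for (auto& th : threads) th.join();
  return mismatches.load() == 0;
}
```

A thread that re-enters arrive_and_wait() before its peers have woken simply records an arrival for the next generation, so the earlier waiters are unaffected.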


Header std::notifying_barrier synopsis

Provides the Barrier concept.

A notifying barrier behaves as a barrier, but is constructed with a callable completion function that is invoked after all threads have arrived at the synchronization point and before any of them is released. The completion may modify the set of threads that arrive at the barrier in each cycle.

template <typename T>

notifying_barrier( int C, T F );

Requires: C shall be >= zero. F shall conform to the callable int() concept. [Note: If C is zero, the synchronization condition is considered to have already been reached. In that case, the barrier may only be destroyed. - End note]

Effects: initializes the barrier with a thread count of C, and a callable object that will be invoked after the synchronization condition is reached.

~notifying_barrier( );

Requires: No threads are blocked at the synchronization point.

Effects: destroys the barrier

void arrive_and_wait( );

Effects: Blocks at the synchronization point until the synchronization condition is reached. When all threads (as determined by the initial thread count parameter to the constructor) have arrived, the synchronization condition is reached. Before any threads are released, the callable completion object registered in the constructor will be invoked. (The completion may be invoked in the context of one of the threads that invoked arrive_and_wait() or arrive_and_drop().) When the completion returns, the internal state will be reset, and all blocked threads will be unblocked. The barrier may then be used for a new cycle.

If the completion returns 0 then the set of participating threads is unchanged. Otherwise the count of expected threads is set to the completion's return value. (If the count is altered, this relaxes the restriction that the same set of threads must arrive at the barrier in each new cycle.)

Note that it is safe for a thread to re-enter arrive_and_wait() immediately. It is not necessary to ensure that all blocked threads have exited arrive_and_wait() before one thread re-enters it.

Synchronization: Synchronizes with invocations of the completion function. Invocations of the completion function then synchronize with calls unblocked as a result of this call.

void arrive_and_drop( );

Effects: Signals that this thread has arrived at the synchronization point. When all threads (as determined by the initial thread count parameter to the constructor) have arrived, the synchronization condition is reached. May block until the synchronization condition is reached. Before any threads are released, the callable completion object registered in the constructor will be invoked. (The completion may be invoked in the context of one of the threads that invoked arrive_and_wait() or arrive_and_drop().) When the completion returns, the internal state will be reset, and all blocked threads will be unblocked. The barrier may then be used for a new cycle.

If the completion returns 0 then those threads that called arrive_and_drop() are removed from the set of expected threads. Otherwise the count of expected threads is set to the completion's return value. (If the count is altered, this relaxes the restriction that the same set of threads must arrive at the barrier in each new cycle.)

Synchronization: Synchronizes with calls unblocked as a result of this call.

notifying_barrier(const notifying_barrier&) = delete;
notifying_barrier& operator=(const notifying_barrier&) = delete;

Memory Ordering

All calls to the completion function synchronize with those calls to arrive_and_wait() or arrive_and_drop() that triggered the completion. The completion function synchronizes with all calls unblocked after it has run.
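The completion protocol can be sketched by extending a generation-counting barrier so that the last arriving thread runs the completion before anyone is released. The names sketch_notifying_barrier and run_notifying_demo are hypothetical. The sketch stores the completion in a std::function<int()> rather than using a templated constructor, and it runs the completion while holding the internal mutex, so the completion must not call back into the barrier.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical sketch of the proposed notifying_barrier.
class sketch_notifying_barrier {
 public:
  sketch_notifying_barrier(int num_threads, std::function<int()> completion)
      : expected_(num_threads), completion_(std::move(completion)) {}

  void arrive_and_wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    const unsigned long my_generation = generation_;
    if (++arrived_ == expected_) {
      finish_cycle();
    } else {
      cycle_done_.wait(lock, [&] { return generation_ != my_generation; });
    }
  }

  void arrive_and_drop() {
    std::lock_guard<std::mutex> lock(mutex_);
    ++dropped_;
    if (++arrived_ == expected_) finish_cycle();
  }

 private:
  // Caller must hold mutex_. Runs on the last arriving thread, before any
  // blocked thread is released.
  void finish_cycle() {
    const int requested = completion_();
    // 0 keeps the current set (minus dropped threads); otherwise the return
    // value becomes the expected thread count for the next cycle.
    expected_ = (requested == 0) ? expected_ - dropped_ : requested;
    dropped_ = 0;
    arrived_ = 0;
    ++generation_;
    cycle_done_.notify_all();
  }

  std::mutex mutex_;
  std::condition_variable cycle_done_;
  int expected_;
  int arrived_ = 0;
  int dropped_ = 0;
  unsigned long generation_ = 0;
  std::function<int()> completion_;
};

// Thread t takes part in t + 1 cycles and announces its departure before its
// final arrival. Returns the values the completion returned, in cycle order.
std::vector<int> run_notifying_demo() {
  std::atomic<int> active(3);
  std::vector<int> log;  // only touched by the completion, one cycle at a time
  sketch_notifying_barrier barrier(3, [&]() -> int {
    const int n = active.load();
    log.push_back(n);
    return n;
  });
  std::vector<std::thread> threads;
  for (int t = 0; t < 3; ++t) {
    threads.emplace_back([&, t] {
      for (int cycle = 0; cycle <= t; ++cycle) {
        if (cycle == t) --active;  // this is the thread's last cycle
        barrier.arrive_and_wait();
      }
    });
  }
  for (auto& th : threads) th.join();
  return log;
}
```

In the final cycle the completion returns 0, which under the rules above leaves the expected count unchanged rather than setting it to zero.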

Notes

(The following notes have not changed significantly since the last revision of this paper, and may be skipped by readers familiar with that.)

Use of a scoped guard to manage latches and barriers

A future paper will propose a scoped_guard: a helper class that invokes a function when it goes out of scope. This is analogous to a std::unique_ptr, which deletes an object when it goes out of scope. The latch and barrier classes could provide scoped guards that would ensure that a latch was always decremented when a worker had finished, regardless of how the worker terminated. For example:

  void DoWork(latch& completion_latch) {
    // Automatically invokes completion_latch.count_down() when this
    // function terminates
    scoped_guard g = completion_latch.count_down_guard();
    switch (state) {
      case 0:
        return;
      case 1:
        algorithm1();
        return;
      case 2:
        algorithm2();
        return;
      default:
        throw std::logic_error("unexpected state");
    }
  }

We suggest that a suitable xxx_guard() method be provided for all of the latch and barrier methods, as an aid to safe usage. This would avoid having threads waiting for termination conditions that are never triggered.
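Until such a facility exists, the idea can be approximated with a small RAII class. The sketch below is an assumption about what a guard might look like, not the interface of the future scoped_guard paper. The names counting_latch, count_down_guard, do_work and guard_demo are hypothetical, and counting_latch is a non-blocking stand-in that models only the counter.

```cpp
#include <atomic>
#include <cassert>
#include <stdexcept>

// Hypothetical stand-in for the proposed latch: counter only, no blocking.
class counting_latch {
 public:
  explicit counting_latch(int count) : count_(count) {}
  void count_down(int n) noexcept { count_.fetch_sub(n); }
  bool try_wait() const noexcept { return count_.load() == 0; }

 private:
  std::atomic<int> count_;
};

// Decrements the latch by 1 when the guard leaves scope, on any exit path.
class count_down_guard {
 public:
  explicit count_down_guard(counting_latch& l) : latch_(l) {}
  ~count_down_guard() { latch_.count_down(1); }
  count_down_guard(const count_down_guard&) = delete;
  count_down_guard& operator=(const count_down_guard&) = delete;

 private:
  counting_latch& latch_;
};

// A worker that may return normally or throw; the guard covers both cases.
void do_work(counting_latch& completion_latch, int state) {
  count_down_guard guard(completion_latch);
  if (state < 0) throw std::logic_error("unexpected state");
  // ... normal work would go here ...
}

// Runs one worker that returns and one that throws; returns true if the
// latch still reached zero.
bool guard_demo() {
  counting_latch done(2);
  do_work(done, 0);
  try {
    do_work(done, -1);
  } catch (const std::logic_error&) {
    // The guard has already counted down during stack unwinding.
  }
  return done.try_wait();
}
```

The stand-in's count_down() is noexcept so that it is safe to call from a destructor. A guard over a latch whose count_down() can throw std::logic_error would need to handle that case explicitly.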

Sample Usage

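Sample use cases for the latch include:

Setting multiple threads to perform a task, and then waiting until all threads have reached a common point.

Creating multiple threads, which wait for a signal before advancing beyond a common point.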

An example of the first use case would be as follows:

  void DoWork(threadpool* pool) {
    latch completion_latch(NTASKS);
    for (int i = 0; i < NTASKS; ++i) {
      pool->add_task([&] {
        // perform work
        ...
        completion_latch.count_down(1);
      });
    }
    // Block until work is done
    completion_latch.wait();
  }
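For readers who want to run this pattern, the following sketch replaces the thread pool with plain std::thread objects and uses a small atomic counter in place of the proposed latch. The names spin_latch and run_tasks are hypothetical, and waiting by polling with yield() is a simplification chosen for brevity rather than a recommended implementation strategy.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical lock-free stand-in for the proposed latch. wait() polls.
class spin_latch {
 public:
  explicit spin_latch(int count) : count_(count) {}

  // Release ordering publishes the caller's earlier writes to waiters.
  void count_down(int n) { count_.fetch_sub(n, std::memory_order_release); }

  // Acquire ordering makes those writes visible once zero is observed.
  bool try_wait() const {
    return count_.load(std::memory_order_acquire) == 0;
  }

  void wait() const {
    while (!try_wait()) std::this_thread::yield();
  }

 private:
  std::atomic<int> count_;
};

// Starts ntasks workers that each fill one slot, waits on the latch, and
// returns the sum of the slots.
int run_tasks(int ntasks) {
  spin_latch completion_latch(ntasks);
  std::vector<int> squares(ntasks, 0);
  std::vector<std::thread> workers;
  for (int i = 0; i < ntasks; ++i) {
    workers.emplace_back([&squares, &completion_latch, i] {
      squares[i] = i * i;  // perform work
      completion_latch.count_down(1);
    });
  }
  // Block until work is done; the results are safe to read afterwards.
  completion_latch.wait();
  int total = 0;
  for (int s : squares) total += s;
  for (auto& w : workers) w.join();
  return total;
}
```

The results are read before the threads are joined, so it is the latch, not join(), that makes the workers' writes visible to the caller.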

An example of the second use case is shown below. We need to load data and then process it using a number of threads. Loading the data is I/O bound, whereas starting threads and creating data structures is CPU bound. By running these in parallel, throughput can be increased.

  void DoWork() {
    latch start_latch(1);
    vector<thread*> workers;
    for (int i = 0; i < NTHREADS; ++i) {
      workers.push_back(new thread([&] {
        // Initialize data structures. This is CPU bound.
        ...
        start_latch.wait();
        // perform work
        ...
      }));
    }
    // Load input data. This is I/O bound.
    ...
    // Threads can now start processing
    start_latch.count_down(1);
    // Wait for threads to finish, delete allocated objects.
    ...
  }

The barrier can be used to co-ordinate a set of threads carrying out a repeated task.

  void DoWork(Tasks& tasks, int n_threads) {
    vector<thread*> workers;
    barrier task_barrier(n_threads);
    for (int i = 0; i < n_threads; ++i) {
      workers.push_back(new thread([&] {
        bool active = true;
        while (active) {
          Task task = tasks.get();
          // perform task
          ...
          task_barrier.arrive_and_wait();
        }
      }));
    }
    // Read each stage of the task until all stages are complete.
    while (!finished()) {
      GetNextStage(tasks);
    }
  }

The notifying_barrier can be used to co-ordinate a set of threads where the number of threads can vary. In the example below, we reduce the number of threads when a task finishes. (Alternatively we could increase the number of threads if the task required it.) Note that reducing threads can be done via the barrier::arrive_and_drop() method, but increasing the number of threads can only be done with a notifying_barrier.

  void DoWork(Tasks& tasks, int initial_threads) {
    atomic<int> current_threads(initial_threads);
    vector<thread*> workers;
    // Create a notifying_barrier, and set a lambda that will be
    // invoked every time the barrier counts down. If one or more
    // active threads have completed, reduce the number of threads.
    std::function<int()> rf = [&] { return current_threads.load(); };
    notifying_barrier task_barrier(initial_threads, rf);
    for (int i = 0; i < initial_threads; ++i) {
      workers.push_back(new thread([&] {
        bool active = true;
        while (active) {
          Task task = tasks.get();
          // perform task
          ...
          if (finished(task)) {
            current_threads--;
            active = false;
          }
          task_barrier.arrive_and_wait();
        }
      }));
    }
    // Read each stage of the task until all stages are complete.
    while (!finished()) {
      GetNextStage(tasks);
    }
  }

Alternative Solutions

Java provides a Phaser interface for thread co-ordination. See http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Phaser.html. It has been suggested that a similar concept could be used in the C++ standard instead of the current latch/barrier proposal.

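The Phaser interface offers the following potential advantages:

The set of participating threads can be modified at any point during a cycle.

Threads can arrive at the synchronization point without blocking.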

To describe the differences between Phasers and barriers we use the term 'cycle'. Both constructs support the use-case where a number of threads block until they all arrive at a certain point. Once all threads have arrived, they are unblocked, and can continue. We refer to this as a 'cycle'. Both Phasers and barriers are reusable: after one cycle has completed a new cycle can begin.

The proposed notifying_barrier can only update the number of threads during the completion function that is invoked when the count reaches zero at the end of a cycle. The Phaser allows a thread to call register() at any point during the cycle. However it is difficult to reason about the ordering of this behaviour. To ensure that a thread is added to a particular cycle would require additional synchronization with the other threads in the cycle, with the complexity and performance overheads that this involves. If there is no such synchronization then a thread cannot control which cycle it joins. As the purpose of the barrier is to allow threads to co-operate on tasks, this feature of Phaser does not seem useful.

The Phaser class supports an arriveAndAwaitAdvance() method that corresponds to barrier's arrive_and_wait(). It also supports an arrive() method that decrements the internal count without blocking. This potentially allows some threads to arrive at a later cycle while other threads are still completing earlier cycles. Again, we feel that it can be hard to reason about the order in which threads will enter different cycles, or when the sequence of operations controlled by this Phaser has finally terminated. In addition, this capability adds to the complexity of the implementation.

We feel that it would be possible to implement a C++ Phaser, possibly using the proposed latch and barrier classes, but that the latter two offer a simpler interface that will be useful for most thread co-ordination tasks.

Synopsis

The synopsis is as follows.

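class latch {
public:
 explicit latch(int count);
 ~latch();
 void arrive();
 void arrive_and_wait();
 void count_down(int n);
 void wait();
 bool try_wait();
 latch(const latch&) = delete;
 latch& operator=(const latch&) = delete;
};

class barrier {
public:
 explicit barrier(int num_threads);
 ~barrier();
 void arrive_and_wait();
 void arrive_and_drop();
 barrier(const barrier&) = delete;
 barrier& operator=(const barrier&) = delete;
};

class notifying_barrier {
public:
 template <typename F>
 notifying_barrier(int num_threads, F completion);
 ~notifying_barrier();
 void arrive_and_wait();
 void arrive_and_drop();
 notifying_barrier(const notifying_barrier&) = delete;
 notifying_barrier& operator=(const notifying_barrier&) = delete;
};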