Thread-Safe Queue - Two Serious Errors -- Rainer Grimm

concurrentarchitecture-grimm.pngIn my last post “Monitor Object”  I implemented a thread-safe queue. I made two serious errors. Sorry. Today, I will fix these issues.

Thread-Safe Queue - Two Serious Errors

by Rainer Grimm

From the article:

First, I want to show you again the erroneous implementation from my last post to understand the context.

  1. // monitorObject.cpp
  2.  
  3. #include <condition_variable>
  4. #include <functional>
  5. #include <queue>
  6. #include <iostream>
  7. #include <mutex>
  8. #include <random>
  9. #include <thread>
  10.  
  11. class Monitor {
  12. public:
  13.     void lock() const {
  14.         monitMutex.lock();
  15.     }
  16.  
  17.     void unlock() const {
  18.         monitMutex.unlock();
  19.     }
  20.  
  21.     void notify_one() const noexcept {
  22.         monitCond.notify_one();
  23.     }
  24.  
  25.     template <typename Predicate>
  26.     void wait(Predicate pred) const {                 // (10)
  27.         std::unique_lock<std::mutex> monitLock(monitMutex);
  28.         monitCond.wait(monitLock, pred);
  29.     }
  30.   
  31. private:
  32.     mutable std::mutex monitMutex;
  33.     mutable std::condition_variable monitCond;
  34. };
  35.  
  36. template <typename T>                                  // (1)
  37. class ThreadSafeQueue: public Monitor {
  38. public:
  39.     void add(T val){
  40.         lock();
  41.         myQueue.push(val);                             // (6)
  42.         unlock();
  43.         notify_one();
  44.     }
  45.   
  46.     T get(){
  47.         wait( [this] { return ! myQueue.empty(); } );  // (2)
  48.         lock();
  49.         auto val = myQueue.front();                    // (4)
  50.         myQueue.pop();                                 // (5)
  51.         unlock();
  52.         return val;
  53.     }
  54.  
  55. private:
  56.     std::queue<T> myQueue;                            // (3)
  57. };
  58.  
  59.  
  60. class Dice {
  61. public:
  62.     int operator()(){ return rand(); }
  63. private:
  64.     std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
  65.                                           std::default_random_engine());
  66. };
  67.  
  68.  
  69. int main(){
  70.   
  71.     std::cout << '\n';
  72.   
  73.     constexpr auto NumberThreads = 100;
  74.   
  75.     ThreadSafeQueue<int> safeQueue;                      // (7)
  76.  
  77.     auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
  78.                                             std::cout << val << " "
  79.                                             << std::this_thread::get_id() << "; ";
  80.                                           };
  81.     auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)
  82.  
  83.     std::vector<std::thread> addThreads(NumberThreads);
  84.     Dice dice;
  85.     for (auto& thr: addThreads) thr = std::thread(addLambda, dice());
  86.  
  87.     std::vector<std::thread> getThreads(NumberThreads);
  88.     for (auto& thr: getThreads) thr = std::thread(getLambda);
  89.  
  90.     for (auto& thr: addThreads) thr.join();
  91.     for (auto& thr: getThreads) thr.join();
  92.   
  93.     std::cout << "\n\n";
  94.    
  95. }

Add a Comment

Comments are closed.

Comments (0)

There are currently no comments on this entry.