Thread-Safe Queue - Two Serious Errors -- Rainer Grimm

concurrentarchitecture-grimm.pngIn my last post "Monitor Object"  I implemented a thread-safe queue. I made two serious errors. Sorry. Today, I will fix these issues.

Thread-Safe Queue - Two Serious Errors

by Rainer Grimm

From the article:

First, I want to show you again the erroneous implementation from my last post to understand the context.

// monitorObject.cpp

#include <condition_variable>
#include <functional>
#include <queue>
#include <iostream>
#include <mutex>
#include <random>
#include <thread>

class Monitor {
public:
    void lock() const {
        monitMutex.lock();
    }

    void unlock() const {
        monitMutex.unlock();
    }

    void notify_one() const noexcept {
        monitCond.notify_one();
    }

    template <typename Predicate>
    void wait(Predicate pred) const {                 // (10)
        std::unique_lock<std::mutex> monitLock(monitMutex);
        monitCond.wait(monitLock, pred);
    }
  
private:
    mutable std::mutex monitMutex;
    mutable std::condition_variable monitCond;
};

template <typename T>                                  // (1)
class ThreadSafeQueue: public Monitor {
public:
    void add(T val){
        lock();
        myQueue.push(val);                             // (6)
        unlock();
        notify_one();
    }
  
    T get(){
        wait( [this] { return ! myQueue.empty(); } );  // (2)
        lock();
        auto val = myQueue.front();                    // (4)
        myQueue.pop();                                 // (5)
        unlock();
        return val;
    }

private:
    std::queue<T> myQueue;                            // (3)
};


class Dice {
public:
    int operator()(){ return rand(); }
private:
    std::function<int()> rand = std::bind(std::uniform_int_distribution<>(1, 6),
                                          std::default_random_engine());
};


int main(){
  
    std::cout << '\n';
  
    constexpr auto NumberThreads = 100;
  
    ThreadSafeQueue<int> safeQueue;                      // (7)

    auto addLambda = [&safeQueue](int val){ safeQueue.add(val);          // (8)
                                            std::cout << val << " "
                                            << std::this_thread::get_id() << "; ";
                                          };
    auto getLambda = [&safeQueue]{ safeQueue.get(); };  // (9)

    std::vector<std::thread> addThreads(NumberThreads);
    Dice dice;
    for (auto& thr: addThreads) thr = std::thread(addLambda, dice());

    std::vector<std::thread> getThreads(NumberThreads);
    for (auto& thr: getThreads) thr = std::thread(getLambda);

    for (auto& thr: addThreads) thr.join();
    for (auto& thr: getThreads) thr.join();
  
    std::cout << "\n\n";
   
}

Add a Comment

Comments are closed.

Comments (0)

There are currently no comments on this entry.