queue.h 1.93 KB
Newer Older
pryanikov's avatar
pryanikov committed
1
2
3
4
5
6
7
8
9
10
11
#ifndef REPLICATOR_QUEUE_H
#define REPLICATOR_QUEUE_H

#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>

template<typename T> class Queue
{
pryanikov's avatar
pryanikov committed
12
13
14
15
16
17
18
19
	private:
		template<typename M> class ulock : public std::unique_lock<M> {
			std::condition_variable& cv;
			public:
			ulock(M& m, std::condition_variable& cv_) : std::unique_lock<M>(m), cv(cv_) {}
			void unlock() { std::unique_lock<M>::unlock(); cv.notify_all(); }
		};

pryanikov's avatar
pryanikov committed
20
21
22
	public:
		Queue(const unsigned limit_) : limit(limit_) {}

pryanikov's avatar
pryanikov committed
23
		inline T pop()
pryanikov's avatar
pryanikov committed
24
		{
pryanikov's avatar
pryanikov committed
25
26
			// std::unique_lock<std::mutex> lock_(mutex);
			ulock<std::mutex> lock_(mutex, cv2);
pryanikov's avatar
pryanikov committed
27
28

			if (queue.empty()) {
pryanikov's avatar
pryanikov committed
29
				cv1.wait(lock_, [this] { return !queue.empty(); });
pryanikov's avatar
pryanikov committed
30
31
32
33
			}

			T item = queue.front();
			queue.pop_front();
pryanikov's avatar
pryanikov committed
34
35
			// lock_.unlock();
			// cv2.notify_all();
pryanikov's avatar
pryanikov committed
36
37
38
39

			return item;
		}

pryanikov's avatar
pryanikov committed
40
		inline void try_fetch(const std::function<void (T&)>& cb, const std::chrono::milliseconds timeout)
pryanikov's avatar
pryanikov committed
41
		{
pryanikov's avatar
pryanikov committed
42
43
44
45
46
			// std::unique_lock<std::mutex> lock_(mutex);
			ulock<std::mutex> lock_(mutex, cv2);

			if (queue.empty() && !cv1.wait_for(lock_, timeout, [this] { return !queue.empty(); })) {
				return;
pryanikov's avatar
pryanikov committed
47
48
			}

pryanikov's avatar
pryanikov committed
49
50
51
52
53
54
55
56
57
58
59
60
61
62
			unsigned cnt = queue.size();
			do {
				T item = queue.front();
				queue.pop_front();
				lock_.unlock();
				//cv2.notify_all();

				cb(item);

				if (--cnt) {
					lock_.lock();
					continue;
				}
			} while (false);
pryanikov's avatar
pryanikov committed
63
64
		}

pryanikov's avatar
pryanikov committed
65
		inline void push(const T& item)
pryanikov's avatar
pryanikov committed
66
		{
pryanikov's avatar
pryanikov committed
67
68
			// std::unique_lock<std::mutex> lock_(mutex);
			ulock<std::mutex> lock_(mutex, cv1);
pryanikov's avatar
pryanikov committed
69
70

			if (queue.size() >= limit) {
pryanikov's avatar
pryanikov committed
71
				cv2.wait(lock_, [this] { return queue.size() < limit; });
pryanikov's avatar
pryanikov committed
72
73
74
			}

			queue.push_back(item);
pryanikov's avatar
pryanikov committed
75
76
			// lock_.unlock();
			// cv1.notify_one();
pryanikov's avatar
pryanikov committed
77
78
		}

pryanikov's avatar
pryanikov committed
79
		inline unsigned size() const {
pryanikov's avatar
pryanikov committed
80
			// std::lock_guard<std::mutex> lock_(mutex);
pryanikov's avatar
pryanikov committed
81
82
83
			return queue.size();
		}

pryanikov's avatar
pryanikov committed
84
85
	private:
		std::deque<T> queue;
pryanikov's avatar
pryanikov committed
86
		mutable std::mutex mutex;
pryanikov's avatar
pryanikov committed
87
88
89
90
91
92
		std::condition_variable cv1;
		std::condition_variable cv2;
		const unsigned limit;
};

#endif // REPLICATOR_QUEUE_H