queue.h 1.39 KB
Newer Older
pryanikov's avatar
pryanikov committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#ifndef REPLICATOR_QUEUE_H
#define REPLICATOR_QUEUE_H

#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>

template<typename T> class Queue
{
	public:
		Queue(const unsigned limit_) : limit(limit_) {}

		T pop()
		{
			std::unique_lock<std::mutex> lock(mutex);

			if (queue.empty()) {
				cv1.wait(lock, [this] { return !queue.empty(); });
			}

			T item = queue.front();
			queue.pop_front();
			lock.unlock();
			cv2.notify_all();

			return item;
		}

		void fetch(const std::function<bool (T&)>& cb, const std::chrono::milliseconds timeout, const unsigned limit_)
		{
			std::unique_lock<std::mutex> lock(mutex);

			if (!queue.empty() || cv1.wait_for(lock, timeout, [this] { return !queue.empty(); })) {
pryanikov's avatar
pryanikov committed
36
37
38
39
				bool predicate;
				unsigned cnt = queue.size() < limit_ ? queue.size() : limit_;
				do {
					predicate = cb(queue.front());
pryanikov's avatar
pryanikov committed
40
					queue.pop_front();
pryanikov's avatar
pryanikov committed
41
				} while (!predicate && --cnt);
pryanikov's avatar
pryanikov committed
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
			}

			lock.unlock();
			cv2.notify_all();
		}

		void push(const T& item)
		{
			std::unique_lock<std::mutex> lock(mutex);

			if (queue.size() >= limit) {
				cv2.wait(lock, [this] { return queue.size() < limit; });
			}

			queue.push_back(item);
			lock.unlock();
			cv1.notify_one();
		}

	private:
		std::deque<T> queue;
		std::mutex mutex;
		std::condition_variable cv1;
		std::condition_variable cv2;
		const unsigned limit;
};

#endif // REPLICATOR_QUEUE_H