Commit 6fd91ee5 authored by pryanikov's avatar pryanikov

some sugar

parent 26ce92bf
...@@ -53,6 +53,9 @@ static void tpwriter_worker() ...@@ -53,6 +53,9 @@ static void tpwriter_worker()
const auto cb_fetch = std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1); const auto cb_fetch = std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1);
while (!is_term) { while (!is_term) {
// for (unsigned cnt = queue.size(); cnt > 0; --cnt) {
// tpwriter->BinlogEventCallback(queue.pop());
// }
queue.try_fetch(cb_fetch, timeout); queue.try_fetch(cb_fetch, timeout);
tpwriter->Sync(); tpwriter->Sync();
tpwriter->RecvAll(); tpwriter->RecvAll();
......
...@@ -9,65 +9,75 @@ ...@@ -9,65 +9,75 @@
template<typename T> class Queue template<typename T> class Queue
{ {
private:
template<typename M> class ulock : public std::unique_lock<M> {
std::condition_variable& cv;
public:
ulock(M& m, std::condition_variable& cv_) : std::unique_lock<M>(m), cv(cv_) {}
void unlock() { std::unique_lock<M>::unlock(); cv.notify_all(); }
};
public: public:
Queue(const unsigned limit_) : limit(limit_) {} Queue(const unsigned limit_) : limit(limit_) {}
inline T pop() inline T pop()
{ {
std::unique_lock<std::mutex> lock(mutex); // std::unique_lock<std::mutex> lock_(mutex);
ulock<std::mutex> lock_(mutex, cv2);
if (queue.empty()) { if (queue.empty()) {
cv1.wait(lock, [this] { return !queue.empty(); }); cv1.wait(lock_, [this] { return !queue.empty(); });
} }
T item = queue.front(); T item = queue.front();
queue.pop_front(); queue.pop_front();
lock.unlock(); // lock_.unlock();
cv2.notify_all(); // cv2.notify_all();
return item; return item;
} }
void try_fetch(const std::function<void (T&)>& cb, const std::chrono::milliseconds timeout) inline void try_fetch(const std::function<void (T&)>& cb, const std::chrono::milliseconds timeout)
{ {
std::unique_lock<std::mutex> lock(mutex); // std::unique_lock<std::mutex> lock_(mutex);
ulock<std::mutex> lock_(mutex, cv2);
if (!queue.empty() || cv1.wait_for(lock, timeout, [this] { return !queue.empty(); })) {
unsigned cnt = queue.size(); if (queue.empty() && !cv1.wait_for(lock_, timeout, [this] { return !queue.empty(); })) {
do { return;
T item = queue.front();
queue.pop_front();
lock.unlock();
cv2.notify_all();
cb(item);
if (--cnt) {
lock.lock();
continue;
}
} while (false);
} }
lock.unlock(); unsigned cnt = queue.size();
cv2.notify_all(); do {
T item = queue.front();
queue.pop_front();
lock_.unlock();
//cv2.notify_all();
cb(item);
if (--cnt) {
lock_.lock();
continue;
}
} while (false);
} }
inline void push(const T& item) inline void push(const T& item)
{ {
std::unique_lock<std::mutex> lock(mutex); // std::unique_lock<std::mutex> lock_(mutex);
ulock<std::mutex> lock_(mutex, cv1);
if (queue.size() >= limit) { if (queue.size() >= limit) {
cv2.wait(lock, [this] { return queue.size() < limit; }); cv2.wait(lock_, [this] { return queue.size() < limit; });
} }
queue.push_back(item); queue.push_back(item);
lock.unlock(); // lock_.unlock();
cv1.notify_one(); // cv1.notify_one();
} }
inline unsigned size() const { inline unsigned size() const {
// std::lock_guard<std::mutex> lock(mutex); // std::lock_guard<std::mutex> lock_(mutex);
return queue.size(); return queue.size();
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment