Commit 80600919 authored by pryanikov's avatar pryanikov
Browse files

no sugar, it's harmful & don't copy, use pointers

parent 6fd91ee5
......@@ -2,4 +2,4 @@ git submodule update --init --recursive
git submodule update --init --recursive
cmake -DCMAKE_BUILD_TYPE=Release -DYAML_CPP_BUILD_TOOLS=OFF -DYAML_CPP_BUILD_CONTRIB=OFF -DTARANTOOL_C_EMBEDDED=1 .
make
strip -s replicatord
#strip -s replicatord
......@@ -77,13 +77,13 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
// send binlog position update event
if (!stopped) {
SerializableBinlogEvent ev;
ev.binlog_name = binlog_name;
ev.binlog_pos = binlog_pos;
// ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.event = "IGNORE";
stopped = cb(ev);
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
ev->binlog_name = binlog_name;
ev->binlog_pos = binlog_pos;
// ev->seconds_behind_master = GetSecondsBehindMaster();
ev->unix_timestamp = long(time(NULL));
ev->event = "IGNORE";
stopped = cb(std::move(ev));
}
//tempslave.close_connection();
......@@ -118,27 +118,27 @@ void DBReader::EventCallback(const slave::RecordSet& event, const std::map<std::
{
last_event_when = event.when;
SerializableBinlogEvent ev;
ev.binlog_name = state.getMasterLogName();
ev.binlog_pos = state.getMasterLogPos();
// ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.database = event.db_name;
ev.table = event.tbl_name;
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
ev->binlog_name = state.getMasterLogName();
ev->binlog_pos = state.getMasterLogPos();
// ev->seconds_behind_master = GetSecondsBehindMaster();
ev->unix_timestamp = long(time(NULL));
ev->database = event.db_name;
ev->table = event.tbl_name;
switch (event.type_event) {
case slave::RecordSet::Delete: ev.event = "DELETE"; break;
case slave::RecordSet::Update: ev.event = "UPDATE"; break;
case slave::RecordSet::Write: ev.event = "INSERT"; break;
default: ev.event = "IGNORE";
case slave::RecordSet::Delete: ev->event = "DELETE"; break;
case slave::RecordSet::Update: ev->event = "UPDATE"; break;
case slave::RecordSet::Write: ev->event = "INSERT"; break;
default: ev->event = "IGNORE";
}
for (auto fi = filter.begin(), end = filter.end(); fi != end; ++fi) {
const auto ri = event.m_row.find(fi->first);
if (ri != event.m_row.end()) {
ev.row[ fi->second ] = ri->second;
ev->row[ fi->second ] = ri->second;
}
}
stopped = cb(ev);
stopped = cb(std::move(ev));
}
void DBReader::XidEventCallback(unsigned int server_id, BinlogEventCallback cb)
......@@ -146,13 +146,13 @@ void DBReader::XidEventCallback(unsigned int server_id, BinlogEventCallback cb)
last_event_when = ::time(NULL);
// send binlog position update event
SerializableBinlogEvent ev;
ev.binlog_name = state.getMasterLogName();
ev.binlog_pos = state.getMasterLogPos();
// ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.event = "IGNORE";
stopped = cb(ev);
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
ev->binlog_name = state.getMasterLogName();
ev->binlog_pos = state.getMasterLogPos();
// ev->seconds_behind_master = GetSecondsBehindMaster();
ev->unix_timestamp = long(time(NULL));
ev->event = "IGNORE";
stopped = cb(std::move(ev));
}
void DBReader::DumpTablesCallback(
......@@ -162,27 +162,27 @@ void DBReader::DumpTablesCallback(
const nanomysql::fields_t& fields,
BinlogEventCallback cb
) {
SerializableBinlogEvent ev;
ev.binlog_name = "";
ev.binlog_pos = 0;
ev.database = db_name;
ev.table = tbl_name;
ev.event = "INSERT";
// ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
ev->binlog_name = "";
ev->binlog_pos = 0;
ev->database = db_name;
ev->table = tbl_name;
ev->event = "INSERT";
// ev->seconds_behind_master = GetSecondsBehindMaster();
ev->unix_timestamp = long(time(NULL));
for (const auto& it : filter) {
slave::PtrField ptr_field = it.second;
const auto& field = fields.at(ptr_field->field_name);
if (field.is_null) {
ev.row[ it.first ] = boost::any();
ev->row[ it.first ] = boost::any();
} else {
ptr_field->unpack_str(field.data);
ev.row[ it.first ] = ptr_field->field_data;
ev->row[ it.first ] = ptr_field->field_data;
}
}
if (!stopped) {
stopped = cb(ev);
stopped = cb(std::move(ev));
}
}
......
......@@ -14,7 +14,7 @@
namespace replicator {
typedef std::function<bool (const SerializableBinlogEvent &ev)> BinlogEventCallback;
typedef std::function<bool (SerializableBinlogEventPtr&&)> BinlogEventCallback;
struct DBTable
{
......
......@@ -34,7 +34,7 @@ static DBReader *dbreader = NULL;
static void sigint_handler(int sig);
static Queue<SerializableBinlogEvent> queue(50);
static Queue<SerializableBinlogEventPtr> queue(50);
// ===============
......@@ -48,15 +48,12 @@ static void tpwriter_worker()
try {
tpwriter->ReadBinlogPos(binlog_name, binlog_pos);
reset = true;
const std::chrono::milliseconds timeout(1000);
const auto cb_fetch = std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1);
while (!is_term) {
// for (unsigned cnt = queue.size(); cnt > 0; --cnt) {
// tpwriter->BinlogEventCallback(queue.pop());
// }
queue.try_fetch(cb_fetch, timeout);
for (unsigned cnt = queue.wait(timeout); cnt > 0; --cnt) {
tpwriter->BinlogEventCallback(queue.pop());
}
tpwriter->Sync();
tpwriter->RecvAll();
}
......@@ -78,13 +75,13 @@ static void tpwriter_worker()
// ====================
static bool dbread_callback(const SerializableBinlogEvent &ev)
static bool dbread_callback(SerializableBinlogEventPtr&& ev)
{
if (is_term || reset) {
return true;
}
queue.push(ev);
queue.push(std::forward<SerializableBinlogEventPtr>(ev));
return false;
}
......
......@@ -9,83 +9,63 @@
template<typename T> class Queue
{
private:
template<typename M> class ulock : public std::unique_lock<M> {
std::condition_variable& cv;
public:
ulock(M& m, std::condition_variable& cv_) : std::unique_lock<M>(m), cv(cv_) {}
void unlock() { std::unique_lock<M>::unlock(); cv.notify_all(); }
};
public:
Queue(const unsigned limit_) : limit(limit_) {}
inline T pop()
{
// std::unique_lock<std::mutex> lock_(mutex);
ulock<std::mutex> lock_(mutex, cv2);
std::unique_lock<std::mutex> lock(mutex);
if (queue.empty()) {
cv1.wait(lock_, [this] { return !queue.empty(); });
cv1.wait(lock, [this] { return !queue.empty(); });
}
T item = queue.front();
T item = std::move(queue.front());
queue.pop_front();
// lock_.unlock();
// cv2.notify_all();
lock.unlock();
cv2.notify_all();
return item;
}
inline void try_fetch(const std::function<void (T&)>& cb, const std::chrono::milliseconds timeout)
inline void push(T&& item)
{
// std::unique_lock<std::mutex> lock_(mutex);
ulock<std::mutex> lock_(mutex, cv2);
std::unique_lock<std::mutex> lock(mutex);
if (queue.empty() && !cv1.wait_for(lock_, timeout, [this] { return !queue.empty(); })) {
return;
if (queue.size() >= limit) {
cv2.wait(lock, [this] { return queue.size() < limit; });
}
unsigned cnt = queue.size();
do {
T item = queue.front();
queue.pop_front();
lock_.unlock();
//cv2.notify_all();
queue.push_back(std::forward<T>(item));
cb(item);
if (--cnt) {
lock_.lock();
continue;
}
} while (false);
lock.unlock();
cv1.notify_one();
}
inline void push(const T& item)
inline unsigned wait(const std::chrono::milliseconds timeout) const
{
// std::unique_lock<std::mutex> lock_(mutex);
ulock<std::mutex> lock_(mutex, cv1);
std::unique_lock<std::mutex> lock(mutex);
if (queue.size() >= limit) {
cv2.wait(lock_, [this] { return queue.size() < limit; });
}
if (!queue.empty())
return queue.size();
if (cv1.wait_for(lock, timeout, [this] { return !queue.empty(); }))
return queue.size();
queue.push_back(item);
// lock_.unlock();
// cv1.notify_one();
return 0;
}
inline unsigned size() const {
// std::lock_guard<std::mutex> lock_(mutex);
// std::lock_guard<std::mutex> lock(mutex);
return queue.size();
}
private:
std::deque<T> queue;
mutable std::mutex mutex;
std::condition_variable cv1;
std::condition_variable cv2;
mutable std::condition_variable cv1;
mutable std::condition_variable cv2;
const unsigned limit;
};
......
......@@ -4,6 +4,7 @@
#include <string>
#include <map>
#include <sstream>
#include <memory>
#include <boost/any.hpp>
namespace replicator {
......@@ -82,6 +83,8 @@ struct SerializableBinlogEvent
std::map<unsigned, SerializableValue> row;
};
typedef std::unique_ptr<SerializableBinlogEvent> SerializableBinlogEventPtr;
} // replicator
#endif // REPLICATOR_SERIALIZABLE_H
......@@ -165,22 +165,22 @@ void TPWriter::AddTable(
s.delete_call = delete_call;
}
void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
void TPWriter::BinlogEventCallback(SerializableBinlogEventPtr&& ev)
{
// spacial case event "IGNORE", which only updates binlog position
// but doesn't modify any table data
if (ev.event == "IGNORE") {
if (ev->event == "IGNORE") {
return;
}
const TableSpace& ts = dbs.at(ev.database).at(ev.table);
const TableSpace& ts = dbs.at(ev->database).at(ev->table);
// check if doing same key as last time
static std::string prev_key;
std::string curnt_key;
for (const auto i : ts.keys) {
curnt_key += ev.row.at(i).to_string();
curnt_key += ev->row.at(i).to_string();
}
if (prev_key == curnt_key) {
// sync first
......@@ -267,7 +267,7 @@ void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
const auto add_key = [&] (struct ::tnt_stream *o) -> void {
::tnt_object_add_array(o, ts.keys.size());
for (const auto i : ts.keys) {
add_value(o, i, ev.row.at(i));
add_value(o, i, ev->row.at(i));
}
::tnt_object_container_close(o);
};
......@@ -277,7 +277,7 @@ void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
unsigned i_nil = 0;
// ev.row may have gaps, since it's not an array but a map
// so fill the gaps to match columns count
for (auto it = ev.row.begin(), end = ev.row.end(); it != end; ++it) {
for (auto it = ev->row.begin(), end = ev->row.end(); it != end; ++it) {
// fill gaps
for (; i_nil < it->first; ++i_nil) add_nil_with_replace(o, i_nil);
......@@ -292,7 +292,7 @@ void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
const auto add_ops = [&] (struct ::tnt_stream *o) -> void {
::tnt_update_container_reset(o);
for (auto it = ev.row.begin(), end = ev.row.end(); it != end; ++it) {
for (auto it = ev->row.begin(), end = ev->row.end(); it != end; ++it) {
__tnt_object sval;
add_value(&sval, it->first, it->second);
::tnt_update_assign(o, it->first, &sval);
......@@ -303,9 +303,9 @@ void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
const auto make_call = [&] (const std::string& func_name) -> void {
__tnt_object args;
::tnt_object_add_array(&args, 1);
::tnt_object_add_map(&args, ev.row.size());
::tnt_object_add_map(&args, ev->row.size());
for (auto it = ev.row.begin(), end = ev.row.end(); it != end; ++it) {
for (auto it = ev->row.begin(), end = ev->row.end(); it != end; ++it) {
::tnt_object_add_uint(&args, it->first);
add_value(&args, it->first, it->second);
}
......@@ -321,7 +321,7 @@ void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
};
// add Tarantool request
if (ev.event == "DELETE") {
if (ev->event == "DELETE") {
if (ts.delete_call.empty()) {
__tnt_object key;
add_key(&key);
......@@ -335,7 +335,7 @@ void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
} else {
make_call(ts.delete_call);
}
} else if (ev.event == "INSERT") {
} else if (ev->event == "INSERT") {
if (ts.insert_call.empty()) {
// __tnt_object tuple;
// add_tuple(&tuple);
......@@ -361,7 +361,7 @@ void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
} else {
make_call(ts.insert_call);
}
} else if (ev.event == "UPDATE") {
} else if (ev->event == "UPDATE") {
if (ts.update_call.empty()) {
__tnt_object key;
add_key(&key);
......@@ -381,13 +381,13 @@ void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
}
} else {
std::ostringstream s;
s << "Uknown binlog event: " << ev.event;
s << "Uknown binlog event: " << ev->event;
throw std::range_error(s.str());
}
if (ev.binlog_name != "") {
binlog_name = ev.binlog_name;
binlog_pos = ev.binlog_pos;
if (ev->binlog_name != "") {
binlog_name = ev->binlog_name;
binlog_pos = ev->binlog_pos;
}
}
......
......@@ -27,7 +27,7 @@ class TPWriter
void Disconnect();
void ReadBinlogPos(std::string &binlog_name, unsigned long &binlog_pos);
void Sync();
void BinlogEventCallback(const SerializableBinlogEvent& ev);
void BinlogEventCallback(SerializableBinlogEventPtr&& ev);
void RecvAll();
typedef std::vector<unsigned> Tuple;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment