Commit 26ce92bf authored by pryanikov's avatar pryanikov

wtf am i doing

parent eeae2892
git submodule update --init --recursive
git submodule update --init --recursive
cmake -DCMAKE_BUILD_TYPE=Release -DYAML_CPP_BUILD_TOOLS=OFF -DYAML_CPP_BUILD_CONTRIB=OFF .
cmake -DCMAKE_BUILD_TYPE=Release -DYAML_CPP_BUILD_TOOLS=OFF -DYAML_CPP_BUILD_CONTRIB=OFF -DTARANTOOL_C_EMBEDDED=1 .
make
strip -s replicatord
......@@ -80,7 +80,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
SerializableBinlogEvent ev;
ev.binlog_name = binlog_name;
ev.binlog_pos = binlog_pos;
ev.seconds_behind_master = GetSecondsBehindMaster();
// ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.event = "IGNORE";
stopped = cb(ev);
......@@ -121,7 +121,7 @@ void DBReader::EventCallback(const slave::RecordSet& event, const std::map<std::
SerializableBinlogEvent ev;
ev.binlog_name = state.getMasterLogName();
ev.binlog_pos = state.getMasterLogPos();
ev.seconds_behind_master = GetSecondsBehindMaster();
// ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.database = event.db_name;
ev.table = event.tbl_name;
......@@ -149,7 +149,7 @@ void DBReader::XidEventCallback(unsigned int server_id, BinlogEventCallback cb)
SerializableBinlogEvent ev;
ev.binlog_name = state.getMasterLogName();
ev.binlog_pos = state.getMasterLogPos();
ev.seconds_behind_master = GetSecondsBehindMaster();
// ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.event = "IGNORE";
stopped = cb(ev);
......@@ -168,7 +168,7 @@ void DBReader::DumpTablesCallback(
ev.database = db_name;
ev.table = tbl_name;
ev.event = "INSERT";
ev.seconds_behind_master = GetSecondsBehindMaster();
// ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
for (const auto& it : filter) {
......
......@@ -34,7 +34,7 @@ static DBReader *dbreader = NULL;
static void sigint_handler(int sig);
static Queue<SerializableBinlogEvent> queue(100);
static Queue<SerializableBinlogEvent> queue(50);
// ===============
......@@ -42,28 +42,20 @@ static void tpwriter_worker()
{
while (!is_term)
{
if (!tpwriter->Connect()) continue;
while (!tpwriter->Connect());
// send initial binlog position to the main thread
// send initial binlog position to the db thread
try {
if (!tpwriter->ReadBinlogPos(binlog_name, binlog_pos)) {
tpwriter->Disconnect();
continue;
}
tpwriter->ReadBinlogPos(binlog_name, binlog_pos);
reset = true;
while (!is_term)
{
queue.fetch(std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1), std::chrono::milliseconds(1000), 100);
tpwriter->Sync();
const std::chrono::milliseconds timeout(1000);
const auto cb_fetch = std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1);
if (!is_term && tpwriter->ReadReply() < 0) {
const auto code = tpwriter->GetReplyCode();
if (code) {
std::cerr << "Tarantool error: " << tpwriter->GetReplyErrorMessage() << " (code: " << code << ")" << std::endl;
}
}
while (!is_term) {
queue.try_fetch(cb_fetch, timeout);
tpwriter->Sync();
tpwriter->RecvAll();
}
}
catch (std::range_error& ex) {
......
......@@ -12,7 +12,7 @@ template<typename T> class Queue
public:
Queue(const unsigned limit_) : limit(limit_) {}
T pop()
inline T pop()
{
std::unique_lock<std::mutex> lock(mutex);
......@@ -28,24 +28,32 @@ template<typename T> class Queue
return item;
}
void fetch(const std::function<bool (T&)>& cb, const std::chrono::milliseconds timeout, const unsigned limit_)
void try_fetch(const std::function<void (T&)>& cb, const std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(mutex);
if (!queue.empty() || cv1.wait_for(lock, timeout, [this] { return !queue.empty(); })) {
bool predicate;
unsigned cnt = queue.size() < limit_ ? queue.size() : limit_;
unsigned cnt = queue.size();
do {
predicate = cb(queue.front());
T item = queue.front();
queue.pop_front();
} while (!predicate && --cnt);
lock.unlock();
cv2.notify_all();
cb(item);
if (--cnt) {
lock.lock();
continue;
}
} while (false);
}
lock.unlock();
cv2.notify_all();
}
void push(const T& item)
inline void push(const T& item)
{
std::unique_lock<std::mutex> lock(mutex);
......@@ -58,9 +66,14 @@ template<typename T> class Queue
cv1.notify_one();
}
inline unsigned size() const {
// std::lock_guard<std::mutex> lock(mutex);
return queue.size();
}
private:
std::deque<T> queue;
std::mutex mutex;
mutable std::mutex mutex;
std::condition_variable cv1;
std::condition_variable cv2;
const unsigned limit;
......
......@@ -74,7 +74,7 @@ struct SerializableBinlogEvent
{
std::string binlog_name;
unsigned long binlog_pos;
unsigned long seconds_behind_master;
// unsigned long seconds_behind_master;
unsigned long unix_timestamp;
std::string database;
std::string table;
......
#include <iostream>
#include <sstream>
#include <sys/time.h>
#include <tarantool/tarantool.h>
......@@ -31,14 +32,11 @@ TPWriter::TPWriter(
binlog_key(binlog_key),
binlog_name(""),
binlog_pos(0),
seconds_behind_master(0),
connect_retry(connect_retry),
sync_retry(sync_retry),
next_connect_attempt(0),
next_sync_attempt(0),
next_ping_attempt(0),
reply_server_code(0),
reply_error_msg("")
next_ping_attempt(0)
{
::tnt_net(&sess);
......@@ -61,8 +59,7 @@ bool TPWriter::Connect()
{
// connect to tarantool
if (::time(NULL) < next_connect_attempt) {
::sleep(1);
return false;
::sleep(next_connect_attempt - ::time(NULL));
}
if (::tnt_connect(&sess) < 0) {
std::cerr << "Could not connect to Tarantool: " << ::tnt_strerror(&sess) << std::endl;
......@@ -82,7 +79,7 @@ TPWriter::~TPWriter()
::tnt_stream_free(&sess);
}
bool TPWriter::ReadBinlogPos(std::string &binlog_name, unsigned long &binlog_pos)
void TPWriter::ReadBinlogPos(std::string &binlog_name, unsigned long &binlog_pos)
{
// read initial binlog pos
int64_t sync;
......@@ -102,36 +99,21 @@ bool TPWriter::ReadBinlogPos(std::string &binlog_name, unsigned long &binlog_pos
}
__tnt_reply re;
do {
const int r = Recv(&re);
if (r == 0) {
break;
}
else if (r < 0 && reply_server_code) {
std::cerr << "ReadBinlogPos Tarantool error: " << reply_error_msg << " (code: " << reply_server_code << ")" << std::endl;
return false;
}
else {
std::cerr << "ReadBinlogPos error: no replies, weird" << std::endl;
return false;
}
} while (true);
while (!Recv(&re));
if ((&re)->sync != sync) {
std::cerr << "ReadBinlogPos error: not requested reply" << std::endl;
return false;
throw std::runtime_error("ReadBinlogPos error: not requested reply");
}
do {
const char *data = (&re)->data;
this->binlog_name = binlog_name = "";
this->binlog_pos = binlog_pos = 0;
const char *data = (&re)->data;
// result rows
if (mp_unlikely(mp_typeof(*data) != MP_ARRAY)) break;
if (mp_unlikely(mp_decode_array(&data) == 0)) {
// no binlog created yet
this->binlog_name = binlog_name = "";
this->binlog_pos = binlog_pos = 0;
return true;
return;
}
// row
......@@ -154,13 +136,11 @@ bool TPWriter::ReadBinlogPos(std::string &binlog_name, unsigned long &binlog_pos
next_ping_attempt = Milliseconds() + TPWriter::PING_TIMEOUT;
return true;
return;
} while (0);
std::cerr << "binlog record format error" << std::endl;
this->binlog_name = binlog_name = "";
this->binlog_pos = binlog_pos = 0;
return true;
std::cerr << "ReadBinlogPos error: bad state record format" << std::endl;
}
void TPWriter::Disconnect()
......@@ -168,24 +148,16 @@ void TPWriter::Disconnect()
::tnt_close(&sess);
}
inline void TPWriter::Ping()
{
__tnt_request req;
::tnt_request_ping(&req);
Send(&req);
}
void TPWriter::AddTable(
const std::string &db,
const std::string &table,
const std::string& db,
const std::string& table,
const unsigned space,
const Tuple &keys,
const std::string &insert_call,
const std::string &update_call,
const std::string &delete_call
const std::string& insert_call,
const std::string& update_call,
const std::string& delete_call
) {
TableMap &d = dbs[db];
TableSpace &s = d[table];
TableSpace& s = dbs[db][table];
s.space = space;
s.keys = keys;
s.insert_call = insert_call;
......@@ -193,48 +165,33 @@ void TPWriter::AddTable(
s.delete_call = delete_call;
}
inline void TPWriter::SaveBinlogPos()
{
__tnt_object tuple;
::tnt_object_add_array(&tuple, 3);
::tnt_object_add_uint(&tuple, binlog_key);
::tnt_object_add_str(&tuple, binlog_name.c_str(), binlog_name.length());
::tnt_object_add_uint(&tuple, binlog_pos);
::tnt_object_container_close(&tuple);
__tnt_request req;
::tnt_request_replace(&req);
::tnt_request_set_space(&req, binlog_key_space);
::tnt_request_set_tuple(&req, &tuple);
Send(&req);
}
bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
void TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
{
// spacial case event "IGNORE", which only updates binlog position
// but doesn't modify any table data
if (ev.event == "IGNORE")
return false;
if (ev.binlog_name != "") {
binlog_name = ev.binlog_name;
binlog_pos = ev.binlog_pos;
if (ev.event == "IGNORE") {
return;
}
const auto idb = dbs.find(ev.database);
if (idb == dbs.end())
return false;
const TableSpace& ts = dbs.at(ev.database).at(ev.table);
const TableMap &tm = idb->second;
const auto itm = tm.find(ev.table);
if (itm == tm.end())
return false;
// check if doing same key as last time
static std::string prev_key;
std::string curnt_key;
const TableSpace &ts = itm->second;
for (const auto i : ts.keys) {
curnt_key += ev.row.at(i).to_string();
}
if (prev_key == curnt_key) {
// sync first
RecvAll();
} else {
prev_key = curnt_key;
}
const auto irn = replace_null.find(ts.space);
const std::map<unsigned, SerializableValue>* replace_null_;
const auto irn = replace_null.find(ts.space);
if (irn != replace_null.end()) {
replace_null_ = &irn->second;
} else {
......@@ -256,7 +213,9 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
} else if (v.is<unsigned long long>()) {
::tnt_object_add_uint(o, v.as<unsigned long long>());
} else {
throw std::range_error(std::string("Typecasting error for non-null value for column: " + index));
std::ostringstream s;
s << "Typecasting error for non-null value for column: " << index;
throw std::range_error(s.str());
}
return;
......@@ -299,7 +258,9 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
}
}
catch (boost::bad_any_cast &ex) {
throw std::range_error(std::string("Typecasting error for column: ") + ex.what());
std::ostringstream s;
s << "Typecasting error for column: " << ex.what();
throw std::range_error(s.str());
}
};
......@@ -419,22 +380,15 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
make_call(ts.update_call);
}
} else {
throw std::range_error("Uknown binlog event: " + ev.event);
std::ostringstream s;
s << "Uknown binlog event: " << ev.event;
throw std::range_error(s.str());
}
// check if doing same key as last time
// and do sync, if true
static std::string prev_key;
std::string curnt_key;
for (const auto i : ts.keys)
curnt_key += ev.row.at(i).to_string();
if (prev_key == curnt_key)
return true;
prev_key = curnt_key;
return false;
if (ev.binlog_name != "") {
binlog_name = ev.binlog_name;
binlog_pos = ev.binlog_pos;
}
}
// blocking send
......@@ -448,93 +402,91 @@ int64_t TPWriter::Send(struct ::tnt_request *req)
char *buf = TNT_SBUF_DATA(&sbuf);
while (len > 0) {
const ssize_t r = sess.write(&sess, buf, len);
if (r < 0) {
const ssize_t sent = sess.write(&sess, buf, len);
if (sent < 0) {
const int _errno = ::tnt_errno(&sess);
if (_errno == EWOULDBLOCK || _errno == EAGAIN) {
continue;
}
::tnt_stream_free(&sbuf);
throw std::runtime_error("Send failed: " + std::string(::tnt_strerror(&sess)));
std::ostringstream s;
s << "Send failed: " << ::tnt_strerror(&sess);
throw std::runtime_error(s.str());
}
len -= r;
buf += r;
len -= sent;
buf += sent;
}
::tnt_stream_free(&sbuf);
return sync;
}
bool TPWriter::Sync(bool force)
void TPWriter::Sync()
{
if (next_ping_attempt == 0 || Milliseconds() > next_ping_attempt) {
bool force = false;
if (Milliseconds() > next_ping_attempt) {
force = true;
__tnt_request req;
::tnt_request_ping(&req);
Send(&req);
next_ping_attempt = Milliseconds() + TPWriter::PING_TIMEOUT;
Ping();
}
if (force || next_sync_attempt == 0 || Milliseconds() >= next_sync_attempt) {
SaveBinlogPos();
if (force || Milliseconds() >= next_sync_attempt) {
__tnt_object tuple;
::tnt_object_add_array(&tuple, 3);
::tnt_object_add_uint(&tuple, binlog_key);
::tnt_object_add_str(&tuple, binlog_name.c_str(), binlog_name.length());
::tnt_object_add_uint(&tuple, binlog_pos);
::tnt_object_container_close(&tuple);
__tnt_request req;
::tnt_request_replace(&req);
::tnt_request_set_space(&req, binlog_key_space);
::tnt_request_set_tuple(&req, &tuple);
Send(&req);
next_sync_attempt = Milliseconds() + sync_retry;
}
return true;
}
// non-blocking receive
int TPWriter::Recv(struct ::tnt_reply *re)
bool TPWriter::Recv(struct ::tnt_reply *re)
{
const int r = sess.read_reply(&sess, re);
const int result = sess.read_reply(&sess, re);
if (r == 0) {
if (result == 0) {
if (re->code) {
reply_server_code = re->code;
reply_error_msg = std::move(std::string(re->error, re->error_end - re->error));
return -1;
} else if (reply_server_code) {
reply_server_code = 0;
reply_error_msg = "";
std::ostringstream s;
s << "Tarantool error: " << std::string(re->error, re->error_end - re->error) << " (code: " << re->code << ")";
throw std::range_error(s.str());
}
return r;
return true;
}
if (r < 0) {
else if (result < 0) {
const int _errno = ::tnt_errno(&sess);
if (_errno == EWOULDBLOCK || _errno == EAGAIN) {
return r;
return false;
}
throw std::runtime_error("Recv failed: " + std::string(::tnt_strerror(&sess)));
std::ostringstream s;
s << "Recv failed: " << ::tnt_strerror(&sess);
throw std::runtime_error(s.str());
}
return r;
// no complete replies in buffer
return false;
}
int TPWriter::ReadReply()
void TPWriter::RecvAll()
{
int r;
bool result;
do {
__tnt_reply re;
r = Recv(&re);
}
while (r == 0);
return r;
}
uint64_t TPWriter::GetReplyCode() const
{
return reply_server_code;
}
const std::string& TPWriter::GetReplyErrorMessage() const
{
return reply_error_msg;
}
uint64_t TPWriter::Milliseconds()
{
struct timeval tp;
::gettimeofday( &tp, NULL );
if (!secbase) {
secbase = tp.tv_sec;
return tp.tv_usec / 1000;
result = Recv(&re);
}
return (uint64_t)(tp.tv_sec - secbase)*1000 + tp.tv_usec / 1000;
while (result);
}
}
......@@ -4,122 +4,110 @@
#include <string>
#include <map>
#include <vector>
#include "serializable.h"
#include <tarantool/tarantool.h>
#include "serializable.h"
namespace replicator {
class TPWriter
{
public:
TPWriter(
const std::string &host,
const std::string &user,
const std::string &password,
uint32_t binlog_key_space,
uint32_t binlog_key,
unsigned connect_retry = 15,
unsigned sync_retry = 1000
);
~TPWriter();
bool Connect();
void Disconnect();
bool ReadBinlogPos(std::string &binlog_name, unsigned long &binlog_pos);
bool Sync(bool force = false);
bool BinlogEventCallback(const SerializableBinlogEvent& ev);
inline void Ping();
// return values:
// -1 for error
// otherwise returns number of complete replies read from socket
int ReadReply();
uint64_t GetReplyCode() const;
const std::string& GetReplyErrorMessage() const;
typedef std::vector<unsigned> Tuple;
void AddTable(
const std::string &db,
const std::string &table,
const unsigned space,
const Tuple &keys,
const std::string &insert_call = empty_call,
const std::string &update_call = empty_call,
const std::string &delete_call = empty_call
);
static const std::string empty_call;
std::map<uint32_t, unsigned> space_last_id;
std::map<unsigned, std::map<unsigned, SerializableValue>> replace_null;
private:
static const unsigned int PING_TIMEOUT = 5000;
std::string host;
std::string user;
std::string password;
uint32_t binlog_key_space;
uint32_t binlog_key;
std::string binlog_name;
unsigned long binlog_pos;
unsigned long seconds_behind_master;
unsigned connect_retry;
unsigned sync_retry;
::time_t next_connect_attempt; /* seconds */
uint64_t next_sync_attempt; /* milliseconds */
uint64_t next_ping_attempt; /* milliseconds */
struct ::tnt_stream sess;
// blocking send
int64_t Send(struct ::tnt_request *req);
// non-blocking receive
int Recv(struct ::tnt_reply *re);
inline void SaveBinlogPos();
uint64_t Milliseconds();
uint64_t reply_server_code;
std::string reply_error_msg;
uint64_t secbase;
class TableSpace
{
public:
TableSpace() : space(0), insert_call(""), update_call(""), delete_call("") {}
uint32_t space;
Tuple keys;
std::string insert_call;
std::string update_call;
std::string delete_call;
};
typedef struct ::tnt_stream s_tnt_stream;
struct __tnt_object : s_tnt_stream {