Commit 704437c2 authored by pryanikov's avatar pryanikov
Browse files

fuck zmq

parent f85e34f9
......@@ -32,7 +32,6 @@ include_directories(
find_library(LMYSQL_CLIENT_R perconaserverclient_r PATH_SUFFIXES mysql)
find_library(LPTHREAD pthread)
find_library(LZMQ zmq)
find_library(LBOOST_SYSTEM_MT boost_system)
find_library(LBOOST_SERIALIZATION_MT boost_serialization)
......@@ -40,7 +39,7 @@ add_executable(rp ${REPLICATOR_SRC})
set_target_properties(rp PROPERTIES COMPILE_FLAGS "${REPLICATOR_CFLAGS}")
set_target_properties(rp PROPERTIES OUTPUT_NAME ${REPLICATOR_NAME})
target_link_libraries(rp tnt slave_a yaml-cpp)
target_link_libraries(rp ${LMYSQL_CLIENT_R} ${LPTHREAD} ${LZMQ} ${LBOOST_SYSTEM_MT} ${LBOOST_SERIALIZATION_MT} rt dl ssl crypto z)
target_link_libraries(rp ${LMYSQL_CLIENT_R} ${LPTHREAD} ${LBOOST_SYSTEM_MT} ${LBOOST_SERIALIZATION_MT} rt dl ssl crypto z)
install(TARGETS rp RUNTIME DESTINATION sbin)
install(FILES replicatord.cfg DESTINATION etc)
git submodule update --init --recursive
git submodule update --init --recursive
cmake -DCMAKE_BUILD_TYPE=Release .
cmake -DCMAKE_BUILD_TYPE=Release -DYAML_CPP_BUILD_TOOLS=OFF -DYAML_CPP_BUILD_CONTRIB=OFF .
make
strip -s replicatord
......@@ -19,7 +19,7 @@ void DBReader::AddTable(const std::string& db, const std::string& table, std::ma
tables.emplace_back(db, table, filter, do_dump);
}
void DBReader::DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, BinlogEventCallback cb)
void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, BinlogEventCallback cb)
{
slave::callback dummycallback = std::bind(&DBReader::DummyEventCallback, this, _1);
......@@ -89,7 +89,7 @@ void DBReader::DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, Binlo
//tempslave.close_connection();
}
void DBReader::ReadBinlog(const std::string &binlog_name, BinlogPos binlog_pos, BinlogEventCallback cb)
void DBReader::ReadBinlog(const std::string &binlog_name, unsigned long binlog_pos, BinlogEventCallback cb)
{
stopped = false;
state.setMasterLogNamePos(binlog_name, binlog_pos);
......@@ -105,7 +105,7 @@ void DBReader::ReadBinlog(const std::string &binlog_name, BinlogPos binlog_pos,
slave.init();
slave.createDatabaseStructure();
slave.get_remote_binlog(std::bind(&DBReader::ReadBinlogCallback, this));
slave.get_remote_binlog([this] { return stopped; });
}
void DBReader::Stop()
......@@ -132,14 +132,12 @@ void DBReader::EventCallback(const slave::RecordSet& event, const std::map<std::
case slave::RecordSet::Write: ev.event = "INSERT"; break;
default: ev.event = "IGNORE";
}
for (auto fi = filter.begin(), end = filter.end(); fi != end; ++fi) {
const auto ri = event.m_row.find(fi->first);
if (ri != event.m_row.end()) {
ev.row[ fi->second ] = ri->second;
}
}
stopped = cb(ev);
}
......@@ -157,11 +155,6 @@ void DBReader::XidEventCallback(unsigned int server_id, BinlogEventCallback cb)
stopped = cb(ev);
}
bool DBReader::ReadBinlogCallback()
{
return stopped != 0;
}
void DBReader::DumpTablesCallback(
const std::string &db_name,
const std::string &tbl_name,
......@@ -193,8 +186,7 @@ void DBReader::DumpTablesCallback(
}
}
unsigned DBReader::GetSecondsBehindMaster() const
{
unsigned DBReader::GetSecondsBehindMaster() const {
return std::max(::time(NULL) - last_event_when, 0L);
}
......
......@@ -14,7 +14,6 @@
namespace replicator {
typedef unsigned long BinlogPos;
typedef std::function<bool (const SerializableBinlogEvent &ev)> BinlogEventCallback;
struct DBTable
......@@ -36,14 +35,13 @@ public:
~DBReader();
void AddTable(const std::string &db, const std::string &table, std::map<std::string, unsigned>& filter, bool do_dump);
void DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, BinlogEventCallback f);
void ReadBinlog(const std::string &binlog_name, BinlogPos binlog_pos, BinlogEventCallback cb);
void DumpTables(std::string &binlog_name, unsigned long &binlog_pos, BinlogEventCallback f);
void ReadBinlog(const std::string &binlog_name, unsigned long binlog_pos, BinlogEventCallback cb);
void Stop();
void EventCallback(const slave::RecordSet& event, const std::map<std::string, unsigned>& filter, BinlogEventCallback cb);
void DummyEventCallback(const slave::RecordSet& event) {};
bool ReadBinlogCallback();
void XidEventCallback(unsigned int server_id, BinlogEventCallback cb);
void DumpTablesCallback(
const std::string &db_name,
......
......@@ -2,22 +2,17 @@
#include <iostream>
#include <sstream>
#include <fstream>
#include <thread>
#include <signal.h>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>
#include "serializable.h"
#include <zmq.h>
#include <zmq_utils.h>
#include <yaml-cpp/yaml.h>
#include "serializable.h"
#include "dbreader.h"
#include "tpwriter.h"
#include "serializable.h"
#include "queue.h"
#include "logger.h"
#include "remotemon.h"
// =========
using std::placeholders::_1;
......@@ -28,349 +23,106 @@ static const char *default_pid_filename = "/var/run/replicatord.pid";
static const char *default_log_filename = "/var/log/replicatord.log";
static const char *default_config_filename = "/usr/local/etc/replicatord.cfg";
static volatile bool is_halted = false;
static volatile bool is_term = false;
static TPWriter *tpwriter = NULL;
static DBReader *dbreader = NULL;
static Graphite *graphite = NULL;
static void *ZMQContext = NULL;
static void *ZMQTpSocket = NULL;
static void *ZMQTpThread = NULL;
static void *ZMQWdSocket = NULL;
static void *ZMQWdThread = NULL;
static void tpwrite_main(void *arg);
static void watchdog_main(void *arg);
static void halt(void);
static bool start_zmq(unsigned watchdog_timeout)
{
int opti;
int rc;
ZMQContext = zmq_ctx_new();
if (ZMQContext == NULL) {
return false;
}
// tarantool
//
ZMQTpSocket = zmq_socket(ZMQContext, ZMQ_PAIR);
// set high water mark
opti = 10000;
zmq_setsockopt(ZMQTpSocket, ZMQ_SNDHWM, &opti, sizeof(opti));
rc = zmq_bind(ZMQTpSocket, "inproc://tp");
if (rc) {
return false;
}
// spawn tp thread
ZMQTpThread = zmq_threadstart(tpwrite_main, NULL);
if (ZMQTpThread == NULL) {
return false;
}
// watchdog
//
ZMQWdSocket = zmq_socket(ZMQContext, ZMQ_PAIR);
rc = zmq_bind(ZMQWdSocket, "inproc://wd");
if (rc) {
return false;
}
ZMQWdThread = zmq_threadstart(watchdog_main, ((void*)((intptr_t)watchdog_timeout)));
static volatile bool reset = false;
return true;
}
static void send_zmq_event(void *socket, const SerializableBinlogEvent &ev)
{
std::ostringstream oss;
boost::archive::binary_oarchive oa(oss);
oa << ev;
zmq_send(socket, oss.str().c_str(), oss.str().length()+1, 0);
}
static bool poll_zmq_event(void *socket, unsigned timeout, BinlogEventCallback f)
{
zmq_pollitem_t items [] = {
{ socket, 0, ZMQ_POLLIN, 0 },
};
int polled = zmq_poll(items, 1, timeout);
if (polled < 0) {
// error polling
return false;
}
bool done = false;
if (polled > 0 && (items[0].revents & ZMQ_POLLIN)) {
while (polled-- > 0) {
// restart binlog
zmq_msg_t msg;
zmq_msg_init(&msg);
if (zmq_msg_recv(&msg, socket, 0) < 0) {
zmq_msg_close(&msg);
return false;
}
std::string buf;
buf.append((char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
zmq_msg_close(&msg);
SerializableBinlogEvent ev;
std::istringstream iss(buf);
boost::archive::binary_iarchive ia(iss);
ia >> ev;
done = done || f(ev);
if (done) break;
}
}
return done;
}
static void close_zmq()
{
if (ZMQTpThread != NULL) {
zmq_threadclose(ZMQTpThread);
ZMQTpThread = NULL;
}
if (ZMQWdThread != NULL) {
zmq_threadclose(ZMQWdThread);
ZMQWdThread = NULL;
}
static std::string binlog_name;
static unsigned long binlog_pos;
static TPWriter *tpwriter = NULL;
static DBReader *dbreader = NULL;
if (ZMQTpSocket != NULL) {
zmq_close(ZMQTpSocket);
ZMQTpSocket = NULL;
}
if (ZMQWdSocket != NULL) {
zmq_close(ZMQWdSocket);
ZMQWdSocket = NULL;
}
static void sigint_handler(int sig);
if (ZMQContext != NULL) {
zmq_ctx_term(ZMQContext);
ZMQContext = NULL;
}
}
static Queue<SerializableBinlogEvent> queue(100);
// ===============
static void tpwrite_run(void *ZMQTpSocket)
static void tpwriter_worker()
{
SerializableBinlogEvent ev_connect;
SerializableBinlogEvent ev_disconnect;
ev_connect.event = "CONNECT";
ev_disconnect.event = "DISCONNECT";
bool connected = true;
while (!is_term && connected) {
if (!tpwriter->Connect()) {
continue;
}
while (!is_term)
{
if (!tpwriter->Connect()) continue;
// send initial binlog position to the main thread
try {
if (!tpwriter->ReadBinlogPos(ev_connect.binlog_name, ev_connect.binlog_pos)) {
if (!tpwriter->ReadBinlogPos(binlog_name, binlog_pos)) {
tpwriter->Disconnect();
continue;
}
send_zmq_event(ZMQTpSocket, ev_connect);
reset = true;
while (!is_term && connected) {
connected = poll_zmq_event(ZMQTpSocket, 100, std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1)) == false;
if (connected) {
connected = tpwriter->Sync();
}
if (!is_term && connected) {
if (tpwriter->ReadReply() < 0) {
const uint64_t code = tpwriter->GetReplyCode();
if (code) {
std::cerr << "Tarantool error: " << tpwriter->GetReplyErrorMessage() << " (code: " << code << ")" << std::endl;
connected = !tpwriter->DisconnectOnError();
}
while (!is_term)
{
queue.fetch(std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1), std::chrono::milliseconds(1000), 100);
tpwriter->Sync();
if (!is_term && tpwriter->ReadReply() < 0) {
const auto code = tpwriter->GetReplyCode();
if (code) {
std::cerr << "Tarantool error: " << tpwriter->GetReplyErrorMessage() << " (code: " << code << ")" << std::endl;
}
}
}
}
catch (std::range_error& ex) {
connected = false;
is_term = true;
std::cout << ex.what() << std::endl;
// loop exit
}
catch (std::exception& ex) {
std::cout << ex.what() << std::endl;
tpwriter->Disconnect();
send_zmq_event(ZMQTpSocket, ev_disconnect);
// reconnect
}
}
tpwriter->Disconnect();
send_zmq_event(ZMQTpSocket, ev_disconnect);
}
static void tpwrite_main(void *arg)
{
void *ZMQTpSocket = zmq_socket(ZMQContext, ZMQ_PAIR);
if (!ZMQTpSocket) {
kill(getpid(), SIGTERM);
return;
}
// set high water mark
int opti = 10000;
zmq_setsockopt(ZMQTpSocket, ZMQ_RCVHWM, &opti, sizeof(opti));
if (zmq_connect(ZMQTpSocket, "inproc://tp")) {
kill(getpid(), SIGTERM);
return;
}
tpwrite_run(ZMQTpSocket);
zmq_close(ZMQTpSocket);
}
// ====================
static unsigned seconds_behind_master;
static unsigned max_seconds_behind_master;
static unsigned zalloc_count;
static unsigned max_zalloc_count;
static void ping_watchdog()
static bool dbread_callback(const SerializableBinlogEvent &ev)
{
SerializableBinlogEvent ev;
ev.event = "PING";
send_zmq_event(ZMQWdSocket, ev);
}
static void update_stats()
{
time_t now;
if (!dbreader) {
return;
if (is_term || reset) {
return true;
}
ping_watchdog();
/*
now = ::time(NULL);
seconds_behind_master = dbreader->GetSecondsBehindMaster();
if (seconds_behind_master > max_seconds_behind_master) max_seconds_behind_master = seconds_behind_master;
zalloc_count = zmq_get_alloc_count();
if (zalloc_count > max_zalloc_count) max_zalloc_count = zalloc_count;
if (graphite) {
if (now > graphite->GetLastPacketTime() + graphite->GetInterval()) {
graphite->SendStat("seconds_behind_master", seconds_behind_master);
graphite->SendStat("max_seconds_behind_master", max_seconds_behind_master);
max_seconds_behind_master = seconds_behind_master;
graphite->SendStat("zmq_allocs_total", zalloc_count);
graphite->SendStat("zmq_allocs_total_max", max_zalloc_count);
max_zalloc_count = zalloc_count;
}
} // */
}
// ====================
// watchdog
static time_t last_event_timestamp;
static bool watchdog_ev_callback(const SerializableBinlogEvent &ev)
{
last_event_timestamp = ::time(NULL);
queue.push(ev);
return false;
}
static void watchdog_main(void *arg)
static void mysql_worker()
{
unsigned timeout = (intptr_t)arg;
void *ZMQWdSocket = zmq_socket(ZMQContext, ZMQ_PAIR);
if (!ZMQWdSocket || zmq_connect(ZMQWdSocket, "inproc://wd")) {
kill(getpid(), SIGTERM);
return;
}
last_event_timestamp = ::time(NULL);
while (!is_term) {
poll_zmq_event(ZMQWdSocket, 1000, watchdog_ev_callback);
// read initial binlog pos from Tarantool
while (!reset) ::sleep(1);
reset = false;
if (last_event_timestamp + timeout < ::time(NULL)) {
std::cerr << "Ping timeout detected by watchdog: committing suicide now. Restarting." << std::endl;
kill(getpid(), SIGKILL);
break;
try {
if (!is_term && !reset && binlog_name == "" && binlog_pos == 0) {
std::cout << "Tarantool reported null binlog position. Dumping tables..." << std::endl;
dbreader->DumpTables(binlog_name, binlog_pos, dbread_callback);
}
if (!is_term && !reset) {
std::cout << "Reading binlogs (" << binlog_name << ", " << binlog_pos << ")..." << std::endl;
dbreader->ReadBinlog(binlog_name, binlog_pos, dbread_callback);
}
}
catch (std::exception& ex) {
std::cerr << "Error in reading binlogs: " << ex.what() << std::endl;
sigint_handler(0);
}
}
zmq_close(ZMQWdSocket);
}
// ====================
static bool tpread_zmq_callback(const SerializableBinlogEvent &ev, std::string &TpBinlogName, unsigned long &TpBinlogPos,
bool &disconnect, bool &read)
{
read = true;
disconnect = ev.event == "DISCONNECT";
TpBinlogName = ev.binlog_name;
TpBinlogPos = ev.binlog_pos;
return false;
}
static bool tpread_get_binlogpos(unsigned timeout, std::string &TpBinlogName, unsigned long &TpBinlogPos, bool &disconnect)
{
bool read = false;
poll_zmq_event(ZMQTpSocket, timeout, std::bind(tpread_zmq_callback, _1, std::ref(TpBinlogName), std::ref(TpBinlogPos),
std::ref(disconnect), std::ref(read)));
return read;
}
static bool dbread_callback(const SerializableBinlogEvent &ev, std::string &TpBinlogName, unsigned long &TpBinlogPos, bool &disconnect)
{
if (is_term) {
return true;
}
update_stats();
if (tpread_get_binlogpos(0, TpBinlogName, TpBinlogPos, disconnect)) {
return true;
}
send_zmq_event(ZMQTpSocket, ev);
return false;
}
static void init(YAML::Node& cfg)
{
unsigned watchdog_timeout;
try
{
try {
// read Mysql settings
{
const YAML::Node& mysql = cfg["mysql"];
watchdog_timeout = mysql["watchdog_timeout"].as<unsigned>();
nanomysql::mysql_conn_opts opts;
opts.mysql_host = mysql["host"].as<std::string>();
......@@ -391,8 +143,7 @@ static void init(YAML::Node& cfg)
tarantool["binlog_pos_space"].as<unsigned>(),
tarantool["binlog_pos_key"].as<unsigned>(),
tarantool["connect_retry"].as<unsigned>(),
tarantool["sync_retry"].as<unsigned>(),
tarantool["disconnect_on_error"].as<bool>()
tarantool["sync_retry"].as<unsigned>()
);
}
// read Mysql to Tarantool mappings (each table maps to a single Tarantool space)
......@@ -476,83 +227,28 @@ static void init(YAML::Node& cfg)
std::cerr << "Config error: " << ex.what() << std::endl;
exit(EXIT_FAILURE);
}
start_zmq(watchdog_timeout);
ping_watchdog();
}
static void main_loop()
{
std::string TpBinlogName;
unsigned long TpBinlogPos;
bool disconnected;
// read initial binlog pos from Tarantool
while (!is_term) {
tpread_get_binlogpos(100, TpBinlogName, TpBinlogPos, disconnected);
if (disconnected) {
ping_watchdog();
::sleep(1);
continue;
}
if (is_term) {
break;
}
try {
BinlogEventCallback cb = std::bind(dbread_callback, _1,
std::ref(TpBinlogName), std::ref(TpBinlogPos), std::ref(disconnected));
if (TpBinlogName == "" && TpBinlogPos == 0) {
std::cout << "Tarantool reported null binlog position. Dumping tables..." << std::endl;
dbreader->DumpTables(TpBinlogName, TpBinlogPos, cb);
}
std::cout << "Reading binlogs (" << TpBinlogName << ", " << TpBinlogPos << ")..." << std::endl;
dbreader->ReadBinlog(TpBinlogName, TpBinlogPos, cb);
} catch (std::exception& ex) {
std::cerr << "Error in reading binlogs: " << ex.what() << std::endl;
halt();
}
}
}
static void shutdown()
{
close_zmq();
if (dbreader) {
// sighandler protection
DBReader *dbreader_ = dbreader;
auto dbreader_ = dbreader;
dbreader = NULL;
delete dbreader_;
}
if (graphite) {
delete graphite;
graphite = NULL;
if (tpwriter) {
auto tpwriter_ = tpwriter;
tpwriter = NULL;
delete tpwriter_;
}
}
static void sighandler(int sig)
{
is_halted = false;
is_term = true;
if (dbreader) {
dbreader->Stop();
}
}
static void halt(void)
static void sigint_handler(int sig)
{
std::cerr << "Terminating" << std::endl;
is_halted = false;
is_term = true;
if (dbreader) {
dbreader->Stop();
}
......@@ -646,7 +342,7 @@ int main(int argc, char** argv)
}