Commit 245444d0 authored by pryanikov

new libs

parent 099451af
[submodule "lib/tarantool-c"]
path = lib/tarantool-c
url = https://github.com/deadbabe/tarantool-c.git
branch = fix_request_writeout
[submodule "lib/libslave"]
path = lib/libslave
url = git@gitlab7.lan:cpp/libslave.git
branch = mamba
......@@ -8,7 +8,6 @@ set(REPLICATOR_NAME "replicatord")
set(REPLICATOR_ROOT "${CMAKE_SOURCE_DIR}")
set(REPLICATOR_CFLAGS "-DTB_LOCAL=${REPLICATOR_ROOT}/lib/tarantool-c/lib -std=c++0x -g")
set(REPLICATOR_SRC
${REPLICATOR_ROOT}/lib/tarantool-c/lib/session.c
${REPLICATOR_ROOT}/dbreader.cpp
${REPLICATOR_ROOT}/main.cpp
${REPLICATOR_ROOT}/tpwriter.cpp
......@@ -26,19 +25,30 @@ ExternalProject_Add(libconfig
BUILD_IN_SOURCE 1
)
include_directories("${REPLICATOR_ROOT}" "${REPLICATOR_ROOT}/lib/libslave" "${REPLICATOR_ROOT}/lib/msgpuck" "${REPLICATOR_ROOT}/lib/tarantool-c" "${REPLICATOR_ROOT}/lib/libconfig/include")
include(FindPackageHandleStandardArgs)
find_path(IMYSQL mysql/mysql.h)
find_package_handle_standard_args(Mysql DEFAULT_MSG IMYSQL)
include_directories(
"${REPLICATOR_ROOT}"
"${REPLICATOR_ROOT}/lib/libslave"
"${REPLICATOR_ROOT}/lib/libconfig/include"
"${REPLICATOR_ROOT}/lib/tarantool-c/include"
"${REPLICATOR_ROOT}/lib/tarantool-c/third_party/msgpuck"
"${IMYSQL}/mysql"
)
find_library(LMYSQL_CLIENT_R mysqlclient_r PATH_SUFFIXES mysql)
find_library(LMYSQL_CLIENT_R perconaserverclient_r PATH_SUFFIXES mysql)
find_library(LPTHREAD pthread)
find_library(LZMQ zmq)
find_library(LBOOST_SYSTEM_MT boost_system-mt)
find_library(LBOOST_SERIALIZATION_MT boost_serialization-mt)
find_library(LBOOST_SYSTEM_MT boost_system)
find_library(LBOOST_SERIALIZATION_MT boost_serialization)
add_executable(rp ${REPLICATOR_SRC})
set_target_properties(rp PROPERTIES COMPILE_FLAGS "${REPLICATOR_CFLAGS}")
set_target_properties(rp PROPERTIES OUTPUT_NAME ${REPLICATOR_NAME})
target_link_libraries(rp tb slave_a ${REPLICATOR_ROOT}/lib/libconfig/lib/.libs/libconfig++.a)
target_link_libraries(rp ${LMYSQL_CLIENT_R} ${LPTHREAD} ${LZMQ} ${LBOOST_SYSTEM_MT} ${LBOOST_SERIALIZATION_MT})
target_link_libraries(rp tnt slave_a ${REPLICATOR_ROOT}/lib/libconfig/lib/.libs/libconfig++.a)
target_link_libraries(rp ${LMYSQL_CLIENT_R} ${LPTHREAD} ${LZMQ} ${LBOOST_SYSTEM_MT} ${LBOOST_SERIALIZATION_MT} rt dl ssl crypto z)
install(TARGETS rp RUNTIME DESTINATION sbin)
install(FILES replicatord.cfg DESTINATION etc)
\ No newline at end of file
install(FILES replicatord.cfg DESTINATION etc)
MySQL binlog to Tarantool replication daemon
Демон репликации из MySQL в Tarantool.
Please refer to https://confluence.mail.ru/display/RB/replicatord for more information.
\ No newline at end of file
. ./build.sh
git submodule update --init --recursive
git submodule update --init --recursive
cmake -DCMAKE_BUILD_TYPE=Release .
chmod 755 lib/libconfig/configure
make
strip -s replicatord
#include <sstream>
#include <boost/bind.hpp>
#include <boost/ref.hpp>
#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/algorithm/string/join.hpp>
#include "dbreader.h"
#include "serializable.h"
namespace replicator {
using std::placeholders::_1;
// Builds a SerializableRow holding the mapped value (`second`) of every entry
// of `row`, in the row's iteration order.
static SerializableRow SlaveRowToSerializableRow(const slave::Row &row)
{
    SerializableRow result;
    result.reserve(row.size());
    for (const auto &column : row) {
        result.push_back(column.second);
    }
    return result;
}
DBReader::DBReader(const std::string &host, const std::string &user, const std::string &password, unsigned int port, unsigned connect_retry) :
masterinfo(host, port, user, password, connect_retry), state(), slave(masterinfo, state), stopped(false), last_event_when(0)
{
namespace replicator {
}
DBReader::DBReader(nanomysql::mysql_conn_opts &opts, unsigned connect_retry)
: state(), slave(slave::MasterInfo(opts, connect_retry), state), stopped(false), last_event_when(0) {}
// Closes the connection held by the `slave` member when the reader is destroyed.
DBReader::~DBReader()
{
slave.close_connection();
}
void DBReader::AddTable(const std::string &db, const std::string &table, const std::vector<std::string> &columns)
void DBReader::AddTable(const std::string& db, const std::string& table, std::map<std::string, unsigned>& filter, bool do_dump)
{
tables.push_back(DBTable(db, table, columns));
}
void DBReader::AddFilterPredicate(const std::string &db, const std::string &tbl, const SimplePredicate &pred)
{
sfilter.AddPredicate(db, tbl, pred);
tables.emplace_back(db, table, filter, do_dump);
}
void DBReader::DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, BinlogEventCallback cb)
{
slave::callback dummycallback = boost::bind(&DBReader::DummyEventCallback, boost::ref(*this), _1);
slave::callback dummycallback = std::bind(&DBReader::DummyEventCallback, this, _1);
// start temp slave to read DB structure
slave::Slave tempslave(masterinfo, state);
for (TableList::const_iterator i = tables.begin(); i != tables.end(); ++i) {
slave::Slave tempslave(slave.masterInfo(), state);
for (auto i = tables.begin(), end = tables.end(); i != end; ++i) {
tempslave.setCallback(i->name.first, i->name.second, dummycallback);
}
tempslave.init();
tempslave.createDatabaseStructure();
last_event_when = ::time(NULL);
slave::Slave::binlog_pos_t bp = tempslave.getLastBinlog();
binlog_name = bp.first;
binlog_pos = bp.second;
......@@ -63,34 +41,38 @@ void DBReader::DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, Binlo
state.setMasterLogNamePos(bp.first, bp.second);
// dump tables
nanomysql::Connection conn(masterinfo.host.c_str(), masterinfo.user.c_str(),
masterinfo.password.c_str(), "", masterinfo.port);
nanomysql::Connection conn(slave.masterInfo().conn_options);
conn.query("SET NAMES utf8");
for (TableList::const_iterator t = tables.begin(); t != tables.end(); ++t) {
slave::RelayLogInfo rli = tempslave.getRli();
if (stopped) {
break;
}
for (auto table = tables.begin(), end = tables.end(); !stopped && table != end; ++table) {
if (!table->do_dump) continue;
// build field_name -> field_ptr map for filtered columns
const boost::shared_ptr<slave::Table> rtable = rli.getTable(t->name);
std::map<std::string, std::pair<unsigned, slave::PtrField>> filtered_fields;
for (std::vector<slave::PtrField>::const_iterator f = rtable->fields.begin(); f != rtable->fields.end(); ++f) {
slave::PtrField field = *f;
const auto j = find(t->filter.begin(), t->filter.end(), field->getFieldName());
if (j != t->filter.end()) {
unsigned index = std::distance(t->filter.begin(), j);
filtered_fields[field->getFieldName()] = std::pair<unsigned, slave::PtrField>(index, field);
std::vector<std::pair<unsigned, slave::PtrField>> filter_;
const auto rtable = tempslave.getRli().getTable(table->name);
std::string s_fields;
for (const auto ptr_field : rtable->fields) {
const auto it = table->filter.find(ptr_field->field_name);
if (it != table->filter.end()) {
filter_.emplace_back(it->second, ptr_field);
s_fields += ptr_field->field_name;
s_fields += ',';
}
}
conn.query(std::string("USE ") + t->name.first);
conn.query(std::string("SELECT ") + boost::algorithm::join(t->filter, ",") + " FROM " + t->name.second);
conn.use(boost::bind(&DBReader::DumpTablesCallback, boost::ref(*this), boost::ref(rli), boost::cref(t->name.first), boost::cref(t->name.second),
boost::ref(conn), boost::ref(filtered_fields), _1, cb));
s_fields.pop_back();
conn.select_db(table->name.first);
conn.query(std::string("SELECT ") + s_fields + " FROM " + table->name.second);
conn.use(std::bind(&DBReader::DumpTablesCallback,
this,
std::cref(table->name.first),
std::cref(table->name.second),
std::cref(filter_),
_1,
cb
));
}
// send binlog position update event
......@@ -104,24 +86,26 @@ void DBReader::DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, Binlo
stopped = cb(ev);
}
tempslave.close_connection();
//tempslave.close_connection();
}
void DBReader::ReadBinlog(const std::string &binlog_name, BinlogPos binlog_pos, BinlogEventCallback cb)
{
stopped = false;
slave::callback callback = boost::bind(&DBReader::EventCallback, boost::ref(*this), _1, cb);
state.setMasterLogNamePos(binlog_name, binlog_pos);
for (TableList::const_iterator t = tables.begin(); t != tables.end(); ++t) {
slave.setCallback(t->name.first, t->name.second, callback, t->filter);
for (auto table = tables.begin(), end = tables.end(); table != end; ++table) {
slave.setCallback(
table->name.first,
table->name.second,
std::bind(&DBReader::EventCallback, this, _1, std::cref(table->filter), cb)
);
}
slave.setXidCallback(boost::bind(&DBReader::XidEventCallback, boost::ref(*this), _1, cb));
slave.setXidCallback(std::bind(&DBReader::XidEventCallback, this, _1, cb));
slave.init();
slave.createDatabaseStructure();
slave.get_remote_binlog(boost::bind(&DBReader::ReadBinlogCallback, boost::ref(*this)));
slave.get_remote_binlog(std::bind(&DBReader::ReadBinlogCallback, this));
}
void DBReader::Stop()
......@@ -130,31 +114,32 @@ void DBReader::Stop()
slave.close_connection();
}
void DBReader::EventCallback(const slave::RecordSet& event, BinlogEventCallback cb)
void DBReader::EventCallback(const slave::RecordSet& event, const std::map<std::string, unsigned>& filter, BinlogEventCallback cb)
{
last_event_when = event.when;
SerializableBinlogEvent ev;
ev.binlog_name = state.getMasterLogName();
ev.binlog_pos = state.getMasterLogPos();
ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.event = "IGNORE";
ev.database = event.db_name;
ev.table = event.tbl_name;
switch (event.type_event) {
case slave::RecordSet::Delete: ev.event = "DELETE"; break;
// case slave::RecordSet::Update: ev.event = "UPDATE"; break;
// case slave::RecordSet::Write: ev.event = "INSERT"; break;
case slave::RecordSet::Update:
case slave::RecordSet::Write: ev.event = "UPSERT"; break;
default: ev.event = "IGNORE";
}
if (sfilter.PassEvent(event.db_name, event.tbl_name, event.m_row)) {
ev.database = event.db_name;
ev.table = event.tbl_name;
switch (event.type_event) {
case slave::RecordSet::Update: ev.event = "UPDATE"; break;
case slave::RecordSet::Delete: ev.event = "DELETE"; break;
case slave::RecordSet::Write: ev.event = "INSERT"; break;
default: break;
for (auto fi = filter.begin(), end = filter.end(); fi != end; ++fi) {
const auto ri = event.m_row.find(fi->first);
if (ri != event.m_row.end()) {
ev.row[ fi->second ] = ri->second;
}
ev.row = SlaveRowToSerializableRow(event.m_row);
}
else {
// TEST: do not pass filtered events to ZMQ/TPWriter, this will not update binlog position
return;
}
stopped = cb(ev);
......@@ -163,7 +148,7 @@ void DBReader::EventCallback(const slave::RecordSet& event, BinlogEventCallback
void DBReader::XidEventCallback(unsigned int server_id, BinlogEventCallback cb)
{
last_event_when = ::time(NULL);
// send binlog position update event
SerializableBinlogEvent ev;
ev.binlog_name = state.getMasterLogName();
......@@ -179,51 +164,41 @@ bool DBReader::ReadBinlogCallback()
return stopped != 0;
}
void DBReader::DumpTablesCallback(slave::RelayLogInfo &rli, const std::string &db_name, const std::string &tbl_name,
nanomysql::Connection &conn, std::map<std::string, std::pair<unsigned, slave::PtrField>> &filter, const nanomysql::fields_t &f, BinlogEventCallback cb)
{
void DBReader::DumpTablesCallback(
const std::string &db_name,
const std::string &tbl_name,
const std::vector<std::pair<unsigned, slave::PtrField>>& filter,
const nanomysql::fields_t& fields,
BinlogEventCallback cb
) {
SerializableBinlogEvent ev;
ev.binlog_name = "";
ev.binlog_pos = 0;
ev.database = db_name;
ev.table = tbl_name;
ev.event = "INSERT";
// ev.event = "INSERT";
ev.event = "UPSERT";
ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.row.resize(f.size());
for (auto i = filter.begin(); i != filter.end(); ++i) {
if (stopped) {
break;
}
unsigned index = i->second.first;
slave::PtrField field = i->second.second;
std::map<std::string, nanomysql::field>::const_iterator z = f.find(field->getFieldName());
field->unpacka(z->second.data);
ev.row[index] = field->getFieldData();
}
if (!stopped && sfilter.PassEvent(db_name, tbl_name, ev.row)) {
if (!stopped && cb(ev)) {
stopped = true;
for (const auto& it : filter) {
slave::PtrField ptr_field = it.second;
const auto& field = fields.at(ptr_field->field_name);
if (field.is_null) {
ev.row[ it.first ] = boost::any();
} else {
ptr_field->unpack_str(field.data);
ev.row[ it.first ] = ptr_field->field_data;
}
}
if (stopped) {
conn.close();
if (!stopped) {
stopped = cb(ev);
}
}
// Returns the number of seconds between the current wall-clock time and
// last_event_when (the timestamp recorded for the most recent event).
// Returns 0 when last_event_when is equal to or ahead of the current time,
// so the result never wraps around when converted to unsigned.
unsigned DBReader::GetSecondsBehindMaster() const
{
    const ::time_t now = ::time(NULL);
    if (last_event_when >= now) {
        // Last event is not in the past: report no lag instead of a negative delta.
        return 0;
    }
    return now - last_event_when;
}
} // replicator
......@@ -4,70 +4,66 @@
#include <vector>
#include <string>
#include <utility>
#include <boost/function.hpp>
#include <functional>
#include <Slave.h>
#include <DefaultExtState.h>
#include <nanomysql.h>
#include "serializable.h"
#include "simplefilter.h"
namespace replicator {
typedef unsigned long BinlogPos;
typedef boost::function<bool (const SerializableBinlogEvent &ev)> BinlogEventCallback;
typedef std::function<bool (const SerializableBinlogEvent &ev)> BinlogEventCallback;
// Describes one table handled by DBReader: its (database, table) name, its
// column filter and whether it takes part in the initial dump.
struct DBTable
{
// NOTE(review): the default constructors do not initialize do_dump.
DBTable()
{
};
DBTable() {}
// Variant taking the filter as a list of column names.
DBTable(const std::string db_name, const std::string tbl_name, std::vector<std::string> filter) :
name(db_name, tbl_name), filter(filter)
{
};
// Variant taking the filter as a map plus the dump flag; `filter` is copied into the member.
DBTable(const std::string& db, const std::string& table, std::map<std::string, unsigned>& filter, bool do_dump_)
: name(db, table), filter(filter), do_dump(do_dump_) {}
// first = database name, second = table name.
std::pair<std::string, std::string> name;
// Column names of the table that are of interest.
std::vector<std::string> filter;
// Column name -> index of that column's value in the outgoing event row
// (DBReader::EventCallback writes ev.row[index] for every matching column).
std::map<std::string, unsigned> filter;
// Tables with do_dump == false are skipped by DBReader::DumpTables.
bool do_dump;
};
// Reads table contents and binlog events from a MySQL master through
// slave::Slave and hands them to a BinlogEventCallback.
class DBReader
{
public:
DBReader (const std::string &host, const std::string &user, const std::string &password, unsigned int port = 3306, unsigned int connect_retry = 60);
DBReader (nanomysql::mysql_conn_opts &opts, unsigned int connect_retry = 60);
// Closes the slave connection.
~DBReader();
// Appends a DBTable for (db, table) to the list of handled tables.
void AddTable(const std::string &db, const std::string &table, const std::vector<std::string> &columns);
void AddFilterPredicate(const std::string &db, const std::string &tbl, const SimplePredicate &pred);
void AddTable(const std::string &db, const std::string &table, std::map<std::string, unsigned>& filter, bool do_dump);
// Dumps the registered tables through the callback; binlog_name and binlog_pos
// are out-parameters that receive the position reported by the temporary slave.
void DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, BinlogEventCallback f);
// Sets the given binlog name/position on the state, registers the per-table
// callbacks and starts fetching the remote binlog.
void ReadBinlog(const std::string &binlog_name, BinlogPos binlog_pos, BinlogEventCallback cb);
void Stop();
// Converts a row event into a SerializableBinlogEvent and passes it to the callback.
void EventCallback(const slave::RecordSet& event, BinlogEventCallback f);
void EventCallback(const slave::RecordSet& event, const std::map<std::string, unsigned>& filter, BinlogEventCallback cb);
// Intentionally empty handler.
void DummyEventCallback(const slave::RecordSet& event) {};
// Returns true once `stopped` has been set.
bool ReadBinlogCallback();
void XidEventCallback(unsigned int server_id, BinlogEventCallback cb);
// Handler invoked for fetched rows during DumpTables; builds an event for
// db_name/tbl_name and passes it to the callback.
void DumpTablesCallback(slave::RelayLogInfo &rli, const std::string &db_name, const std::string &tbl_name,
nanomysql::Connection &conn, std::map<std::string, std::pair<unsigned, slave::PtrField>> &filter, const nanomysql::fields_t &f, BinlogEventCallback cb);
void DumpTablesCallback(
const std::string &db_name,
const std::string &tbl_name,
const std::vector<std::pair<unsigned, slave::PtrField>>& filter,
const nanomysql::fields_t &f,
BinlogEventCallback cb
);
// Seconds between the current time and last_event_when, never negative.
unsigned GetSecondsBehindMaster() const;
private:
typedef std::vector<DBTable> TableList;
slave::MasterInfo masterinfo;
slave::DefaultExtState state;
slave::Slave slave;
TableList tables;
SimpleFilter sfilter;
std::vector<DBTable> tables;
// Assigned from the BinlogEventCallback's return value; ends the dump/read loops once true.
bool stopped;
// Timestamp of the most recently processed event.
::time_t last_event_when;
};
} // replicator
} // replicator
#endif // REPLICATOR_DBREADER_H
libslave @ eaffacef
Subproject commit eaffacef1e90b29af94b74e77e8873d891c324c2
Subproject commit ee717a5169d601121c9865db9b1624339ab3ee0e
......@@ -3,10 +3,7 @@
#include <sstream>
#include <fstream>
#include <signal.h>
#include <lib/tp.1.5.h>
#include <lib/session.h>
#include <boost/bind.hpp>
#include <boost/archive/binary_oarchive.hpp>
#include <boost/archive/binary_iarchive.hpp>
......@@ -22,6 +19,7 @@
#include "remotemon.h"
// =========
using std::placeholders::_1;
namespace replicator {
......@@ -189,29 +187,18 @@ static void tpwrite_run(void *ZMQTpSocket)
send_zmq_event(ZMQTpSocket, ev_connect);
while(true) {
if (is_term || !connected) {
break;
}
connected = poll_zmq_event(ZMQTpSocket, 100, boost::bind(&TPWriter::BinlogEventCallback, boost::ref(*tpwriter), _1)) == false;
while (!is_term && connected) {
connected = poll_zmq_event(ZMQTpSocket, 100, std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1)) == false;
if (connected) {
connected = tpwriter->Sync();
}
while (!is_term && connected) {
int r = tpwriter->ReadReply();
if (r == 0) {
break;
}
if (r < 0) {
connected = false;
break;
}
int code = tpwriter->GetReplyCode();
if (code) {
std::cerr << "Tarantool eror: " << tpwriter->GetReplyErrorMessage() << " (code: " << code << ")" << std::endl;
connected = !tpwriter->DisconnectOnError();
if (!is_term && connected) {
if (tpwriter->ReadReply() < 0) {
const uint64_t code = tpwriter->GetReplyCode();
if (code) {
std::cerr << "Tarantool eror: " << tpwriter->GetReplyErrorMessage() << " (code: " << code << ")" << std::endl;
connected = !tpwriter->DisconnectOnError();
}
}
}
}
......@@ -280,7 +267,7 @@ static void update_stats()
}
ping_watchdog();
/*
now = ::time(NULL);
seconds_behind_master = dbreader->GetSecondsBehindMaster();
......@@ -299,7 +286,7 @@ static void update_stats()
graphite->SendStat("zmq_allocs_total_max", max_zalloc_count);
max_zalloc_count = zalloc_count;
}
}
} // */
}
// ====================
......@@ -353,8 +340,8 @@ static bool tpread_zmq_callback(const SerializableBinlogEvent &ev, std::string &
// Polls ZMQTpSocket through poll_zmq_event with the given timeout, passing
// tpread_zmq_callback as the handler with TpBinlogName, TpBinlogPos, disconnect
// and a local `read` flag bound by reference.
// Returns the final value of `read`, which starts as false and can only be
// changed through the bound callback.
static bool tpread_get_binlogpos(unsigned timeout, std::string &TpBinlogName, unsigned long &TpBinlogPos, bool &disconnect)
{
bool read = false;
poll_zmq_event(ZMQTpSocket, timeout, boost::bind(tpread_zmq_callback, _1, boost::ref(TpBinlogName), boost::ref(TpBinlogPos),
boost::ref(disconnect), boost::ref(read)));
poll_zmq_event(ZMQTpSocket, timeout, std::bind(tpread_zmq_callback, _1, std::ref(TpBinlogName), std::ref(TpBinlogPos),
std::ref(disconnect), std::ref(read)));
return read;
}
......@@ -391,8 +378,13 @@ static void init(libconfig::Config &cfg)
mysql.lookupValue("connect_retry", connect_retry);
mysql.lookupValue("watchdog_timeout", watchdog_timeout);
dbreader = new DBReader((const char *)mysql["host"], (const char *)mysql["user"], (const char *)mysql["password"],
port, connect_retry);
nanomysql::mysql_conn_opts opts;
opts.mysql_host = (const char*)mysql["host"];
opts.mysql_port = port;
opts.mysql_user = (const char*)mysql["user"];
opts.mysql_pass = (const char*)mysql["password"];
dbreader = new DBReader(opts, connect_retry);
}
// read Tarantool config
......@@ -400,19 +392,25 @@ static void init(libconfig::Config &cfg)
const libconfig::Setting &tarantool = root["tarantool"];
std::string user(""), password("");
unsigned port = 33013;
unsigned connect_retry = 15;
unsigned sync_retry = 1000;
bool disconnect_on_error = false;
tarantool.lookupValue("user", user);
tarantool.lookupValue("password", password);
tarantool.lookupValue("port", port);
tarantool.lookupValue("connect_retry", connect_retry);
tarantool.lookupValue("sync_retry", sync_retry);
tarantool.lookupValue("disconnect_on_error", disconnect_on_error);
tpwriter = new TPWriter((const char *)tarantool["host"], user, password, (unsigned)tarantool["binlog_pos_space"],
(unsigned)tarantool["binlog_pos_key"], port, connect_retry, sync_retry, disconnect_on_error);
tpwriter = new TPWriter(
(const char *)tarantool["host"],
user,
password,
(uint32_t)tarantool["binlog_pos_space"],
(uint32_t)tarantool["binlog_pos_key"],
connect_retry,
sync_retry,
disconnect_on_error
);
}
// read Mysql to Tarantool mappings (each table maps to a single Tarantool space)
......@@ -428,33 +426,27 @@ static void init(libconfig::Config &cfg)
std::string update_call = TPWriter::empty_call;
std::string delete_call = TPWriter::empty_call;
unsigned space((unsigned)mapping["space"]);
std::vector<std::string> columns;
TPWriter::Tuple tuple, keys;
std::map<std::string, unsigned> columns;
TPWriter::Tuple keys;
unsigned index_max = tpwriter->space_last_id[space];
// read columns tuple
// read key tarantool fields we'll use for delete requests
{
const libconfig::Setting &columns_ = mapping["columns"];
int count = columns_.getLength();
for (int i = 0; i < count; i++) {
columns.push_back((const char *)columns_[i]);
tuple.push_back(i);
const libconfig::Setting &keys_ = mapping["key_fields"];
for (int i = 0; i < keys_.getLength(); i++) {
unsigned key = keys_[i];
index_max = std::max(index_max, key);
keys.push_back(key);
}
}
// read key Tarantool fields we'll use for DELETE requests
// read columns tuple
{
const libconfig::Setting &keys_ = mapping["key_fields"];
int count = keys_.getLength();
for (int i = 0; i < count; i++) {
unsigned k = keys_[i];
if (k >= columns.size()) {
std::cerr << "Bad key field id: " << k << " (should be less than " << columns.size() << ")" << std::endl;
exit(EXIT_FAILURE);