Commit 33c7e097 authored by pryanikov's avatar pryanikov

another refuktoring

parent 80600919
......@@ -6,23 +6,9 @@ using std::placeholders::_1;
namespace replicator {
DBReader::DBReader(nanomysql::mysql_conn_opts &opts, unsigned connect_retry)
: state(), slave(slave::MasterInfo(opts, connect_retry), state), stopped(false), last_event_when(0) {}
DBReader::~DBReader()
{
slave.close_connection();
}
void DBReader::AddTable(const std::string& db, const std::string& table, std::map<std::string, unsigned>& filter, bool do_dump)
{
tables.emplace_back(db, table, filter, do_dump);
}
void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, BinlogEventCallback cb)
void DBReader::DumpTables(std::string& binlog_name, unsigned long& binlog_pos, const BinlogEventCallback& cb)
{
slave::callback dummycallback = std::bind(&DBReader::DummyEventCallback, this, _1);
const static slave::callback dummycallback = [] (const slave::RecordSet& event) {};
// start temp slave to read DB structure
slave::Slave tempslave(slave.masterInfo(), state);
for (auto i = tables.begin(), end = tables.end(); i != end; ++i) {
......@@ -32,7 +18,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
tempslave.init();
tempslave.createDatabaseStructure();
last_event_when = ::time(NULL);
// last_event_when = ::time(NULL);
slave::Slave::binlog_pos_t bp = tempslave.getLastBinlog();
binlog_name = bp.first;
......@@ -42,12 +28,9 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
// dump tables
nanomysql::Connection conn(slave.masterInfo().conn_options);
conn.query("SET NAMES utf8");
for (auto table = tables.begin(), end = tables.end(); !stopped && table != end; ++table) {
if (!table->do_dump) continue;
// build field_name -> field_ptr map for filtered columns
std::vector<std::pair<unsigned, slave::PtrField>> filter_;
const auto rtable = tempslave.getRli().getTable(table->name);
......@@ -56,7 +39,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
for (const auto ptr_field : rtable->fields) {
const auto it = table->filter.find(ptr_field->field_name);
if (it != table->filter.end()) {
filter_.emplace_back(it->second, ptr_field);
filter_.emplace_back(it->second.first, ptr_field);
s_fields += ptr_field->field_name;
s_fields += ',';
}
......@@ -71,7 +54,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
std::cref(table->name.second),
std::cref(filter_),
_1,
cb
std::cref(cb)
));
}
......@@ -81,7 +64,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
ev->binlog_name = binlog_name;
ev->binlog_pos = binlog_pos;
// ev->seconds_behind_master = GetSecondsBehindMaster();
ev->unix_timestamp = long(time(NULL));
// ev->unix_timestamp = long(time(NULL));
ev->event = "IGNORE";
stopped = cb(std::move(ev));
}
......@@ -89,7 +72,7 @@ void DBReader::DumpTables(std::string &binlog_name, unsigned long &binlog_pos, B
//tempslave.close_connection();
}
void DBReader::ReadBinlog(const std::string &binlog_name, unsigned long binlog_pos, BinlogEventCallback cb)
void DBReader::ReadBinlog(const std::string& binlog_name, unsigned long binlog_pos, const BinlogEventCallback& cb)
{
stopped = false;
state.setMasterLogNamePos(binlog_name, binlog_pos);
......@@ -98,10 +81,15 @@ void DBReader::ReadBinlog(const std::string &binlog_name, unsigned long binlog_p
slave.setCallback(
table->name.first,
table->name.second,
std::bind(&DBReader::EventCallback, this, _1, std::cref(table->filter), cb)
std::bind(
table->is_primary
? &DBReader::EventCallbackNormal
: &DBReader::EventCallbackNullify,
this, _1, std::cref(table->filter), std::cref(cb)
)
);
}
slave.setXidCallback(std::bind(&DBReader::XidEventCallback, this, _1, cb));
slave.setXidCallback(std::bind(&DBReader::XidEventCallback, this, _1, std::cref(cb)));
slave.init();
slave.createDatabaseStructure();
......@@ -114,54 +102,98 @@ void DBReader::Stop()
slave.close_connection();
}
void DBReader::EventCallback(const slave::RecordSet& event, const std::map<std::string, unsigned>& filter, BinlogEventCallback cb)
{
last_event_when = event.when;
void DBReader::EventCallbackNormal(
const slave::RecordSet& event,
const std::map<std::string, std::pair<unsigned, bool>>& filter,
const BinlogEventCallback& cb
) {
if (stopped) return;
// last_event_when = event.when;
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
switch (event.type_event) {
case slave::RecordSet::Delete: ev->event = "DELETE"; break;
case slave::RecordSet::Update: ev->event = "UPDATE"; break;
case slave::RecordSet::Write: ev->event = "INSERT"; break;
default: return;
}
ev->binlog_name = state.getMasterLogName();
ev->binlog_pos = state.getMasterLogPos();
// ev->seconds_behind_master = GetSecondsBehindMaster();
ev->unix_timestamp = long(time(NULL));
// ev->unix_timestamp = long(time(NULL));
ev->database = event.db_name;
ev->table = event.tbl_name;
for (auto fi = filter.begin(), end = filter.end(); fi != end; ++fi) {
const auto ri = event.m_row.find(fi->first);
if (ri != event.m_row.end()) {
ev->row[ fi->second.first ] = ri->second;
}
}
stopped = cb(std::move(ev));
}
void DBReader::EventCallbackNullify(
const slave::RecordSet& event,
const std::map<std::string, std::pair<unsigned, bool>>& filter,
const BinlogEventCallback& cb
) {
if (stopped) return;
// last_event_when = event.when;
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
bool is_delete = false;
switch (event.type_event) {
case slave::RecordSet::Delete: ev->event = "DELETE"; break;
case slave::RecordSet::Delete: ev->event = "DELETE"; is_delete = true;
case slave::RecordSet::Update: ev->event = "UPDATE"; break;
case slave::RecordSet::Write: ev->event = "INSERT"; break;
default: ev->event = "IGNORE";
default: return;
}
ev->binlog_name = state.getMasterLogName();
ev->binlog_pos = state.getMasterLogPos();
// ev->seconds_behind_master = GetSecondsBehindMaster();
// ev->unix_timestamp = long(time(NULL));
ev->database = event.db_name;
ev->table = event.tbl_name;
for (auto fi = filter.begin(), end = filter.end(); fi != end; ++fi) {
const auto ri = event.m_row.find(fi->first);
if (ri != event.m_row.end()) {
ev->row[ fi->second ] = ri->second;
// if it's not a key and event is delete - don't actually delete, just nullify
ev->row[ fi->second.first ] = !fi->second.second && is_delete ? boost::any() : ri->second;
}
}
stopped = cb(std::move(ev));
}
void DBReader::XidEventCallback(unsigned int server_id, BinlogEventCallback cb)
void DBReader::XidEventCallback(unsigned int server_id, const BinlogEventCallback& cb)
{
last_event_when = ::time(NULL);
if (stopped) return;
// last_event_when = ::time(NULL);
// send binlog position update event
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
ev->binlog_name = state.getMasterLogName();
ev->binlog_pos = state.getMasterLogPos();
// ev->seconds_behind_master = GetSecondsBehindMaster();
ev->unix_timestamp = long(time(NULL));
// ev->unix_timestamp = long(time(NULL));
ev->event = "IGNORE";
stopped = cb(std::move(ev));
}
void DBReader::DumpTablesCallback(
const std::string &db_name,
const std::string &tbl_name,
const std::string& db_name,
const std::string& tbl_name,
const std::vector<std::pair<unsigned, slave::PtrField>>& filter,
const nanomysql::fields_t& fields,
BinlogEventCallback cb
const BinlogEventCallback& cb
) {
if (stopped) return;
SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
ev->binlog_name = "";
ev->binlog_pos = 0;
......@@ -169,7 +201,7 @@ void DBReader::DumpTablesCallback(
ev->table = tbl_name;
ev->event = "INSERT";
// ev->seconds_behind_master = GetSecondsBehindMaster();
ev->unix_timestamp = long(time(NULL));
// ev->unix_timestamp = long(time(NULL));
for (const auto& it : filter) {
slave::PtrField ptr_field = it.second;
......@@ -181,13 +213,11 @@ void DBReader::DumpTablesCallback(
ev->row[ it.first ] = ptr_field->field_data;
}
}
if (!stopped) {
stopped = cb(std::move(ev));
}
stopped = cb(std::move(ev));
}
unsigned DBReader::GetSecondsBehindMaster() const {
return std::max(::time(NULL) - last_event_when, 0L);
}
// unsigned DBReader::GetSecondsBehindMaster() const {
// return std::max(::time(NULL) - last_event_when, 0L);
// }
} // replicator
......@@ -18,40 +18,63 @@ typedef std::function<bool (SerializableBinlogEventPtr&&)> BinlogEventCallback;
struct DBTable
{
DBTable() {}
DBTable(const std::string& db, const std::string& table, std::map<std::string, unsigned>& filter, bool do_dump_)
: name(db, table), filter(filter), do_dump(do_dump_) {}
std::pair<std::string, std::string> name;
std::map<std::string, unsigned> filter;
bool do_dump;
DBTable(
const std::string& db,
const std::string& table,
const std::map<std::string, std::pair<unsigned, bool>>& filter_,
bool is_primary_
) :
name(db, table), filter(filter_), is_primary(is_primary_)
{}
const std::pair<std::string, std::string> name;
const std::map<std::string, std::pair<unsigned, bool>> filter;
const bool is_primary;
};
class DBReader
{
public:
DBReader (nanomysql::mysql_conn_opts &opts, unsigned int connect_retry = 60);
~DBReader();
void AddTable(const std::string &db, const std::string &table, std::map<std::string, unsigned>& filter, bool do_dump);
void DumpTables(std::string &binlog_name, unsigned long &binlog_pos, BinlogEventCallback f);
void ReadBinlog(const std::string &binlog_name, unsigned long binlog_pos, BinlogEventCallback cb);
DBReader(nanomysql::mysql_conn_opts& opts, unsigned connect_retry)
: state(), slave(slave::MasterInfo(opts, connect_retry), state), stopped(false) {}
~DBReader() {
slave.close_connection();
}
void AddTable(
const std::string& db,
const std::string& table,
const std::map<std::string, std::pair<unsigned, bool>>& filter,
bool is_primary
) {
tables.emplace_back(db, table, filter, is_primary);
}
void DumpTables(std::string& binlog_name, unsigned long& binlog_pos, const BinlogEventCallback& cb);
void ReadBinlog(const std::string& binlog_name, unsigned long binlog_pos, const BinlogEventCallback& cb);
void Stop();
void EventCallback(const slave::RecordSet& event, const std::map<std::string, unsigned>& filter, BinlogEventCallback cb);
void DummyEventCallback(const slave::RecordSet& event) {};
void XidEventCallback(unsigned int server_id, BinlogEventCallback cb);
void EventCallbackNormal(
const slave::RecordSet& event,
const std::map<std::string, std::pair<unsigned, bool>>& filter,
const BinlogEventCallback& cb
);
void EventCallbackNullify(
const slave::RecordSet& event,
const std::map<std::string, std::pair<unsigned, bool>>& filter,
const BinlogEventCallback& cb
);
void XidEventCallback(unsigned int server_id, const BinlogEventCallback& cb);
void DumpTablesCallback(
const std::string &db_name,
const std::string &tbl_name,
const std::string& db_name,
const std::string& tbl_name,
const std::vector<std::pair<unsigned, slave::PtrField>>& filter,
const nanomysql::fields_t &f,
BinlogEventCallback cb
const nanomysql::fields_t& f,
const BinlogEventCallback& cb
);
unsigned GetSecondsBehindMaster() const;
// unsigned GetSecondsBehindMaster() const;
private:
slave::DefaultExtState state;
......@@ -59,7 +82,7 @@ private:
std::vector<DBTable> tables;
bool stopped;
::time_t last_event_when;
// ::time_t last_event_when;
};
} // replicator
......
......@@ -140,6 +140,7 @@ static void init(YAML::Node& cfg)
}
// read Mysql to Tarantool mappings (each table maps to a single Tarantool space)
{
std::map<unsigned, bool> has_primary;
const YAML::Node& mappings = cfg["mappings"];
for (int i = 0; i < mappings.size(); i++) {
......@@ -153,10 +154,17 @@ static void init(YAML::Node& cfg)
std::string delete_call = mapping["delete_call"] ? mapping["delete_call"].as<std::string>() : TPWriter::empty_call;
const unsigned space = mapping["space"].as<unsigned>();
std::map<std::string, unsigned> columns;
TPWriter::Tuple keys;
std::map<std::string, std::pair<unsigned, bool>> columns;
std::vector<unsigned> keys;
unsigned index_max = tpwriter->space_last_id[space];
bool is_primary;
if (has_primary.find(space) == has_primary.end()) {
is_primary = has_primary[space] = true;;
} else {
is_primary = false;
}
// read key tarantool fields we'll use for delete requests
{
const YAML::Node& keys_ = mapping["key_fields"];
......@@ -170,12 +178,18 @@ static void init(YAML::Node& cfg)
{
const YAML::Node& columns_ = mapping["columns"];
for (int i = 0; i < columns_.size(); i++) {
unsigned index = i < keys.size() ? keys[i] : ++index_max;
columns[ columns_[i].as<std::string>() ] = index;
const bool is_key = i < keys.size();
const unsigned index = is_key ? keys[i] : ++index_max;
columns.emplace(
std::piecewise_construct,
std::forward_as_tuple(columns_[i].as<std::string>()),
std::forward_as_tuple(index, is_key)
);
}
}
dbreader->AddTable(database, table, columns, mapping["dump"].as<bool>());
dbreader->AddTable(database, table, columns, is_primary);
std::sort(keys.begin(), keys.end());
tpwriter->AddTable(database, table, space, keys, insert_call, update_call, delete_call);
tpwriter->space_last_id[space] = index_max;
}
......@@ -194,7 +208,7 @@ static void init(YAML::Node& cfg)
auto itf = field.begin();
if (itf == field.end()) continue;
unsigned index = itrn->first.as<unsigned>();
const unsigned index = itrn->first.as<unsigned>();
std::string type = itf->first.as<std::string>();
const YAML::Node& value = itf->second;
......
#ifndef REPLICATOR_REMOTEMON_H
#define REPLICATOR_REMOTEMON_H
#include <netinet/in.h>
#include <sys/socket.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <string>
#include <sstream>
namespace replicator {
class Graphite
{
public:
Graphite(const std::string &host, unsigned port, const std::string &prefix) : host(host), port(port), prefix(prefix),
initialized(false), sock(-1), last_packet_time(0) {}
~Graphite() { if (sock >= 0) close(sock); }
template<typename T>
void SendStat(const std::string &graph, T value)
{
Init();
if (sock < 0) {
return;
}
std::ostringstream oss;
oss << prefix << graph << " " << value << " " << ::time(NULL) << std::endl;
sendto(sock, static_cast<const void *>(oss.str().c_str()), oss.str().length(), 0, (struct sockaddr*)&server, sizeof(server));
last_packet_time = ::time(NULL);
}
::time_t GetLastPacketTime() const { return last_packet_time; }
unsigned GetInterval() const { return 60; }
private:
std::string host;
unsigned port;
std::string prefix;
bool initialized;
int sock;
struct sockaddr_in server;
::time_t last_packet_time;
void Init()
{
if (initialized) {
return;
}
initialized = true;
InitSocket();
InitPrefix();
}
void InitPrefix()
{
char hostname[256];
gethostname(hostname, sizeof(hostname));
const char *p = strchr(hostname, '.');
if (p) {
hostname[p - hostname] = '\0';
}
prefix.append(hostname);
prefix.append(".replicatord.");
}
void InitSocket()
{
struct sockaddr_in client;
sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
return;
}
memset((char *) &client, 0, sizeof(client));
client.sin_family = AF_INET;
client.sin_addr.s_addr = htonl(INADDR_ANY);
client.sin_port = 0;
server.sin_family = AF_INET;
server.sin_addr.s_addr = GetIpV4Addr(host);
server.sin_port = htons(port);
if (server.sin_addr.s_addr == 0) {
std::cerr << "Host " << host << " not found";
sock = -1;
return;
}
if (bind(sock, (struct sockaddr*)&client, sizeof(client)) < 0) {
std::cerr << "Could not bind socket: " << strerror(errno);
sock = -1;
return;
}
}
unsigned GetIpV4Addr(const std::string &hostname) const
{
struct addrinfo hints = { 0 }, *servinfo;
unsigned res = 0;
hints.ai_family = AF_INET; // use AF_INET6 to force IPv6
if (getaddrinfo(hostname.c_str(), NULL, &hints, &servinfo) == 0) {
if (servinfo) {
res = ((struct sockaddr_in *)servinfo->ai_addr)->sin_addr.s_addr;
}
freeaddrinfo(servinfo);
}
return res;
}
};
} // replicator
#endif // REPLICATOR_REMOTEMON_H
......@@ -3,24 +3,27 @@ mysql:
port: 3306
user: tarantool
password: tarant00l
connect_retry: 15
connect_retry: 15 # seconds
tarantool:
host: 10.5.4.248:5000
user: ""
password: ""
binlog_pos_space: 99
binlog_pos_space: 512
binlog_pos_key: 0
connect_retry: 15
sync_retry: 1000
connect_retry: 15 # seconds
sync_retry: 1000 # milliseconds
mappings:
# Первая встреченная в конфиге таблица считается основной для спейса,
# удаление записи в ней вызовет удаление записи и в Tarantool'е.
# Удаление в следующих таблицах с таким-же id спейса удаления записи в Tarantool'е не вызовет,
# вместо этого все поля будут забиты null'ами (с подстановкой, см. ниже)
- database: Monamour2
table: User
columns: [ oid, email, login, prefix_id, confirmed, password, secret ]
space: 100
space: 513
key_fields: [ 0 ]
dump: true
# insert_call: function_name
# update_call: function_name
# delete_call: function_name
......@@ -28,37 +31,32 @@ mappings:
- database: Monamour2
table: AnketaMini
columns: [ oid, package_id, birthdate, country_id, region_id, city_id, metro_id, gender, orientation, lookfor, age, iam, target, status, system_status, language_id, partner_id, type, type2, type3, type4, search_mode, changed, created, updated, to_vip_status, theme_id ]
space: 100
space: 513
key_fields: [ 0 ]
dump: true
- database: Monamour2
table: AnketaNamesDescr
columns: [ oid, name ]
space: 100
space: 513
key_fields: [ 0 ]
dump: true
- database: Monamour2
table: DefaultPhotos
columns: [ anketa_id, oid, platform_id, extension, small_face, updated, moderated, huge_adult, rating_id, album_id, width, height, photo_params ]
space: 100
space: 513
key_fields: [ 0 ]
dump: true
- database: Monamour2
table: RealUser_Phone
columns: [ anketa_id, number ]
space: 100
space: 513
key_fields: [ 0 ]
dump: true
- database: Monamour2
table: AnketaCounter
columns: [anketa_id, photos_active, photos_adult ]
space: 100
space: 513
key_fields: [ 0 ]
dump: true
spaces:
# Tarantool не умеет вставлять null'ы в поля, по которым есть индекс, а также при upsert'е зачем-то
......@@ -67,7 +65,7 @@ spaces:
# приходится дополнять null'ами, чтобы количество полей было всегда одинаковым.
#
# column_id: { (string|integer|unsigned): value }
100:
513:
replace_null:
1: { string: "" }
2: { string: "" }
......@@ -14,19 +14,25 @@ class SerializableValue {
boost::any value;
public:
SerializableValue () {}
SerializableValue (const boost::any &value_) : value(value_) {}
template<typename T>
inline SerializableValue& operator= (T value_) {
inline SerializableValue& operator= (const T& value_) {
value = boost::any(value_);
return *this;
}
template<typename T>
inline SerializableValue& operator= (T&& value_) {
value = boost::any(std::forward<T>(value_));
return *this;
}
inline SerializableValue& operator= (const boost::any& value_) {
value = value_;
return *this;
}
inline SerializableValue& operator= (boost::any&& value_) {
value = std::forward<boost::any>(value_);
return *this;
}
template<typename T> inline bool is() const {
return value.type() == typeid(T);
......@@ -76,7 +82,7 @@ struct SerializableBinlogEvent
std::string binlog_name;
unsigned long binlog_pos;
// unsigned long seconds_behind_master;
unsigned long unix_timestamp;
// unsigned long unix_timestamp;
std::string database;
std::string table;
std::string event;
......
</