Commit 95143e50 authored by pryanikov's avatar pryanikov
Browse files

genuine crap

PROJECT (replicator)
set(REPLICATOR_NAME "replicatord")
set(REPLICATOR_CFLAGS "-DTB_LOCAL=${REPLICATOR_ROOT}/lib/tarantool-c/lib -std=c++0x -g")
CONFIGURE_COMMAND ${REPLICATOR_ROOT}/lib/libconfig/configure --prefix=${CMAKE_CURRENT_SOURCE_DIR}/lib/libconfig
include_directories("${REPLICATOR_ROOT}" "${REPLICATOR_ROOT}/lib/libslave" "${REPLICATOR_ROOT}/lib/msgpuck" "${REPLICATOR_ROOT}/lib/tarantool-c" "${REPLICATOR_ROOT}/lib/libconfig/include")
find_library(LMYSQL_CLIENT_R mysqlclient_r PATH_SUFFIXES mysql)
find_library(LPTHREAD pthread)
find_library(LZMQ zmq)
find_library(LBOOST_SYSTEM_MT boost_system-mt)
find_library(LBOOST_SERIALIZATION_MT boost_serialization-mt)
add_executable(rp ${REPLICATOR_SRC})
target_link_libraries(rp tb slave_a ${REPLICATOR_ROOT}/lib/libconfig/lib/.libs/libconfig++.a)
install(FILES replicatord.cfg DESTINATION etc)
\ No newline at end of file
MySql binlog to Tarantool replication daemon
Please refer to for more information.
\ No newline at end of file
set -xe
err() {
echo "$@" > /dev/stderr
exit $exitval
echo "Building \"$1\""
if [ ! -f "$1" ]; then
err 1 "Spec \"$1\" not found"
GIT_VERSION="$(git rev-list HEAD -n 1)"
EXTENDED_VERSION="$(git log -n 1 --pretty=format:'%h (%ai)')"
BRANCH="$(git name-rev --name-only HEAD)"
BRANCH_FOR_RPM="$(echo $BRANCH | rev | cut -d/ -f1 | rev | sed 's/-/_/g')"
PACKAGER="$(git config <$(git config>"
LAST_COMMIT_DATETIME="$(git log -n 1 --format='%ci' | awk '{ print $1, $2 }' | sed 's/[ :]//g;s/-//g')"
CURRENT_DATETIME=`date +'%Y%m%d%H%M%S'`
if [ ! -f "$HOME/.rpmmacros" ]; then
echo "%_topdir $HOME/rpm/" > $HOME/.rpmmacros
echo "%_tmppath $HOME/rpm/tmp" >> $HOME/.rpmmacros
echo "%packager ${PACKAGER}" >> $HOME/.rpmmacros
if [ ! -d "$HOME/rpm" ]; then
echo "Creating directories need by rpmbuild"
mkdir -p ~/rpm/{BUILD,RPMS,SOURCES,SRPMS,SPECS,tmp} 2>/dev/null
mkdir ~/rpm/RPMS/{i386,i586,i686,noarch} 2>/dev/null
RPM_TOPDIR=`rpm --eval '%_topdir'`
BUILDROOT=`rpm --eval '%_tmppath'`
[ "$PACKAGE" != "/" ] && [ -d "$PACKAGE" ] && rm -rf "$PACKAGE"
mkdir -p ${RPM_TOPDIR}/RPMS/{i386,i586,i686,noarch}
mkdir -p $BUILDROOT
git archive --format=tar --prefix=${PACKAGE}/ ${BRANCH}| gzip > ${RPM_TOPDIR}/SOURCES/${PACKAGE}.tar.gz
echo '############################################################'
VERSION_SUFFIX="%{nil}" # this protects against RPM's error "macro has empty body"
if [ "${BRANCH_FOR_RPM}" != "master" ]; then
rpmbuild -ba --clean $SPECFILE \
--define "current_datetime ${CURRENT_DATETIME}" \
--define "version_suffix ${VERSION_SUFFIX}" \
--define "git_version ${GIT_VERSION}" \
--define "git_branch ${BRANCH_FOR_RPM}"
#include <sstream>
#include <boost/bind.hpp>
#include <boost/ref.hpp>
#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/algorithm/string/join.hpp>
#include "dbreader.h"
#include "serializable.h"
namespace replicator {
static SerializableRow SlaveRowToSerializableRow(const slave::Row &row)
SerializableRow srow;
for (slave::Row::const_iterator i = row.begin(); i != row.end(); ++i) {
return srow;
DBReader::DBReader(const std::string &host, const std::string &user, const std::string &password, unsigned int port, unsigned connect_retry) :
masterinfo(host, port, user, password, connect_retry), state(), slave(masterinfo, state), stopped(false), last_event_when(0)
void DBReader::AddTable(const std::string &db, const std::string &table, const std::vector<std::string> &columns)
tables.push_back(DBTable(db, table, columns));
void DBReader::AddFilterPredicate(const std::string &db, const std::string &tbl, const SimplePredicate &pred)
sfilter.AddPredicate(db, tbl, pred);
void DBReader::DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, BinlogEventCallback cb)
slave::callback dummycallback = boost::bind(&DBReader::DummyEventCallback, boost::ref(*this), _1);
// start temp slave to read DB structure
slave::Slave tempslave(masterinfo, state);
for (TableList::const_iterator i = tables.begin(); i != tables.end(); ++i) {
tempslave.setCallback(i->name.first, i->name.second, dummycallback);
last_event_when = ::time(NULL);
slave::Slave::binlog_pos_t bp = tempslave.getLastBinlog();
binlog_name = bp.first;
binlog_pos = bp.second;
state.setMasterLogNamePos(bp.first, bp.second);
// dump tables
nanomysql::Connection conn(, masterinfo.user.c_str(),
masterinfo.password.c_str(), "", masterinfo.port);
conn.query("SET NAMES utf8");
for (TableList::const_iterator t = tables.begin(); t != tables.end(); ++t) {
slave::RelayLogInfo rli = tempslave.getRli();
if (stopped) {
// build field_name -> field_ptr map for filtered columns
const boost::shared_ptr<slave::Table> rtable = rli.getTable(t->name);
std::map<std::string, std::pair<unsigned, slave::PtrField>> filtered_fields;
for (std::vector<slave::PtrField>::const_iterator f = rtable->fields.begin(); f != rtable->fields.end(); ++f) {
slave::PtrField field = *f;
const auto j = find(t->filter.begin(), t->filter.end(), field->getFieldName());
if (j != t->filter.end()) {
unsigned index = std::distance(t->filter.begin(), j);
filtered_fields[field->getFieldName()] = std::pair<unsigned, slave::PtrField>(index, field);
conn.query(std::string("USE ") + t->name.first);
conn.query(std::string("SELECT ") + boost::algorithm::join(t->filter, ",") + " FROM " + t->name.second);
conn.use(boost::bind(&DBReader::DumpTablesCallback, boost::ref(*this), boost::ref(rli), boost::cref(t->name.first), boost::cref(t->name.second),
boost::ref(conn), boost::ref(filtered_fields), _1, cb));
// send binlog position update event
if (!stopped) {
SerializableBinlogEvent ev;
ev.binlog_name = binlog_name;
ev.binlog_pos = binlog_pos;
ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.event = "IGNORE";
stopped = cb(ev);
void DBReader::ReadBinlog(const std::string &binlog_name, BinlogPos binlog_pos, BinlogEventCallback cb)
stopped = false;
slave::callback callback = boost::bind(&DBReader::EventCallback, boost::ref(*this), _1, cb);
state.setMasterLogNamePos(binlog_name, binlog_pos);
for (TableList::const_iterator t = tables.begin(); t != tables.end(); ++t) {
slave.setCallback(t->name.first, t->name.second, callback, t->filter);
slave.setXidCallback(boost::bind(&DBReader::XidEventCallback, boost::ref(*this), _1, cb));
slave.get_remote_binlog(boost::bind(&DBReader::ReadBinlogCallback, boost::ref(*this)));
void DBReader::Stop()
stopped = true;
void DBReader::EventCallback(const slave::RecordSet& event, BinlogEventCallback cb)
last_event_when = event.when;
SerializableBinlogEvent ev;
ev.binlog_name = state.getMasterLogName();
ev.binlog_pos = state.getMasterLogPos();
ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.event = "IGNORE";
if (sfilter.PassEvent(event.db_name, event.tbl_name, event.m_row)) {
ev.database = event.db_name;
ev.table = event.tbl_name;
switch (event.type_event) {
case slave::RecordSet::Update: ev.event = "UPDATE"; break;
case slave::RecordSet::Delete: ev.event = "DELETE"; break;
case slave::RecordSet::Write: ev.event = "INSERT"; break;
default: break;
ev.row = SlaveRowToSerializableRow(event.m_row);
else {
// TEST: do not pass filtered events to ZMQ/TPWriter, this will not update binlog position
stopped = cb(ev);
void DBReader::XidEventCallback(unsigned int server_id, BinlogEventCallback cb)
last_event_when = ::time(NULL);
// send binlog position update event
SerializableBinlogEvent ev;
ev.binlog_name = state.getMasterLogName();
ev.binlog_pos = state.getMasterLogPos();
ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
ev.event = "IGNORE";
stopped = cb(ev);
bool DBReader::ReadBinlogCallback()
return stopped != 0;
void DBReader::DumpTablesCallback(slave::RelayLogInfo &rli, const std::string &db_name, const std::string &tbl_name,
nanomysql::Connection &conn, std::map<std::string, std::pair<unsigned, slave::PtrField>> &filter, const nanomysql::fields_t &f, BinlogEventCallback cb)
SerializableBinlogEvent ev;
ev.binlog_name = "";
ev.binlog_pos = 0;
ev.database = db_name;
ev.table = tbl_name;
ev.event = "INSERT";
ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
for (auto i = filter.begin(); i != filter.end(); ++i) {
if (stopped) {
unsigned index = i->second.first;
slave::PtrField field = i->second.second;
std::map<std::string, nanomysql::field>::const_iterator z = f.find(field->getFieldName());
ev.row[index] = field->getFieldData();
if (!stopped && sfilter.PassEvent(db_name, tbl_name, ev.row)) {
if (!stopped && cb(ev)) {
stopped = true;
if (stopped) {
unsigned DBReader::GetSecondsBehindMaster() const
::time_t now = ::time(NULL);
if (last_event_when >= now) {
return 0;
return now - last_event_when;
} // replicator
#include <vector>
#include <string>
#include <utility>
#include <boost/function.hpp>
#include <Slave.h>
#include <DefaultExtState.h>
#include <nanomysql.h>
#include "serializable.h"
#include "simplefilter.h"
namespace replicator {
typedef unsigned long BinlogPos;
typedef boost::function<bool (const SerializableBinlogEvent &ev)> BinlogEventCallback;
struct DBTable
DBTable(const std::string db_name, const std::string tbl_name, std::vector<std::string> filter) :
name(db_name, tbl_name), filter(filter)
std::pair<std::string, std::string> name;
std::vector<std::string> filter;
class DBReader
DBReader (const std::string &host, const std::string &user, const std::string &password, unsigned int port = 3306, unsigned int connect_retry = 60);
void AddTable(const std::string &db, const std::string &table, const std::vector<std::string> &columns);
void AddFilterPredicate(const std::string &db, const std::string &tbl, const SimplePredicate &pred);
void DumpTables(std::string &binlog_name, BinlogPos &binlog_pos, BinlogEventCallback f);
void ReadBinlog(const std::string &binlog_name, BinlogPos binlog_pos, BinlogEventCallback cb);
void Stop();
void EventCallback(const slave::RecordSet& event, BinlogEventCallback f);
void DummyEventCallback(const slave::RecordSet& event) {};
bool ReadBinlogCallback();
void XidEventCallback(unsigned int server_id, BinlogEventCallback cb);
void DumpTablesCallback(slave::RelayLogInfo &rli, const std::string &db_name, const std::string &tbl_name,
nanomysql::Connection &conn, std::map<std::string, std::pair<unsigned, slave::PtrField>> &filter, const nanomysql::fields_t &f, BinlogEventCallback cb);
unsigned GetSecondsBehindMaster() const;
typedef std::vector<DBTable> TableList;
slave::MasterInfo masterinfo;
slave::DefaultExtState state;
slave::Slave slave;
TableList tables;
SimpleFilter sfilter;
bool stopped;
::time_t last_event_when;
} // replicator
Mark Lindner - Lead developer & maintainer.
Daniel Marjamäki - Enhancements & bugfixes.
Andrew Tytula - Windows port.
Glenn Herteg - Enhancements, bugfixes, documentation corrections.
This diff is collapsed.
This diff is collapsed.
Installation Instructions
Copyright (C) 1994, 1995, 1996, 1999, 2000, 2001, 2002, 2004 Free
Software Foundation, Inc.
This file is free documentation; the Free Software Foundation gives
unlimited permission to copy, distribute and modify it.
Basic Installation
These are generic installation instructions.
The `configure' shell script attempts to guess correct values for
various system-dependent variables used during compilation. It uses
those values to create a `Makefile' in each directory of the package.
It may also create one or more `.h' files containing system-dependent
definitions. Finally, it creates a shell script `config.status' that
you can run in the future to recreate the current configuration, and a
file `config.log' containing compiler output (useful mainly for
debugging `configure').
It can also use an optional file (typically called `config.cache'
and enabled with `--cache-file=config.cache' or simply `-C') that saves
the results of its tests to speed up reconfiguring. (Caching is
disabled by default to prevent problems with accidental use of stale
cache files.)
If you need to do unusual things to compile the package, please try
to figure out how `configure' could check whether to do them, and mail
diffs or instructions to the address given in the `README' so they can
be considered for the next release. If you are using the cache, and at
some point `config.cache' contains results you don't want to keep, you
may remove or edit it.
The file `' (or `') is used to create
`configure' by a program called `autoconf'. You only need
`' if you want to change it or regenerate `configure' using
a newer version of `autoconf'.
The simplest way to compile this package is:
1. `cd' to the directory containing the package's source code and type
`./configure' to configure the package for your system. If you're
using `csh' on an old version of System V, you might need to type
`sh ./configure' instead to prevent `csh' from trying to execute
`configure' itself.
Running `configure' takes awhile. While running, it prints some
messages telling which features it is checking for.
2. Type `make' to compile the package.
3. Optionally, type `make check' to run any self-tests that come with
the package.
4. Type `make install' to install the programs and any data files and
5. You can remove the program binaries and object files from the
source code directory by typing `make clean'. To also remove the
files that `configure' created (so you can compile the package for
a different kind of computer), type `make distclean'. There is
also a `make maintainer-clean' target, but that is intended mainly
for the package's developers. If you use it, you may have to get
all sorts of other programs in order to regenerate files that came
with the distribution.
Compilers and Options
Some systems require unusual options for compilation or linking that the
`configure' script does not know about. Run `./configure --help' for
details on some of the pertinent environment variables.
You can give `configure' initial values for configuration parameters
by setting variables in the command line or in the environment. Here
is an example:
./configure CC=c89 CFLAGS=-O2 LIBS=-lposix
*Note Defining Variables::, for more details.
Compiling For Multiple Architectures
You can compile the package for more than one kind of computer at the
same time, by placing the object files for each architecture in their
own directory. To do this, you must use a version of `make' that
supports the `VPATH' variable, such as GNU `make'. `cd' to the
directory where you want the object files and executables to go and run
the `configure' script. `configure' automatically checks for the
source code in the directory that `configure' is in and in `..'.
If you have to use a `make' that does not support the `VPATH'
variable, you have to compile the package for one architecture at a
time in the source code directory. After you have installed the
package for one architecture, use `make distclean' before reconfiguring
for another architecture.
Installation Names
By default, `make install' will install the package's files in
`/usr/local/bin', `/usr/local/man', etc. You can specify an
installation prefix other than `/usr/local' by giving `configure' the
option `--prefix=PREFIX'.
You can specify separate installation prefixes for
architecture-specific files and architecture-independent files. If you
give `configure' the option `--exec-prefix=PREFIX', the package will
use PREFIX as the prefix for installing programs and libraries.
Documentation and other data files will still use the regular prefix.
In addition, if you use an unusual directory layout you can give
options like `--bindir=DIR' to specify different values for particular
kinds of files. Run `configure --help' for a list of the directories
you can set and what kinds of files go in them.
If the package supports it, you can cause programs to be installed
with an extra prefix or suffix on their names by giving `configure' the
option `--program-prefix=PREFIX' or `--program-suffix=SUFFIX'.
Optional Features
Some packages pay attention to `--enable-FEATURE' options to
`configure', where FEATURE indicates an optional part of the package.
They may also pay attention to `--with-PACKAGE' options, where PACKAGE
is something like `gnu-as' or `x' (for the X Window System). The
`README' should mention any `--enable-' and `--with-' options that the
package recognizes.
For packages that use the X Window System, `configure' can usually
find the X include and library files automatically, but if it doesn't,
you can use the `configure' options `--x-includes=DIR' and
`--x-libraries=DIR' to specify their locations.
Specifying the System Type
There may be some features `configure' cannot figure out automatically,
but needs to determine by the type of machine the package will run on.
Usually, assuming the package is built to be run on the _same_
architectures, `configure' can figure that out, but if it prints a
message saying it cannot guess the machine type, give it the
`--build=TYPE' option. TYPE can either be a short name for the system
type, such as `sun4', or a canonical name which has the form:
where SYSTEM can have one of these forms:
See the file `config.sub' for the possible values of each field. If
`config.sub' isn't included in this package, then this package doesn't
need to know the machine type.
If you are _building_ compiler tools for cross-compiling, you should
use the `--target=TYPE' option to select the type of system they will
produce code for.
If you want to _use_ a cross compiler, that generates code for a
platform different from the build platform, you should specify the
"host" platform (i.e., that on which the generated programs will
eventually be run) with `--host=TYPE'.
Sharing Defaults
If you want to set default values for `configure' scripts to share, you
can create a site shell script called `' that gives default
values for variables like `CC', `cache_file', and `prefix'.
`configure' looks for `PREFIX/share/' if it exists, then
`PREFIX/etc/' if it exists. Or, you can set the
`CONFIG_SITE' environment variable to the location of the site script.
A warning: not all `configure' scripts look for a site script.
Defining Variables