Commit eeae2892 authored by pryanikov's avatar pryanikov
Browse files

use predicate for sync

parent 704437c2
......@@ -205,19 +205,17 @@ static void init(YAML::Node& cfg)
unsigned index = itrn->first.as<unsigned>();
std::string type = itf->first.as<std::string>();
const YAML::Node& value = itf->second;
boost::any anyv;
if (type == "string") {
anyv = value.as<std::string>();
if (type == "str" || type == "string") {
rn_[ index ] = value.as<std::string>();
} else if (type == "unsigned") {
anyv = value.as<unsigned long long>();
} else if (type == "integer") {
anyv = value.as<long long>();
rn_[ index ] = value.as<unsigned long long>();
} else if (type == "int" || type == "integer") {
rn_[ index ] = value.as<long long>();
} else {
std::cerr << "Config error: unknown type for non-null value for column " << index << std::endl;
exit(EXIT_FAILURE);
}
rn_[ index ] = anyv;
}
}
}
......
......@@ -33,10 +33,12 @@ template<typename T> class Queue
std::unique_lock<std::mutex> lock(mutex);
if (!queue.empty() || cv1.wait_for(lock, timeout, [this] { return !queue.empty(); })) {
for (unsigned cnt = queue.size() < limit_ ? queue.size() : limit_; cnt; cnt--) {
cb(queue.front());
bool predicate;
unsigned cnt = queue.size() < limit_ ? queue.size() : limit_;
do {
predicate = cb(queue.front());
queue.pop_front();
}
} while (!predicate && --cnt);
}
lock.unlock();
......
......@@ -9,119 +9,64 @@
namespace replicator {
class SerializableValue {
private:
std::string type_id;
private:
boost::any value;
void fromAny (const boost::any &v)
{
if (v.type() == typeid(std::string)) {
type_id = "string";
second = boost::any_cast<std::string>(v);
} else {
std::ostringstream s;
public:
SerializableValue () {}
SerializableValue (const boost::any &value_) : value(value_) {}
if (v.type() == typeid(char)) {
type_id = "int";
s << (short)boost::any_cast<char>(v);
}
else if (v.type() == typeid(unsigned char)) {
type_id = "uint";
s << (unsigned short)boost::any_cast<unsigned char>(v);
}
else if (v.type() == typeid(short)) {
type_id = "int";
s << boost::any_cast<short>(v);
}
else if (v.type() == typeid(unsigned short)) {
type_id = "uint";
s << boost::any_cast<unsigned short>(v);
}
else if (v.type() == typeid(int)) {
type_id = "int";
s << boost::any_cast<int>(v);
}
else if (v.type() == typeid(unsigned int)) {
type_id = "uint";
s << boost::any_cast<unsigned int>(v);
}
else if (v.type() == typeid(long)) {
type_id = "int";
s << boost::any_cast<long>(v);
}
else if (v.type() == typeid(unsigned long)) {
type_id = "uint";
s << boost::any_cast<unsigned long>(v);
}
else if (v.type() == typeid(long long)) {
type_id = "int";
s << boost::any_cast<long long>(v);
}
else if (v.type() == typeid(unsigned long long)) {
type_id = "uint";
s << boost::any_cast<unsigned long long>(v);
}
else if (v.type() == typeid(float)) {
type_id = "float";
s << boost::any_cast<float>(v);
}
else if (v.type() == typeid(double)) {
type_id = "double";
s << boost::any_cast<double>(v);
}
else {
type_id = "null";
}
second = s.str();
template<typename T>
inline SerializableValue& operator= (T value_) {
value = boost::any(value_);
return *this;
}
}
public:
std::string second;
SerializableValue () {}
SerializableValue (const boost::any &v) { fromAny(v); }
SerializableValue& operator= (const boost::any &v) { fromAny(v); return *this; }
boost::any operator *() const {
if (type_id == "string") {
return boost::any(second);
inline SerializableValue& operator= (const boost::any& value_) {
value = value_;
return *this;
}
std::istringstream s(second);
if (type_id == "int") {
long long val;
s >> val;
return boost::any(val);
}
if (type_id == "uint") {
unsigned long long val;
s >> val;
return boost::any(val);
}
if (type_id == "float") {
float val;
s >> val;
return boost::any(val);
}
if (type_id == "double") {
double val;
s >> val;
return boost::any(val);
template<typename T> inline bool is() const {
return value.type() == typeid(T);
}
return boost::any();
}
template<typename T> inline T as() const {
return boost::any_cast<T>(value);
}
const std::string& value_string() const {
return second;
}
std::string to_string() const {
std::ostringstream s;
const std::string& get_type_id() const {
return type_id;
}
if (is<std::string()>()) {
s << as<std::string>();
} else if (is<char>()) {
s << as<char>();
} else if (is<unsigned char>()) {
s << as<unsigned char>();
} else if (is<short>()) {
s << as<short>();
} else if (is<unsigned short>()) {
s << as<unsigned short>();
} else if (is<int>()) {
s << as<int>();
} else if (is<unsigned int>()) {
s << as<unsigned int>();
} else if (is<long>()) {
s << as<long>();
} else if (is<unsigned long>()) {
s << as<unsigned long>();
} else if (is<long long>()) {
s << as<long long>();
} else if (is<unsigned long long>()) {
s << as<unsigned long long>();
} else if (is<float>()) {
s << as<float>();
} else if (is<double>()) {
s << as<double>();
}
return s.str();
}
};
......
......@@ -217,6 +217,11 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
if (ev.event == "IGNORE")
return false;
if (ev.binlog_name != "") {
binlog_name = ev.binlog_name;
binlog_pos = ev.binlog_pos;
}
const auto idb = dbs.find(ev.database);
if (idb == dbs.end())
return false;
......@@ -233,44 +238,62 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
if (irn != replace_null.end()) {
replace_null_ = &irn->second;
} else {
replace_null_ = NULL;
replace_null_ = nullptr;
}
auto add_nil_with_replace = [&] (struct ::tnt_stream *o, const unsigned index) -> void {
if (replace_null_) {
const auto add_nil_with_replace = [&] (struct ::tnt_stream *o, const unsigned index) -> void {
do {
if (replace_null_ == nullptr) break;
const auto irnv = replace_null_->find(index);
if (irnv != irn->second.end()) {
const SerializableValue& v = irnv->second;
if (v.get_type_id() == "string") {
const std::string &vs = v.value_string();
::tnt_object_add_str(o, vs.c_str(), vs.length());
} else if (v.get_type_id() == "int") {
::tnt_object_add_int(o, boost::any_cast<long long>(*v));
} else if (v.get_type_id() == "uint") {
::tnt_object_add_uint(o, boost::any_cast<unsigned long long>(*v));
} else {
throw std::range_error(std::string("Typecasting error for non-null value for column: " + index));
}
return;
if (irnv == irn->second.end()) break;
const SerializableValue& v = irnv->second;
if (v.is<std::string>()) {
const std::string s = v.as<std::string>();
::tnt_object_add_str(o, s.c_str(), s.length());
} else if (v.is<long long>()) {
::tnt_object_add_int(o, v.as<long long>());
} else if (v.is<unsigned long long>()) {
::tnt_object_add_uint(o, v.as<unsigned long long>());
} else {
throw std::range_error(std::string("Typecasting error for non-null value for column: " + index));
}
}
return;
} while (false);
::tnt_object_add_nil(o);
};
auto add_value = [&] (struct ::tnt_stream *o, const unsigned index, const SerializableValue &v) -> void {
const auto add_value = [&] (struct ::tnt_stream *o, const unsigned index, const SerializableValue &v) -> void {
try {
if (v.get_type_id() == "string") {
const std::string &vs = v.value_string();
::tnt_object_add_str(o, vs.c_str(), vs.length());
} else if (v.get_type_id() == "int") {
::tnt_object_add_int(o, boost::any_cast<long long>(*v));
} else if (v.get_type_id() == "uint") {
::tnt_object_add_uint(o, boost::any_cast<unsigned long long>(*v));
} else if (v.get_type_id() == "float") {
::tnt_object_add_float(o, boost::any_cast<float>(*v));
} else if (v.get_type_id() == "double") {
::tnt_object_add_double(o, boost::any_cast<double>(*v));
if (v.is<std::string>()) {
const std::string s = v.as<std::string>();
::tnt_object_add_str(o, s.c_str(), s.length());
} else if (v.is<char>()) {
::tnt_object_add_int(o, v.as<char>());
} else if (v.is<unsigned char>()) {
::tnt_object_add_uint(o, v.as<unsigned char>());
} else if (v.is<short>()) {
::tnt_object_add_int(o, v.as<short>());
} else if (v.is<unsigned short>()) {
::tnt_object_add_uint(o, v.as<unsigned short>());
} else if (v.is<int>()) {
::tnt_object_add_int(o, v.as<int>());
} else if (v.is<unsigned int>()) {
::tnt_object_add_uint(o, v.as<unsigned int>());
} else if (v.is<long>()) {
::tnt_object_add_int(o, v.as<long>());
} else if (v.is<unsigned long>()) {
::tnt_object_add_uint(o, v.as<unsigned long>());
} else if (v.is<long long>()) {
::tnt_object_add_int(o, v.as<long long>());
} else if (v.is<unsigned long long>()) {
::tnt_object_add_uint(o, v.as<unsigned long long>());
} else if (v.is<float>()) {
::tnt_object_add_float(o, v.as<float>());
} else if (v.is<double>()) {
::tnt_object_add_double(o, v.as<double>());
} else {
add_nil_with_replace(o, index);
}
......@@ -280,7 +303,7 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
}
};
auto add_key = [&] (struct ::tnt_stream *o) -> void {
const auto add_key = [&] (struct ::tnt_stream *o) -> void {
::tnt_object_add_array(o, ts.keys.size());
for (const auto i : ts.keys) {
add_value(o, i, ev.row.at(i));
......@@ -288,7 +311,7 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
::tnt_object_container_close(o);
};
auto add_tuple = [&] (struct ::tnt_stream *o) -> void {
const auto add_tuple = [&] (struct ::tnt_stream *o) -> void {
::tnt_object_add_array(o, space_last_id[ts.space] + 1);
unsigned i_nil = 0;
// ev.row may have gaps, since it's not an array but a map
......@@ -306,22 +329,34 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
::tnt_object_container_close(o);
};
auto add_ops = [&] (struct ::tnt_stream *o, const bool sparse = true) -> void {
if (sparse) {
::tnt_update_container_reset(o);
} else {
::tnt_object_add_array(o, ev.row.size());
}
const auto add_ops = [&] (struct ::tnt_stream *o) -> void {
::tnt_update_container_reset(o);
for (auto it = ev.row.begin(), end = ev.row.end(); it != end; ++it) {
__tnt_object sval;
add_value(&sval, it->first, it->second);
::tnt_update_assign(o, it->first, &sval);
}
if (sparse) {
::tnt_update_container_close(o);
} else {
::tnt_object_container_close(o);
::tnt_update_container_close(o);
};
const auto make_call = [&] (const std::string& func_name) -> void {
__tnt_object args;
::tnt_object_add_array(&args, 1);
::tnt_object_add_map(&args, ev.row.size());
for (auto it = ev.row.begin(), end = ev.row.end(); it != end; ++it) {
::tnt_object_add_uint(&args, it->first);
add_value(&args, it->first, it->second);
}
::tnt_object_container_close(&args);
::tnt_object_container_close(&args);
__tnt_request req;
::tnt_request_call(&req);
::tnt_request_set_tuple(&req, &args);
::tnt_request_set_func(&req, func_name.c_str(), func_name.length());
Send(&req);
};
// add Tarantool request
......@@ -337,19 +372,7 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
Send(&req);
} else {
__tnt_object args;
::tnt_object_add_array(&args, 1);
add_key(&args);
::tnt_object_container_close(&args);
__tnt_request req;
::tnt_request_call(&req);
::tnt_request_set_tuple(&req, &args);
const std::string& func = ts.delete_call;
::tnt_request_set_func(&req, func.c_str(), func.length());
Send(&req);
make_call(ts.delete_call);
}
} else if (ev.event == "INSERT") {
if (ts.insert_call.empty()) {
......@@ -375,20 +398,7 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
Send(&req);
} else {
__tnt_object args;
::tnt_object_add_array(&args, 2);
add_tuple(&args);
add_ops(&args, false);
::tnt_object_container_close(&args);
__tnt_request req;
::tnt_request_call(&req);
::tnt_request_set_tuple(&req, &args);
const std::string& func = ts.insert_call;
::tnt_request_set_func(&req, func.c_str(), func.length());
Send(&req);
make_call(ts.insert_call);
}
} else if (ev.event == "UPDATE") {
if (ts.update_call.empty()) {
......@@ -406,30 +416,24 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
Send(&req);
} else {
__tnt_object args;
::tnt_object_add_array(&args, 2);
add_key(&args);
add_ops(&args, false);
::tnt_object_container_close(&args);
__tnt_request req;
::tnt_request_call(&req);
::tnt_request_set_tuple(&req, &args);
const std::string& func = ts.update_call;
::tnt_request_set_func(&req, func.c_str(), func.length());
Send(&req);
make_call(ts.update_call);
}
} else {
throw std::range_error("Uknown binlog event: " + ev.event);
}
if (ev.binlog_name != "") {
binlog_name = ev.binlog_name;
binlog_pos = ev.binlog_pos;
}
// check if doing same key as last time
// and do sync, if true
static std::string prev_key;
std::string curnt_key;
for (const auto i : ts.keys)
curnt_key += ev.row.at(i).to_string();
if (prev_key == curnt_key)
return true;
prev_key = curnt_key;
return false;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment