Commit 549eb038 authored by pryanikov's avatar pryanikov

revert forced upsert & calls params rework

parent 245444d0
......@@ -128,10 +128,8 @@ void DBReader::EventCallback(const slave::RecordSet& event, const std::map<std::
switch (event.type_event) {
case slave::RecordSet::Delete: ev.event = "DELETE"; break;
// case slave::RecordSet::Update: ev.event = "UPDATE"; break;
// case slave::RecordSet::Write: ev.event = "INSERT"; break;
case slave::RecordSet::Update:
case slave::RecordSet::Write: ev.event = "UPSERT"; break;
case slave::RecordSet::Update: ev.event = "UPDATE"; break;
case slave::RecordSet::Write: ev.event = "INSERT"; break;
default: ev.event = "IGNORE";
}
......@@ -176,8 +174,7 @@ void DBReader::DumpTablesCallback(
ev.binlog_pos = 0;
ev.database = db_name;
ev.table = tbl_name;
// ev.event = "INSERT";
ev.event = "UPSERT";
ev.event = "INSERT";
ev.seconds_behind_master = GetSecondsBehindMaster();
ev.unix_timestamp = long(time(NULL));
......
......@@ -293,18 +293,6 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent &ev)
::tnt_update_container_close(o);
};
auto make_call = [&] (const std::string &func) -> void {
__tnt_object tuple;
add_tuple(&tuple);
__tnt_request req;
::tnt_request_call(&req);
::tnt_request_set_func(&req, func.c_str(), func.length());
::tnt_request_set_tuple(&req, &tuple);
Send(&req);
};
// add Tarantool request
if (ev.event == "DELETE") {
if (ts.delete_call.empty()) {
......@@ -318,21 +306,58 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent &ev)
Send(&req);
} else {
make_call(ts.delete_call);
__tnt_object args;
::tnt_object_add_array(&args, 1);
add_key(&args);
::tnt_object_container_close(&args);
__tnt_request req;
::tnt_request_call(&req);
::tnt_request_set_tuple(&req, &args);
const std::string& func = ts.delete_call;
::tnt_request_set_func(&req, func.c_str(), func.length());
Send(&req);
}
} else if (ev.event == "INSERT") {
if (ts.insert_call.empty()) {
// __tnt_object tuple;
// add_tuple(&tuple);
// __tnt_request req;
// ::tnt_request_replace(&req);
// ::tnt_request_set_space(&req, ts.space);
// ::tnt_request_set_tuple(&req, &tuple);
__tnt_object tuple;
add_tuple(&tuple);
__tnt_object ops;
add_ops(&ops);
__tnt_request req;
::tnt_request_replace(&req);
::tnt_request_upsert(&req);
::tnt_request_set_space(&req, ts.space);
::tnt_request_set_tuple(&req, &tuple);
::tnt_request_set_ops(&req, &ops);
Send(&req);
} else {
make_call(ts.insert_call);
__tnt_object args;
::tnt_object_add_array(&args, 2);
add_tuple(&args);
add_ops(&args);
::tnt_object_container_close(&args);
__tnt_request req;
::tnt_request_call(&req);
::tnt_request_set_tuple(&req, &args);
const std::string& func = ts.insert_call;
::tnt_request_set_func(&req, func.c_str(), func.length());
Send(&req);
}
} else if (ev.event == "UPDATE") {
if (ts.update_call.empty()) {
......@@ -350,22 +375,21 @@ bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent &ev)
Send(&req);
} else {
make_call(ts.update_call);
}
} else if (ev.event == "UPSERT") {
__tnt_object tuple;
add_tuple(&tuple);
__tnt_object args;
::tnt_object_add_array(&args, 2);
add_key(&args);
add_ops(&args);
::tnt_object_container_close(&args);
__tnt_object ops;
add_ops(&ops);
__tnt_request req;
::tnt_request_call(&req);
::tnt_request_set_tuple(&req, &args);
__tnt_request req;
::tnt_request_upsert(&req);
::tnt_request_set_space(&req, ts.space);
::tnt_request_set_tuple(&req, &tuple);
::tnt_request_set_ops(&req, &ops);
const std::string& func = ts.update_call;
::tnt_request_set_func(&req, func.c_str(), func.length());
Send(&req);
Send(&req);
}
} else {
throw std::range_error("Uknown binlog event: " + ev.event);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment