dbreader.cpp 6.5 KB
Newer Older
pryanikov's avatar
pryanikov committed
1
2
3
4
#include <sstream>
#include "dbreader.h"
#include "serializable.h"

pryanikov's avatar
pryanikov committed
5
using std::placeholders::_1;
pryanikov's avatar
pryanikov committed
6

pryanikov's avatar
pryanikov committed
7
namespace replicator {
pryanikov's avatar
pryanikov committed
8

pryanikov's avatar
pryanikov committed
9
void DBReader::DumpTables(std::string& binlog_name, unsigned long& binlog_pos, const BinlogEventCallback& cb)
pryanikov's avatar
pryanikov committed
10
{
pryanikov's avatar
pryanikov committed
11
	const static slave::callback dummycallback = [] (const slave::RecordSet& event) {};
pryanikov's avatar
pryanikov committed
12
	// start temp slave to read DB structure
pryanikov's avatar
pryanikov committed
13
14
	slave::Slave tempslave(slave.masterInfo(), state);
	for (auto i = tables.begin(), end = tables.end(); i != end; ++i) {
pryanikov's avatar
pryanikov committed
15
16
		tempslave.setCallback(i->name.first, i->name.second, dummycallback);
	}
pryanikov's avatar
pryanikov committed
17

pryanikov's avatar
pryanikov committed
18
19
20
	tempslave.init();
	tempslave.createDatabaseStructure();

pryanikov's avatar
pryanikov committed
21
	// last_event_when = ::time(NULL);
pryanikov's avatar
pryanikov committed
22

pryanikov's avatar
pryanikov committed
23
24
25
26
27
28
29
	slave::Slave::binlog_pos_t bp = tempslave.getLastBinlog();
	binlog_name = bp.first;
	binlog_pos = bp.second;

	state.setMasterLogNamePos(bp.first, bp.second);

	// dump tables
pryanikov's avatar
pryanikov committed
30
	nanomysql::Connection conn(slave.masterInfo().conn_options);
pryanikov's avatar
pryanikov committed
31
32
	conn.query("SET NAMES utf8");

pryanikov's avatar
pryanikov committed
33
	for (auto table = tables.begin(), end = tables.end(); !stopped && table != end; ++table) {
pryanikov's avatar
pryanikov committed
34
		// build field_name -> field_ptr map for filtered columns
pryanikov's avatar
pryanikov committed
35
36
37
38
39
40
41
		std::vector<std::pair<unsigned, slave::PtrField>> filter_;
		const auto rtable = tempslave.getRli().getTable(table->name);
		std::string s_fields;

		for (const auto ptr_field : rtable->fields) {
			const auto it = table->filter.find(ptr_field->field_name);
			if (it != table->filter.end()) {
pryanikov's avatar
pryanikov committed
42
				filter_.emplace_back(it->second.first, ptr_field);
pryanikov's avatar
pryanikov committed
43
44
				s_fields += ptr_field->field_name;
				s_fields += ',';
pryanikov's avatar
pryanikov committed
45
46
			}
		}
pryanikov's avatar
pryanikov committed
47
48
49
50
51
52
53
54
55
56
		s_fields.pop_back();

		conn.select_db(table->name.first);
		conn.query(std::string("SELECT ") + s_fields  + " FROM " + table->name.second);
		conn.use(std::bind(&DBReader::DumpTablesCallback,
			this,
			std::cref(table->name.first),
			std::cref(table->name.second),
			std::cref(filter_),
			_1,
pryanikov's avatar
pryanikov committed
57
			std::cref(cb)
pryanikov's avatar
pryanikov committed
58
		));
pryanikov's avatar
pryanikov committed
59
60
61
62
	}

	// send binlog position update event
	if (!stopped) {
63
64
65
66
		SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
		ev->binlog_name = binlog_name;
		ev->binlog_pos = binlog_pos;
		// ev->seconds_behind_master = GetSecondsBehindMaster();
pryanikov's avatar
pryanikov committed
67
		// ev->unix_timestamp = long(time(NULL));
68
69
		ev->event = "IGNORE";
		stopped = cb(std::move(ev));
pryanikov's avatar
pryanikov committed
70
71
	}

pryanikov's avatar
pryanikov committed
72
	//tempslave.close_connection();
pryanikov's avatar
pryanikov committed
73
74
}

pryanikov's avatar
pryanikov committed
75
void DBReader::ReadBinlog(const std::string& binlog_name, unsigned long binlog_pos, const BinlogEventCallback& cb)
pryanikov's avatar
pryanikov committed
76
77
78
{
	stopped = false;
	state.setMasterLogNamePos(binlog_name, binlog_pos);
pryanikov's avatar
pryanikov committed
79
80
81
82
83

	for (auto table = tables.begin(), end = tables.end(); table != end; ++table) {
		slave.setCallback(
			table->name.first,
			table->name.second,
pryanikov's avatar
pryanikov committed
84
85
86
87
88
89
			std::bind(
				table->is_primary
					? &DBReader::EventCallbackNormal
					: &DBReader::EventCallbackNullify,
				this, _1, std::cref(table->filter), std::cref(cb)
			)
pryanikov's avatar
pryanikov committed
90
		);
pryanikov's avatar
pryanikov committed
91
	}
pryanikov's avatar
pryanikov committed
92
	slave.setXidCallback(std::bind(&DBReader::XidEventCallback, this, _1, std::cref(cb)));
pryanikov's avatar
pryanikov committed
93
94
95
	slave.init();
	slave.createDatabaseStructure();

pryanikov's avatar
pryanikov committed
96
	slave.get_remote_binlog([this] { return stopped; });
pryanikov's avatar
pryanikov committed
97
98
99
100
101
102
103
104
}

void DBReader::Stop()
{
	stopped = true;
	slave.close_connection();
}

pryanikov's avatar
pryanikov committed
105
106
107
108
109
110
111
void DBReader::EventCallbackNormal(
	const slave::RecordSet& event,
	const std::map<std::string, std::pair<unsigned, bool>>& filter,
	const BinlogEventCallback& cb
) {
	if (stopped) return;
	// last_event_when = event.when;
pryanikov's avatar
pryanikov committed
112

113
	SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
pryanikov's avatar
pryanikov committed
114
115
116
117
118
119
120
121

	switch (event.type_event) {
		case slave::RecordSet::Delete: ev->event = "DELETE"; break;
		case slave::RecordSet::Update: ev->event = "UPDATE"; break;
		case slave::RecordSet::Write:  ev->event = "INSERT"; break;
		default: return;
	}

122
123
124
	ev->binlog_name = state.getMasterLogName();
	ev->binlog_pos = state.getMasterLogPos();
	// ev->seconds_behind_master = GetSecondsBehindMaster();
pryanikov's avatar
pryanikov committed
125
	// ev->unix_timestamp = long(time(NULL));
126
127
	ev->database = event.db_name;
	ev->table = event.tbl_name;
pryanikov's avatar
pryanikov committed
128

pryanikov's avatar
pryanikov committed
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
	for (auto fi = filter.begin(), end = filter.end(); fi != end; ++fi) {
		const auto ri = event.m_row.find(fi->first);
		if (ri != event.m_row.end()) {
			ev->row[ fi->second.first ] = ri->second;
		}
	}
	stopped = cb(std::move(ev));
}

void DBReader::EventCallbackNullify(
	const slave::RecordSet& event,
	const std::map<std::string, std::pair<unsigned, bool>>& filter,
	const BinlogEventCallback& cb
) {
	if (stopped) return;
	// last_event_when = event.when;

	SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
	bool is_delete = false;

pryanikov's avatar
pryanikov committed
149
	switch (event.type_event) {
pryanikov's avatar
pryanikov committed
150
		case slave::RecordSet::Delete: ev->event = "DELETE"; is_delete = true;
151
152
		case slave::RecordSet::Update: ev->event = "UPDATE"; break;
		case slave::RecordSet::Write:  ev->event = "INSERT"; break;
pryanikov's avatar
pryanikov committed
153
		default: return;
pryanikov's avatar
pryanikov committed
154
	}
pryanikov's avatar
pryanikov committed
155
156
157
158
159
160
161
162

	ev->binlog_name = state.getMasterLogName();
	ev->binlog_pos = state.getMasterLogPos();
	// ev->seconds_behind_master = GetSecondsBehindMaster();
	// ev->unix_timestamp = long(time(NULL));
	ev->database = event.db_name;
	ev->table = event.tbl_name;

pryanikov's avatar
pryanikov committed
163
164
165
	for (auto fi = filter.begin(), end = filter.end(); fi != end; ++fi) {
		const auto ri = event.m_row.find(fi->first);
		if (ri != event.m_row.end()) {
pryanikov's avatar
pryanikov committed
166
167
			// if it's not a key and event is delete - don't actually delete, just nullify
			ev->row[ fi->second.first ] = !fi->second.second && is_delete ? boost::any() : ri->second;
pryanikov's avatar
pryanikov committed
168
169
		}
	}
170
	stopped = cb(std::move(ev));
pryanikov's avatar
pryanikov committed
171
172
}

pryanikov's avatar
pryanikov committed
173
void DBReader::XidEventCallback(unsigned int server_id, const BinlogEventCallback& cb)
pryanikov's avatar
pryanikov committed
174
{
pryanikov's avatar
pryanikov committed
175
176
	if (stopped) return;
	// last_event_when = ::time(NULL);
pryanikov's avatar
pryanikov committed
177

pryanikov's avatar
pryanikov committed
178
	// send binlog position update event
179
180
181
182
	SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
	ev->binlog_name = state.getMasterLogName();
	ev->binlog_pos = state.getMasterLogPos();
	// ev->seconds_behind_master = GetSecondsBehindMaster();
pryanikov's avatar
pryanikov committed
183
	// ev->unix_timestamp = long(time(NULL));
184
185
	ev->event = "IGNORE";
	stopped = cb(std::move(ev));
pryanikov's avatar
pryanikov committed
186
187
}

pryanikov's avatar
pryanikov committed
188
void DBReader::DumpTablesCallback(
pryanikov's avatar
pryanikov committed
189
190
	const std::string& db_name,
	const std::string& tbl_name,
pryanikov's avatar
pryanikov committed
191
192
	const std::vector<std::pair<unsigned, slave::PtrField>>& filter,
	const nanomysql::fields_t& fields,
pryanikov's avatar
pryanikov committed
193
	const BinlogEventCallback& cb
pryanikov's avatar
pryanikov committed
194
) {
pryanikov's avatar
pryanikov committed
195
196
	if (stopped) return;

197
198
199
200
201
202
203
	SerializableBinlogEventPtr ev(new SerializableBinlogEvent);
	ev->binlog_name = "";
	ev->binlog_pos = 0;
	ev->database = db_name;
	ev->table = tbl_name;
	ev->event = "INSERT";
	// ev->seconds_behind_master = GetSecondsBehindMaster();
pryanikov's avatar
pryanikov committed
204
	// ev->unix_timestamp = long(time(NULL));
pryanikov's avatar
pryanikov committed
205

pryanikov's avatar
pryanikov committed
206
207
208
209
	for (const auto& it : filter) {
		slave::PtrField ptr_field = it.second;
		const auto& field = fields.at(ptr_field->field_name);
		if (field.is_null) {
210
			ev->row[ it.first ] = boost::any();
pryanikov's avatar
pryanikov committed
211
212
		} else {
			ptr_field->unpack_str(field.data);
213
			ev->row[ it.first ] = ptr_field->field_data;
pryanikov's avatar
pryanikov committed
214
215
		}
	}
pryanikov's avatar
pryanikov committed
216
	stopped = cb(std::move(ev));
pryanikov's avatar
pryanikov committed
217
218
}

pryanikov's avatar
pryanikov committed
219
220
221
// unsigned DBReader::GetSecondsBehindMaster() const {
// 	return std::max(::time(NULL) - last_event_when, 0L);
// }
pryanikov's avatar
pryanikov committed
222
223

} // replicator