main.cpp 9.63 KB
Newer Older
pryanikov's avatar
pryanikov committed
1
2
3
4
#include <signal.h>
#include <time.h>

#include <atomic>
#include <fstream>
#include <iostream>
#include <sstream>
#include <thread>

#include <yaml-cpp/yaml.h>

#include "serializable.h"
#include "dbreader.h"
#include "tpwriter.h"
#include "queue.h"
#include "logger.h"

// =========
pryanikov's avatar
pryanikov committed
18
using std::placeholders::_1;
pryanikov's avatar
pryanikov committed
19
20
21
22
23
24
25
26

namespace replicator {

static const char *default_pid_filename = "/var/run/replicatord.pid";
static const char *default_log_filename = "/var/log/replicatord.log";
static const char *default_config_filename = "/usr/local/etc/replicatord.cfg";

static volatile bool is_term = false;
pryanikov's avatar
pryanikov committed
27
static volatile bool reset = false;
pryanikov's avatar
pryanikov committed
28

pryanikov's avatar
pryanikov committed
29
30
static std::string binlog_name;
static unsigned long binlog_pos;
pryanikov's avatar
pryanikov committed
31

pryanikov's avatar
pryanikov committed
32
33
static TPWriter *tpwriter = NULL;
static DBReader *dbreader = NULL;
pryanikov's avatar
pryanikov committed
34

pryanikov's avatar
pryanikov committed
35
static void sigint_handler(int sig);
pryanikov's avatar
pryanikov committed
36

37
static Queue<SerializableBinlogEventPtr> queue(50);
pryanikov's avatar
pryanikov committed
38
39
40

// ===============

pryanikov's avatar
pryanikov committed
41
static void tpwriter_worker()
pryanikov's avatar
pryanikov committed
42
{
pryanikov's avatar
pryanikov committed
43
44
	while (!is_term)
	{
pryanikov's avatar
pryanikov committed
45
		while (!tpwriter->Connect());
pryanikov's avatar
pryanikov committed
46

pryanikov's avatar
pryanikov committed
47
		// send initial binlog position to the db thread
pryanikov's avatar
pryanikov committed
48
		try {
pryanikov's avatar
pryanikov committed
49
			tpwriter->ReadBinlogPos(binlog_name, binlog_pos);
pryanikov's avatar
pryanikov committed
50
			reset = true;
pryanikov's avatar
pryanikov committed
51
			const std::chrono::milliseconds timeout(1000);
pryanikov's avatar
pryanikov committed
52

pryanikov's avatar
pryanikov committed
53
			while (!is_term) {
54
55
56
				for (unsigned cnt = queue.wait(timeout); cnt > 0; --cnt) {
					tpwriter->BinlogEventCallback(queue.pop());
				}
pryanikov's avatar
pryanikov committed
57
58
				tpwriter->Sync();
				tpwriter->RecvAll();
pryanikov's avatar
pryanikov committed
59
60
61
			}
		}
		catch (std::range_error& ex) {
pryanikov's avatar
pryanikov committed
62
			is_term = true;
pryanikov's avatar
pryanikov committed
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
			std::cout << ex.what() << std::endl;
			// loop exit
		}
		catch (std::exception& ex) {
			std::cout << ex.what() << std::endl;
			tpwriter->Disconnect();
			// reconnect
		}
	}

	tpwriter->Disconnect();
}

// ====================

78
/**
 * Callback passed to DBReader::DumpTables / DBReader::ReadBinlog.
 *
 * Pushes the event onto the shared queue unless termination or a position
 * reset has been requested.
 *
 * @param ev  event to enqueue; moved into the queue when accepted.
 * @return true when is_term or reset is set (the event is dropped),
 *         false when the event was queued.
 */
static bool dbread_callback(SerializableBinlogEventPtr&& ev)
{
	const bool must_stop = is_term || reset;

	if (!must_stop) {
		queue.push(std::move(ev));
	}

	return must_stop;
}

pryanikov's avatar
pryanikov committed
88
static void mysql_worker()
pryanikov's avatar
pryanikov committed
89
90
{
	while (!is_term) {
pryanikov's avatar
pryanikov committed
91
92
93
		// read initial binlog pos from Tarantool
		while (!reset) ::sleep(1);
		reset = false;
pryanikov's avatar
pryanikov committed
94

pryanikov's avatar
pryanikov committed
95
96
97
98
99
100
101
102
103
104
105
106
107
		try {
			if (!is_term && !reset && binlog_name == "" && binlog_pos == 0) {
				std::cout << "Tarantool reported null binlog position. Dumping tables..." << std::endl;
				dbreader->DumpTables(binlog_name, binlog_pos, dbread_callback);
			}
			if (!is_term && !reset) {
				std::cout << "Reading binlogs (" << binlog_name << ", " << binlog_pos << ")..." << std::endl;
				dbreader->ReadBinlog(binlog_name, binlog_pos, dbread_callback);
			}
		}
		catch (std::exception& ex) {
			std::cerr << "Error in reading binlogs: " << ex.what() << std::endl;
			sigint_handler(0);
pryanikov's avatar
pryanikov committed
108
109
110
111
		}
	}
}

pryanikov's avatar
pryanikov committed
112
static void init(YAML::Node& cfg)
pryanikov's avatar
pryanikov committed
113
{
pryanikov's avatar
pryanikov committed
114
	try {
pryanikov's avatar
pryanikov committed
115
116
		// read Mysql settings
		{
pryanikov's avatar
pryanikov committed
117
			const YAML::Node& mysql = cfg["mysql"];
pryanikov's avatar
pryanikov committed
118

pryanikov's avatar
pryanikov committed
119
			nanomysql::mysql_conn_opts opts;
pryanikov's avatar
pryanikov committed
120
121
122
123
			opts.mysql_host = mysql["host"].as<std::string>();
			opts.mysql_port = mysql["port"].as<unsigned>();
			opts.mysql_user = mysql["user"].as<std::string>();
			opts.mysql_pass = mysql["password"].as<std::string>();
pryanikov's avatar
pryanikov committed
124

pryanikov's avatar
pryanikov committed
125
			dbreader = new DBReader(opts, mysql["connect_retry"].as<unsigned>());
pryanikov's avatar
pryanikov committed
126
127
128
		}
		// read Tarantool config
		{
pryanikov's avatar
pryanikov committed
129
			const YAML::Node& tarantool = cfg["tarantool"];
pryanikov's avatar
pryanikov committed
130

pryanikov's avatar
pryanikov committed
131
			tpwriter = new TPWriter(
pryanikov's avatar
pryanikov committed
132
133
134
135
136
137
				tarantool["host"].as<std::string>(),
				tarantool["user"].as<std::string>(),
				tarantool["password"].as<std::string>(),
				tarantool["binlog_pos_space"].as<unsigned>(),
				tarantool["binlog_pos_key"].as<unsigned>(),
				tarantool["connect_retry"].as<unsigned>(),
pryanikov's avatar
pryanikov committed
138
				tarantool["sync_retry"].as<unsigned>()
pryanikov's avatar
pryanikov committed
139
			);
pryanikov's avatar
pryanikov committed
140
141
142
		}
		// read Mysql to Tarantool mappings (each table maps to a single Tarantool space)
		{
pryanikov's avatar
pryanikov committed
143
			std::map<unsigned, bool> has_primary;
pryanikov's avatar
pryanikov committed
144
145
146
147
148
149
150
151
152
153
154
155
156
			const YAML::Node& mappings = cfg["mappings"];

			for (int i = 0; i < mappings.size(); i++) {
				const YAML::Node& mapping = mappings[i];

				const std::string database = mapping["database"].as<std::string>();
				const std::string table = mapping["table"].as<std::string>();

				std::string insert_call = mapping["insert_call"] ? mapping["insert_call"].as<std::string>() : TPWriter::empty_call;
				std::string update_call = mapping["update_call"] ? mapping["update_call"].as<std::string>() : TPWriter::empty_call;
				std::string delete_call = mapping["delete_call"] ? mapping["delete_call"].as<std::string>() : TPWriter::empty_call;

				const unsigned space = mapping["space"].as<unsigned>();
pryanikov's avatar
pryanikov committed
157
158
				std::map<std::string, std::pair<unsigned, bool>> columns;
				std::vector<unsigned> keys;
pryanikov's avatar
pryanikov committed
159
				unsigned index_max = tpwriter->space_last_id[space];
pryanikov's avatar
pryanikov committed
160

pryanikov's avatar
pryanikov committed
161
162
163
164
165
166
167
				bool is_primary;
				if (has_primary.find(space) == has_primary.end()) {
					is_primary = has_primary[space] = true;;
				} else {
					is_primary = false;
				}

pryanikov's avatar
pryanikov committed
168
				// read key tarantool fields we'll use for delete requests
pryanikov's avatar
pryanikov committed
169
				{
pryanikov's avatar
pryanikov committed
170
171
172
					const YAML::Node& keys_ = mapping["key_fields"];
					for (int i = 0; i < keys_.size(); i++) {
						unsigned key = keys_[i].as<unsigned>();
pryanikov's avatar
pryanikov committed
173
174
						index_max = std::max(index_max, key);
						keys.push_back(key);
pryanikov's avatar
pryanikov committed
175
176
					}
				}
pryanikov's avatar
pryanikov committed
177
				// read columns tuple
pryanikov's avatar
pryanikov committed
178
				{
pryanikov's avatar
pryanikov committed
179
180
					const YAML::Node& columns_ = mapping["columns"];
					for (int i = 0; i < columns_.size(); i++) {
pryanikov's avatar
pryanikov committed
181
182
183
184
185
186
187
						const bool is_key = i < keys.size();
						const unsigned index = is_key ? keys[i] : ++index_max;
						columns.emplace(
							std::piecewise_construct,
							std::forward_as_tuple(columns_[i].as<std::string>()),
							std::forward_as_tuple(index, is_key)
						);
pryanikov's avatar
pryanikov committed
188
189
190
					}
				}

pryanikov's avatar
pryanikov committed
191
192
				dbreader->AddTable(database, table, columns, is_primary);
				std::sort(keys.begin(), keys.end());
pryanikov's avatar
pryanikov committed
193
194
				tpwriter->AddTable(database, table, space, keys, insert_call, update_call, delete_call);
				tpwriter->space_last_id[space] = index_max;
pryanikov's avatar
pryanikov committed
195
196
			}
		}
pryanikov's avatar
pryanikov committed
197
		// read space settings
pryanikov's avatar
pryanikov committed
198
		{
pryanikov's avatar
pryanikov committed
199
200
201
202
203
204
205
206
207
208
209
210
			const YAML::Node& spaces = cfg["spaces"];

			for (auto its = spaces.begin(); its != spaces.end(); ++its) {
				unsigned space = its->first.as<unsigned>();
				std::map<unsigned, SerializableValue>& rn_ = tpwriter->replace_null[ space ];
				const YAML::Node& replace_null = its->second["replace_null"];

				for (auto itrn = replace_null.begin(); itrn != replace_null.end(); ++itrn) {
					const YAML::Node& field = itrn->second;
					auto itf = field.begin();
					if (itf == field.end()) continue;

pryanikov's avatar
pryanikov committed
211
					const unsigned index = itrn->first.as<unsigned>();
pryanikov's avatar
pryanikov committed
212
213
214
					std::string type = itf->first.as<std::string>();
					const YAML::Node& value = itf->second;

pryanikov's avatar
pryanikov committed
215
216
					if (type == "str" || type == "string") {
						rn_[ index ] = value.as<std::string>();
pryanikov's avatar
pryanikov committed
217
					} else if (type == "unsigned") {
pryanikov's avatar
pryanikov committed
218
219
220
						rn_[ index ] = value.as<unsigned long long>();
					} else if (type == "int" || type == "integer") {
						rn_[ index ] = value.as<long long>();
pryanikov's avatar
pryanikov committed
221
222
223
224
225
					} else {
						std::cerr << "Config error: unknown type for non-null value for column " << index << std::endl;
						exit(EXIT_FAILURE);
					}
				}
pryanikov's avatar
pryanikov committed
226
227
228
			}
		}
	}
pryanikov's avatar
pryanikov committed
229
	catch(YAML::Exception& ex)
pryanikov's avatar
pryanikov committed
230
	{
pryanikov's avatar
pryanikov committed
231
		std::cerr << "Config error: " << ex.what() << std::endl;
pryanikov's avatar
pryanikov committed
232
233
234
235
236
237
238
239
		exit(EXIT_FAILURE);
	}
}

static void shutdown()
{
	if (dbreader) {
		// sighandler protection
pryanikov's avatar
pryanikov committed
240
		auto dbreader_ = dbreader;
pryanikov's avatar
pryanikov committed
241
242
243
		dbreader = NULL;
		delete dbreader_;
	}
pryanikov's avatar
pryanikov committed
244
245
246
247
	if (tpwriter) {
		auto tpwriter_ = tpwriter;
		tpwriter = NULL;
		delete tpwriter_;
pryanikov's avatar
pryanikov committed
248
249
250
	}
}

pryanikov's avatar
pryanikov committed
251
static void sigint_handler(int sig)
pryanikov's avatar
pryanikov committed
252
253
254
{
	std::cerr << "Terminating" << std::endl;
	is_term = true;
pryanikov's avatar
pryanikov committed
255

pryanikov's avatar
pryanikov committed
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
	if (dbreader) {
		dbreader->Stop();
	}
}

}

// Loggers wrapping std::cout ('I') and std::cerr ('E'); created by
// openlogfile() and destroyed by closelogfile().
static replicator::Logger *ol;
static replicator::Logger *el;

// Sinks the loggers had before being redirected to the log file, kept so
// that closelogfile() can restore them.
static std::streambuf *ol_sink;
static std::streambuf *el_sink;

// Currently open log file, or NULL when no log file is open.
static std::ofstream *flog;

// Effective file names; main() overrides them from -l and -i.
static std::string log_filename(replicator::default_log_filename);
static std::string pid_filename(replicator::default_pid_filename);

/**
 * Write the current process id to pid_filename.
 *
 * A failure to open or write the file is reported on std::cerr instead of
 * being silently ignored; the daemon keeps running either way.
 */
static void writepidtofile()
{
	// write pid to file
	std::ofstream fpid(pid_filename);
	if (!fpid) {
		std::cerr << "Can't open pid file " << pid_filename << " for writing" << std::endl;
		return;
	}

	fpid << getpid();
	fpid.flush();
	fpid.close();

	if (fpid.fail()) {
		std::cerr << "Can't write pid to file " << pid_filename << std::endl;
	}
}

/**
 * Delete the pid file. Registered with atexit() in main(); the result of
 * unlink() is ignored.
 */
static void removepidfile()
{
	const char *const path = pid_filename.c_str();
	unlink(path);
}

/**
 * Open log_filename in append mode and route std::cout / std::cerr into it.
 *
 * Both streams are wrapped in a replicator::Logger (tagged 'I' and 'E').
 * The sinks the loggers start with are saved in ol_sink / el_sink so that
 * closelogfile() can put them back.
 */
static void openlogfile()
{
	flog = new std::ofstream(log_filename, std::ofstream::app);
	auto *const file_buf = flog->rdbuf();

	// wrap cout and cerr, appending timestamps and log levels
	ol = new replicator::Logger(std::cout, 'I');
	el = new replicator::Logger(std::cerr, 'E');

	// remember where the loggers wrote before the redirect
	ol_sink = ol->rdsink();
	el_sink = el->rdsink();

	// from now on both loggers write to the file
	ol->rdsink(file_buf);
	el->rdsink(file_buf);
}

/**
 * Undo openlogfile(): restore the loggers' saved sinks, destroy the loggers
 * and close the log file. Does nothing when no log file is open.
 *
 * The loggers are detached from the file buffer BEFORE the file stream is
 * destroyed, so they never hold a pointer to a deleted stream buffer.
 */
static void closelogfile()
{
	if (flog == NULL) {
		return;
	}

	// restore streams while the file buffer is still alive
	ol->rdsink(ol_sink);
	el->rdsink(el_sink);

	delete ol;
	delete el;

	ol = NULL;
	el = NULL;

	// nothing refers to the file any more; flush and release it
	flog->flush();
	flog->close();

	delete flog;
	flog = NULL;
}

/**
 * SIGHUP handler: reopen the log file, then log a notice to the new file.
 *
 * NOTE(review): closelogfile()/openlogfile() use new/delete and stream I/O,
 * which are not async-signal-safe - consider deferring the reopen to a
 * regular thread.
 *
 * @param sig  signal number; not used.
 */
static void sighup_handler(int sig)
{
	// drop the current file handle ...
	closelogfile();
	// ... and open log_filename again
	openlogfile();

	std::cout << "Caught SIGHUP, continuing..." << std::endl;
}

int main(int argc, char** argv)
{
	bool print_usage = false;
	std::string config_name(replicator::default_config_filename);

	int c;
	while (-1 != (c = ::getopt(argc, argv, "c:l:i:zp")))
	{
		switch (c)
		{
			case 'c': config_name = optarg; break;
			case 'p': print_usage = true; break;
			case 'l': log_filename = optarg; break;
			case 'i': pid_filename = optarg; break;
			default: print_usage = true; break;
		}
	}

	if (print_usage) {
pryanikov's avatar
pryanikov committed
349
		std::cout
pryanikov's avatar
pryanikov committed
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
			<< "Usage: " << argv[0] << " [-c <config_name>]" << " [-l <log_name>]"<< " [-i <pid_name>]" << " [-p]" << std::endl
			<< " -c configuration file (" << config_name << ")" << std::endl
			<< " -p print usage" << std::endl
			<< " -l log filename (" << log_filename << ")" << std::endl
			<< " -i pid filename (" << pid_filename << ")" << std::endl
			;
		return 1;
	}

	writepidtofile();
	atexit(removepidfile);

	openlogfile();
	atexit(closelogfile);

pryanikov's avatar
pryanikov committed
365
	YAML::Node cfg;
pryanikov's avatar
pryanikov committed
366
367

	// Read the file. If there is an error, report it and exit.
pryanikov's avatar
pryanikov committed
368
	try {
pryanikov's avatar
pryanikov committed
369
		cfg = YAML::LoadFile(config_name.c_str());
pryanikov's avatar
pryanikov committed
370
	}
pryanikov's avatar
pryanikov committed
371
	catch(YAML::Exception& ex)
pryanikov's avatar
pryanikov committed
372
	{
pryanikov's avatar
pryanikov committed
373
		std::cerr << "Config error: " << ex.what() << std::endl;
pryanikov's avatar
pryanikov committed
374
375
376
		return EXIT_FAILURE;
	}

pryanikov's avatar
pryanikov committed
377
378
	signal(SIGINT, replicator::sigint_handler);
	signal(SIGTERM, replicator::sigint_handler);
pryanikov's avatar
pryanikov committed
379
380
381
382
	signal(SIGHUP, sighup_handler);

	replicator::init(cfg);

pryanikov's avatar
pryanikov committed
383
384
	std::thread t1(replicator::mysql_worker);
	std::thread t2(replicator::tpwriter_worker);
pryanikov's avatar
pryanikov committed
385

pryanikov's avatar
pryanikov committed
386
387
	t2.join();
	t1.join();
pryanikov's avatar
pryanikov committed
388

pryanikov's avatar
pryanikov committed
389
	replicator::shutdown();
pryanikov's avatar
pryanikov committed
390
391
392

	return 0;
}