main.cpp 9.41 KB
Newer Older
pryanikov's avatar
pryanikov committed
1
2
3
4
#include <signal.h>
#include <time.h>

#include <atomic>
#include <cstddef>
#include <fstream>
#include <iostream>
#include <sstream>
#include <thread>

#include <yaml-cpp/yaml.h>

#include "serializable.h"
#include "dbreader.h"
#include "tpwriter.h"
#include "serializable.h"
#include "queue.h"
#include "logger.h"

// =========
pryanikov's avatar
pryanikov committed
18
using std::placeholders::_1;
pryanikov's avatar
pryanikov committed
19
20
21
22
23
24
25
26

namespace replicator {

static const char *default_pid_filename = "/var/run/replicatord.pid";
static const char *default_log_filename = "/var/log/replicatord.log";
static const char *default_config_filename = "/usr/local/etc/replicatord.cfg";

static volatile bool is_term = false;
pryanikov's avatar
pryanikov committed
27
static volatile bool reset = false;
pryanikov's avatar
pryanikov committed
28

pryanikov's avatar
pryanikov committed
29
30
static std::string binlog_name;
static unsigned long binlog_pos;
pryanikov's avatar
pryanikov committed
31

pryanikov's avatar
pryanikov committed
32
33
static TPWriter *tpwriter = NULL;
static DBReader *dbreader = NULL;
pryanikov's avatar
pryanikov committed
34

pryanikov's avatar
pryanikov committed
35
static void sigint_handler(int sig);
pryanikov's avatar
pryanikov committed
36

pryanikov's avatar
pryanikov committed
37
static Queue<SerializableBinlogEvent> queue(100);
pryanikov's avatar
pryanikov committed
38
39
40

// ===============

pryanikov's avatar
pryanikov committed
41
static void tpwriter_worker()
pryanikov's avatar
pryanikov committed
42
{
pryanikov's avatar
pryanikov committed
43
44
45
	while (!is_term)
	{
		if (!tpwriter->Connect()) continue;
pryanikov's avatar
pryanikov committed
46
47
48

		// send initial binlog position to the main thread
		try {
pryanikov's avatar
pryanikov committed
49
			if (!tpwriter->ReadBinlogPos(binlog_name, binlog_pos)) {
pryanikov's avatar
pryanikov committed
50
51
52
53
				tpwriter->Disconnect();
				continue;
			}

pryanikov's avatar
pryanikov committed
54
			reset = true;
pryanikov's avatar
pryanikov committed
55

pryanikov's avatar
pryanikov committed
56
57
58
59
60
61
62
63
64
			while (!is_term)
			{
				queue.fetch(std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1), std::chrono::milliseconds(1000), 100);
				tpwriter->Sync();

				if (!is_term && tpwriter->ReadReply() < 0) {
					const auto code = tpwriter->GetReplyCode();
					if (code) {
						std::cerr << "Tarantool error: " << tpwriter->GetReplyErrorMessage() << " (code: " << code << ")" << std::endl;
pryanikov's avatar
pryanikov committed
65
66
67
68
69
					}
				}
			}
		}
		catch (std::range_error& ex) {
pryanikov's avatar
pryanikov committed
70
			is_term = true;
pryanikov's avatar
pryanikov committed
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
			std::cout << ex.what() << std::endl;
			// loop exit
		}
		catch (std::exception& ex) {
			std::cout << ex.what() << std::endl;
			tpwriter->Disconnect();
			// reconnect
		}
	}

	tpwriter->Disconnect();
}

// ====================

pryanikov's avatar
pryanikov committed
86
static bool dbread_callback(const SerializableBinlogEvent &ev)
pryanikov's avatar
pryanikov committed
87
{
pryanikov's avatar
pryanikov committed
88
89
	if (is_term || reset) {
		return true;
pryanikov's avatar
pryanikov committed
90
91
	}

pryanikov's avatar
pryanikov committed
92
	queue.push(ev);
pryanikov's avatar
pryanikov committed
93
94
95
	return false;
}

pryanikov's avatar
pryanikov committed
96
static void mysql_worker()
pryanikov's avatar
pryanikov committed
97
98
{
	while (!is_term) {
pryanikov's avatar
pryanikov committed
99
100
101
		// read initial binlog pos from Tarantool
		while (!reset) ::sleep(1);
		reset = false;
pryanikov's avatar
pryanikov committed
102

pryanikov's avatar
pryanikov committed
103
104
105
106
107
108
109
110
111
112
113
114
115
		try {
			if (!is_term && !reset && binlog_name == "" && binlog_pos == 0) {
				std::cout << "Tarantool reported null binlog position. Dumping tables..." << std::endl;
				dbreader->DumpTables(binlog_name, binlog_pos, dbread_callback);
			}
			if (!is_term && !reset) {
				std::cout << "Reading binlogs (" << binlog_name << ", " << binlog_pos << ")..." << std::endl;
				dbreader->ReadBinlog(binlog_name, binlog_pos, dbread_callback);
			}
		}
		catch (std::exception& ex) {
			std::cerr << "Error in reading binlogs: " << ex.what() << std::endl;
			sigint_handler(0);
pryanikov's avatar
pryanikov committed
116
117
118
119
		}
	}
}

pryanikov's avatar
pryanikov committed
120
static void init(YAML::Node& cfg)
pryanikov's avatar
pryanikov committed
121
{
pryanikov's avatar
pryanikov committed
122
	try {
pryanikov's avatar
pryanikov committed
123
124
		// read Mysql settings
		{
pryanikov's avatar
pryanikov committed
125
			const YAML::Node& mysql = cfg["mysql"];
pryanikov's avatar
pryanikov committed
126

pryanikov's avatar
pryanikov committed
127
			nanomysql::mysql_conn_opts opts;
pryanikov's avatar
pryanikov committed
128
129
130
131
			opts.mysql_host = mysql["host"].as<std::string>();
			opts.mysql_port = mysql["port"].as<unsigned>();
			opts.mysql_user = mysql["user"].as<std::string>();
			opts.mysql_pass = mysql["password"].as<std::string>();
pryanikov's avatar
pryanikov committed
132

pryanikov's avatar
pryanikov committed
133
			dbreader = new DBReader(opts, mysql["connect_retry"].as<unsigned>());
pryanikov's avatar
pryanikov committed
134
135
136
		}
		// read Tarantool config
		{
pryanikov's avatar
pryanikov committed
137
			const YAML::Node& tarantool = cfg["tarantool"];
pryanikov's avatar
pryanikov committed
138

pryanikov's avatar
pryanikov committed
139
			tpwriter = new TPWriter(
pryanikov's avatar
pryanikov committed
140
141
142
143
144
145
				tarantool["host"].as<std::string>(),
				tarantool["user"].as<std::string>(),
				tarantool["password"].as<std::string>(),
				tarantool["binlog_pos_space"].as<unsigned>(),
				tarantool["binlog_pos_key"].as<unsigned>(),
				tarantool["connect_retry"].as<unsigned>(),
pryanikov's avatar
pryanikov committed
146
				tarantool["sync_retry"].as<unsigned>()
pryanikov's avatar
pryanikov committed
147
			);
pryanikov's avatar
pryanikov committed
148
149
150
		}
		// read Mysql to Tarantool mappings (each table maps to a single Tarantool space)
		{
pryanikov's avatar
pryanikov committed
151
152
153
154
155
156
157
158
159
160
161
162
163
			const YAML::Node& mappings = cfg["mappings"];

			for (int i = 0; i < mappings.size(); i++) {
				const YAML::Node& mapping = mappings[i];

				const std::string database = mapping["database"].as<std::string>();
				const std::string table = mapping["table"].as<std::string>();

				std::string insert_call = mapping["insert_call"] ? mapping["insert_call"].as<std::string>() : TPWriter::empty_call;
				std::string update_call = mapping["update_call"] ? mapping["update_call"].as<std::string>() : TPWriter::empty_call;
				std::string delete_call = mapping["delete_call"] ? mapping["delete_call"].as<std::string>() : TPWriter::empty_call;

				const unsigned space = mapping["space"].as<unsigned>();
pryanikov's avatar
pryanikov committed
164
165
166
				std::map<std::string, unsigned> columns;
				TPWriter::Tuple keys;
				unsigned index_max = tpwriter->space_last_id[space];
pryanikov's avatar
pryanikov committed
167

pryanikov's avatar
pryanikov committed
168
				// read key tarantool fields we'll use for delete requests
pryanikov's avatar
pryanikov committed
169
				{
pryanikov's avatar
pryanikov committed
170
171
172
					const YAML::Node& keys_ = mapping["key_fields"];
					for (int i = 0; i < keys_.size(); i++) {
						unsigned key = keys_[i].as<unsigned>();
pryanikov's avatar
pryanikov committed
173
174
						index_max = std::max(index_max, key);
						keys.push_back(key);
pryanikov's avatar
pryanikov committed
175
176
					}
				}
pryanikov's avatar
pryanikov committed
177
				// read columns tuple
pryanikov's avatar
pryanikov committed
178
				{
pryanikov's avatar
pryanikov committed
179
180
					const YAML::Node& columns_ = mapping["columns"];
					for (int i = 0; i < columns_.size(); i++) {
pryanikov's avatar
pryanikov committed
181
						unsigned index = i < keys.size() ? keys[i] : ++index_max;
pryanikov's avatar
pryanikov committed
182
						columns[ columns_[i].as<std::string>() ] = index;
pryanikov's avatar
pryanikov committed
183
184
185
					}
				}

pryanikov's avatar
pryanikov committed
186
				dbreader->AddTable(database, table, columns, mapping["dump"].as<bool>());
pryanikov's avatar
pryanikov committed
187
188
				tpwriter->AddTable(database, table, space, keys, insert_call, update_call, delete_call);
				tpwriter->space_last_id[space] = index_max;
pryanikov's avatar
pryanikov committed
189
190
			}
		}
pryanikov's avatar
pryanikov committed
191
		// read space settings
pryanikov's avatar
pryanikov committed
192
		{
pryanikov's avatar
pryanikov committed
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
			const YAML::Node& spaces = cfg["spaces"];

			for (auto its = spaces.begin(); its != spaces.end(); ++its) {
				unsigned space = its->first.as<unsigned>();
				std::map<unsigned, SerializableValue>& rn_ = tpwriter->replace_null[ space ];
				const YAML::Node& replace_null = its->second["replace_null"];

				for (auto itrn = replace_null.begin(); itrn != replace_null.end(); ++itrn) {
					const YAML::Node& field = itrn->second;
					auto itf = field.begin();
					if (itf == field.end()) continue;

					unsigned index = itrn->first.as<unsigned>();
					std::string type = itf->first.as<std::string>();
					const YAML::Node& value = itf->second;

pryanikov's avatar
pryanikov committed
209
210
					if (type == "str" || type == "string") {
						rn_[ index ] = value.as<std::string>();
pryanikov's avatar
pryanikov committed
211
					} else if (type == "unsigned") {
pryanikov's avatar
pryanikov committed
212
213
214
						rn_[ index ] = value.as<unsigned long long>();
					} else if (type == "int" || type == "integer") {
						rn_[ index ] = value.as<long long>();
pryanikov's avatar
pryanikov committed
215
216
217
218
219
					} else {
						std::cerr << "Config error: unknown type for non-null value for column " << index << std::endl;
						exit(EXIT_FAILURE);
					}
				}
pryanikov's avatar
pryanikov committed
220
221
222
			}
		}
	}
pryanikov's avatar
pryanikov committed
223
	catch(YAML::Exception& ex)
pryanikov's avatar
pryanikov committed
224
	{
pryanikov's avatar
pryanikov committed
225
		std::cerr << "Config error: " << ex.what() << std::endl;
pryanikov's avatar
pryanikov committed
226
227
228
229
230
231
232
233
		exit(EXIT_FAILURE);
	}
}

static void shutdown()
{
	if (dbreader) {
		// sighandler protection
pryanikov's avatar
pryanikov committed
234
		auto dbreader_ = dbreader;
pryanikov's avatar
pryanikov committed
235
236
237
		dbreader = NULL;
		delete dbreader_;
	}
pryanikov's avatar
pryanikov committed
238
239
240
241
	if (tpwriter) {
		auto tpwriter_ = tpwriter;
		tpwriter = NULL;
		delete tpwriter_;
pryanikov's avatar
pryanikov committed
242
243
244
	}
}

pryanikov's avatar
pryanikov committed
245
static void sigint_handler(int sig)
pryanikov's avatar
pryanikov committed
246
247
248
{
	std::cerr << "Terminating" << std::endl;
	is_term = true;
pryanikov's avatar
pryanikov committed
249

pryanikov's avatar
pryanikov committed
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
	if (dbreader) {
		dbreader->Stop();
	}
}

}

// Logging state, managed by openlogfile() / closelogfile().
static replicator::Logger *ol;		// logger attached to std::cout (level 'I')
static replicator::Logger *el;		// logger attached to std::cerr (level 'E')
static std::streambuf *ol_sink;		// sink `ol` had before it was pointed at the log file
static std::streambuf *el_sink;		// sink `el` had before it was pointed at the log file
static std::ofstream *flog;		// the open log file, NULL while closed

// Effective file names; main() overwrites them from -l and -i.
static std::string log_filename(replicator::default_log_filename);
static std::string pid_filename(replicator::default_pid_filename);

/**
 * Writes the current process id to pid_filename.
 *
 * Failure is not fatal (the daemon keeps running without a pid file), but
 * it is reported on std::cerr instead of being silently ignored.
 */
static void writepidtofile()
{
	std::ofstream fpid(pid_filename);
	if (!fpid) {
		std::cerr << "Can't open pid file " << pid_filename << " for writing" << std::endl;
		return;
	}

	fpid << getpid();
	fpid.flush();
	if (!fpid) {
		std::cerr << "Can't write pid to file " << pid_filename << std::endl;
	}
	fpid.close();
}

/**
 * Deletes the pid file; registered with atexit() by main().
 * The result of unlink() is ignored.
 */
static void removepidfile()
{
	const char *const path = pid_filename.c_str();
	unlink(path);
}

/**
 * Opens log_filename in append mode and routes std::cout / std::cerr into
 * it through two Logger objects (level 'I' and 'E' respectively).
 *
 * The sinks the loggers start with are remembered in ol_sink / el_sink so
 * that closelogfile() can put them back.
 */
static void openlogfile()
{
	flog = new std::ofstream(log_filename, std::ofstream::app);

	// wrap cout and cerr; the loggers add timestamps and log levels
	ol = new replicator::Logger(std::cout, 'I');
	el = new replicator::Logger(std::cerr, 'E');

	// remember where the loggers were writing before the redirect
	ol_sink = ol->rdsink();
	el_sink = el->rdsink();

	// both loggers share the log file's buffer
	std::streambuf *const file_buf = flog->rdbuf();
	ol->rdsink(file_buf);
	el->rdsink(file_buf);
}

/**
 * Undoes openlogfile(): flushes, closes and frees the log file, hands the
 * saved sinks back to the loggers and destroys them.
 *
 * Does nothing when no log file is open, so it is safe to register with
 * atexit() and to call again from sighup_handler().
 */
static void closelogfile()
{
	if (!flog) return;

	// push out buffered data and drop the file stream
	flog->flush();
	flog->close();
	delete flog;
	flog = NULL;

	// restore streams
	ol->rdsink(ol_sink);
	el->rdsink(el_sink);

	// dispose of the loggers themselves
	delete ol;
	delete el;
	ol = el = NULL;
}

/**
 * SIGHUP handler: closes and reopens the log file (e.g. after the file has
 * been rotated away) and logs that the daemon keeps running.
 *
 * NOTE(review): closelogfile() / openlogfile() allocate, free and do stream
 * I/O, none of which is async-signal-safe. Consider only setting a flag
 * here and reopening the log from a regular thread.
 *
 * @param sig signal number; not used
 */
static void sighup_handler(int sig)
{
	(void)sig;

	closelogfile();
	openlogfile();

	std::cout << "Caught SIGHUP, continuing..." << std::endl;
}

int main(int argc, char** argv)
{
	bool print_usage = false;
	std::string config_name(replicator::default_config_filename);

	int c;
	while (-1 != (c = ::getopt(argc, argv, "c:l:i:zp")))
	{
		switch (c)
		{
			case 'c': config_name = optarg; break;
			case 'p': print_usage = true; break;
			case 'l': log_filename = optarg; break;
			case 'i': pid_filename = optarg; break;
			default: print_usage = true; break;
		}
	}

	if (print_usage) {
pryanikov's avatar
pryanikov committed
343
		std::cout
pryanikov's avatar
pryanikov committed
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
			<< "Usage: " << argv[0] << " [-c <config_name>]" << " [-l <log_name>]"<< " [-i <pid_name>]" << " [-p]" << std::endl
			<< " -c configuration file (" << config_name << ")" << std::endl
			<< " -p print usage" << std::endl
			<< " -l log filename (" << log_filename << ")" << std::endl
			<< " -i pid filename (" << pid_filename << ")" << std::endl
			;
		return 1;
	}

	writepidtofile();
	atexit(removepidfile);

	openlogfile();
	atexit(closelogfile);

pryanikov's avatar
pryanikov committed
359
	YAML::Node cfg;
pryanikov's avatar
pryanikov committed
360
361

	// Read the file. If there is an error, report it and exit.
pryanikov's avatar
pryanikov committed
362
	try {
pryanikov's avatar
pryanikov committed
363
		cfg = YAML::LoadFile(config_name.c_str());
pryanikov's avatar
pryanikov committed
364
	}
pryanikov's avatar
pryanikov committed
365
	catch(YAML::Exception& ex)
pryanikov's avatar
pryanikov committed
366
	{
pryanikov's avatar
pryanikov committed
367
		std::cerr << "Config error: " << ex.what() << std::endl;
pryanikov's avatar
pryanikov committed
368
369
370
		return EXIT_FAILURE;
	}

pryanikov's avatar
pryanikov committed
371
372
	signal(SIGINT, replicator::sigint_handler);
	signal(SIGTERM, replicator::sigint_handler);
pryanikov's avatar
pryanikov committed
373
374
375
376
	signal(SIGHUP, sighup_handler);

	replicator::init(cfg);

pryanikov's avatar
pryanikov committed
377
378
	std::thread t1(replicator::mysql_worker);
	std::thread t2(replicator::tpwriter_worker);
pryanikov's avatar
pryanikov committed
379

pryanikov's avatar
pryanikov committed
380
381
	t2.join();
	t1.join();
pryanikov's avatar
pryanikov committed
382

pryanikov's avatar
pryanikov committed
383
	replicator::shutdown();
pryanikov's avatar
pryanikov committed
384
385
386

	return 0;
}