main.cpp 9.3 KB
Newer Older
pryanikov's avatar
pryanikov committed
1
2
3
4
#include <signal.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <fstream>
#include <functional>
#include <iostream>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

#include <yaml-cpp/yaml.h>

#include "serializable.h"
#include "dbreader.h"
#include "tpwriter.h"
#include "queue.h"
#include "logger.h"

// =========
pryanikov's avatar
pryanikov committed
18
using std::placeholders::_1;
pryanikov's avatar
pryanikov committed
19
20
21
22
23
24
25
26

namespace replicator {

static const char *default_pid_filename = "/var/run/replicatord.pid";
static const char *default_log_filename = "/var/log/replicatord.log";
static const char *default_config_filename = "/usr/local/etc/replicatord.cfg";

static volatile bool is_term = false;
pryanikov's avatar
pryanikov committed
27
static volatile bool reset = false;
pryanikov's avatar
pryanikov committed
28

pryanikov's avatar
pryanikov committed
29
30
static std::string binlog_name;
static unsigned long binlog_pos;
pryanikov's avatar
pryanikov committed
31

pryanikov's avatar
pryanikov committed
32
33
static TPWriter *tpwriter = NULL;
static DBReader *dbreader = NULL;
pryanikov's avatar
pryanikov committed
34

pryanikov's avatar
pryanikov committed
35
static void sigint_handler(int sig);
pryanikov's avatar
pryanikov committed
36

pryanikov's avatar
pryanikov committed
37
static Queue<SerializableBinlogEvent> queue(50);
pryanikov's avatar
pryanikov committed
38
39
40

// ===============

pryanikov's avatar
pryanikov committed
41
static void tpwriter_worker()
pryanikov's avatar
pryanikov committed
42
{
pryanikov's avatar
pryanikov committed
43
44
	while (!is_term)
	{
pryanikov's avatar
pryanikov committed
45
		while (!tpwriter->Connect());
pryanikov's avatar
pryanikov committed
46

pryanikov's avatar
pryanikov committed
47
		// send initial binlog position to the db thread
pryanikov's avatar
pryanikov committed
48
		try {
pryanikov's avatar
pryanikov committed
49
			tpwriter->ReadBinlogPos(binlog_name, binlog_pos);
pryanikov's avatar
pryanikov committed
50
			reset = true;
pryanikov's avatar
pryanikov committed
51

pryanikov's avatar
pryanikov committed
52
53
			const std::chrono::milliseconds timeout(1000);
			const auto cb_fetch = std::bind(&TPWriter::BinlogEventCallback, tpwriter, _1);
pryanikov's avatar
pryanikov committed
54

pryanikov's avatar
pryanikov committed
55
			while (!is_term) {
pryanikov's avatar
pryanikov committed
56
57
58
				// for (unsigned cnt = queue.size(); cnt > 0; --cnt) {
				// 	tpwriter->BinlogEventCallback(queue.pop());
				// }
pryanikov's avatar
pryanikov committed
59
60
61
				queue.try_fetch(cb_fetch, timeout);
				tpwriter->Sync();
				tpwriter->RecvAll();
pryanikov's avatar
pryanikov committed
62
63
64
			}
		}
		catch (std::range_error& ex) {
pryanikov's avatar
pryanikov committed
65
			is_term = true;
pryanikov's avatar
pryanikov committed
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
			std::cout << ex.what() << std::endl;
			// loop exit
		}
		catch (std::exception& ex) {
			std::cout << ex.what() << std::endl;
			tpwriter->Disconnect();
			// reconnect
		}
	}

	tpwriter->Disconnect();
}

// ====================

pryanikov's avatar
pryanikov committed
81
static bool dbread_callback(const SerializableBinlogEvent &ev)
pryanikov's avatar
pryanikov committed
82
{
pryanikov's avatar
pryanikov committed
83
84
	if (is_term || reset) {
		return true;
pryanikov's avatar
pryanikov committed
85
86
	}

pryanikov's avatar
pryanikov committed
87
	queue.push(ev);
pryanikov's avatar
pryanikov committed
88
89
90
	return false;
}

pryanikov's avatar
pryanikov committed
91
static void mysql_worker()
pryanikov's avatar
pryanikov committed
92
93
{
	while (!is_term) {
pryanikov's avatar
pryanikov committed
94
95
96
		// read initial binlog pos from Tarantool
		while (!reset) ::sleep(1);
		reset = false;
pryanikov's avatar
pryanikov committed
97

pryanikov's avatar
pryanikov committed
98
99
100
101
102
103
104
105
106
107
108
109
110
		try {
			if (!is_term && !reset && binlog_name == "" && binlog_pos == 0) {
				std::cout << "Tarantool reported null binlog position. Dumping tables..." << std::endl;
				dbreader->DumpTables(binlog_name, binlog_pos, dbread_callback);
			}
			if (!is_term && !reset) {
				std::cout << "Reading binlogs (" << binlog_name << ", " << binlog_pos << ")..." << std::endl;
				dbreader->ReadBinlog(binlog_name, binlog_pos, dbread_callback);
			}
		}
		catch (std::exception& ex) {
			std::cerr << "Error in reading binlogs: " << ex.what() << std::endl;
			sigint_handler(0);
pryanikov's avatar
pryanikov committed
111
112
113
114
		}
	}
}

pryanikov's avatar
pryanikov committed
115
static void init(YAML::Node& cfg)
pryanikov's avatar
pryanikov committed
116
{
pryanikov's avatar
pryanikov committed
117
	try {
pryanikov's avatar
pryanikov committed
118
119
		// read Mysql settings
		{
pryanikov's avatar
pryanikov committed
120
			const YAML::Node& mysql = cfg["mysql"];
pryanikov's avatar
pryanikov committed
121

pryanikov's avatar
pryanikov committed
122
			nanomysql::mysql_conn_opts opts;
pryanikov's avatar
pryanikov committed
123
124
125
126
			opts.mysql_host = mysql["host"].as<std::string>();
			opts.mysql_port = mysql["port"].as<unsigned>();
			opts.mysql_user = mysql["user"].as<std::string>();
			opts.mysql_pass = mysql["password"].as<std::string>();
pryanikov's avatar
pryanikov committed
127

pryanikov's avatar
pryanikov committed
128
			dbreader = new DBReader(opts, mysql["connect_retry"].as<unsigned>());
pryanikov's avatar
pryanikov committed
129
130
131
		}
		// read Tarantool config
		{
pryanikov's avatar
pryanikov committed
132
			const YAML::Node& tarantool = cfg["tarantool"];
pryanikov's avatar
pryanikov committed
133

pryanikov's avatar
pryanikov committed
134
			tpwriter = new TPWriter(
pryanikov's avatar
pryanikov committed
135
136
137
138
139
140
				tarantool["host"].as<std::string>(),
				tarantool["user"].as<std::string>(),
				tarantool["password"].as<std::string>(),
				tarantool["binlog_pos_space"].as<unsigned>(),
				tarantool["binlog_pos_key"].as<unsigned>(),
				tarantool["connect_retry"].as<unsigned>(),
pryanikov's avatar
pryanikov committed
141
				tarantool["sync_retry"].as<unsigned>()
pryanikov's avatar
pryanikov committed
142
			);
pryanikov's avatar
pryanikov committed
143
144
145
		}
		// read Mysql to Tarantool mappings (each table maps to a single Tarantool space)
		{
pryanikov's avatar
pryanikov committed
146
147
148
149
150
151
152
153
154
155
156
157
158
			const YAML::Node& mappings = cfg["mappings"];

			for (int i = 0; i < mappings.size(); i++) {
				const YAML::Node& mapping = mappings[i];

				const std::string database = mapping["database"].as<std::string>();
				const std::string table = mapping["table"].as<std::string>();

				std::string insert_call = mapping["insert_call"] ? mapping["insert_call"].as<std::string>() : TPWriter::empty_call;
				std::string update_call = mapping["update_call"] ? mapping["update_call"].as<std::string>() : TPWriter::empty_call;
				std::string delete_call = mapping["delete_call"] ? mapping["delete_call"].as<std::string>() : TPWriter::empty_call;

				const unsigned space = mapping["space"].as<unsigned>();
pryanikov's avatar
pryanikov committed
159
160
161
				std::map<std::string, unsigned> columns;
				TPWriter::Tuple keys;
				unsigned index_max = tpwriter->space_last_id[space];
pryanikov's avatar
pryanikov committed
162

pryanikov's avatar
pryanikov committed
163
				// read key tarantool fields we'll use for delete requests
pryanikov's avatar
pryanikov committed
164
				{
pryanikov's avatar
pryanikov committed
165
166
167
					const YAML::Node& keys_ = mapping["key_fields"];
					for (int i = 0; i < keys_.size(); i++) {
						unsigned key = keys_[i].as<unsigned>();
pryanikov's avatar
pryanikov committed
168
169
						index_max = std::max(index_max, key);
						keys.push_back(key);
pryanikov's avatar
pryanikov committed
170
171
					}
				}
pryanikov's avatar
pryanikov committed
172
				// read columns tuple
pryanikov's avatar
pryanikov committed
173
				{
pryanikov's avatar
pryanikov committed
174
175
					const YAML::Node& columns_ = mapping["columns"];
					for (int i = 0; i < columns_.size(); i++) {
pryanikov's avatar
pryanikov committed
176
						unsigned index = i < keys.size() ? keys[i] : ++index_max;
pryanikov's avatar
pryanikov committed
177
						columns[ columns_[i].as<std::string>() ] = index;
pryanikov's avatar
pryanikov committed
178
179
180
					}
				}

pryanikov's avatar
pryanikov committed
181
				dbreader->AddTable(database, table, columns, mapping["dump"].as<bool>());
pryanikov's avatar
pryanikov committed
182
183
				tpwriter->AddTable(database, table, space, keys, insert_call, update_call, delete_call);
				tpwriter->space_last_id[space] = index_max;
pryanikov's avatar
pryanikov committed
184
185
			}
		}
pryanikov's avatar
pryanikov committed
186
		// read space settings
pryanikov's avatar
pryanikov committed
187
		{
pryanikov's avatar
pryanikov committed
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
			const YAML::Node& spaces = cfg["spaces"];

			for (auto its = spaces.begin(); its != spaces.end(); ++its) {
				unsigned space = its->first.as<unsigned>();
				std::map<unsigned, SerializableValue>& rn_ = tpwriter->replace_null[ space ];
				const YAML::Node& replace_null = its->second["replace_null"];

				for (auto itrn = replace_null.begin(); itrn != replace_null.end(); ++itrn) {
					const YAML::Node& field = itrn->second;
					auto itf = field.begin();
					if (itf == field.end()) continue;

					unsigned index = itrn->first.as<unsigned>();
					std::string type = itf->first.as<std::string>();
					const YAML::Node& value = itf->second;

pryanikov's avatar
pryanikov committed
204
205
					if (type == "str" || type == "string") {
						rn_[ index ] = value.as<std::string>();
pryanikov's avatar
pryanikov committed
206
					} else if (type == "unsigned") {
pryanikov's avatar
pryanikov committed
207
208
209
						rn_[ index ] = value.as<unsigned long long>();
					} else if (type == "int" || type == "integer") {
						rn_[ index ] = value.as<long long>();
pryanikov's avatar
pryanikov committed
210
211
212
213
214
					} else {
						std::cerr << "Config error: unknown type for non-null value for column " << index << std::endl;
						exit(EXIT_FAILURE);
					}
				}
pryanikov's avatar
pryanikov committed
215
216
217
			}
		}
	}
pryanikov's avatar
pryanikov committed
218
	catch(YAML::Exception& ex)
pryanikov's avatar
pryanikov committed
219
	{
pryanikov's avatar
pryanikov committed
220
		std::cerr << "Config error: " << ex.what() << std::endl;
pryanikov's avatar
pryanikov committed
221
222
223
224
225
226
227
228
		exit(EXIT_FAILURE);
	}
}

static void shutdown()
{
	if (dbreader) {
		// sighandler protection
pryanikov's avatar
pryanikov committed
229
		auto dbreader_ = dbreader;
pryanikov's avatar
pryanikov committed
230
231
232
		dbreader = NULL;
		delete dbreader_;
	}
pryanikov's avatar
pryanikov committed
233
234
235
236
	if (tpwriter) {
		auto tpwriter_ = tpwriter;
		tpwriter = NULL;
		delete tpwriter_;
pryanikov's avatar
pryanikov committed
237
238
239
	}
}

pryanikov's avatar
pryanikov committed
240
static void sigint_handler(int sig)
pryanikov's avatar
pryanikov committed
241
242
243
{
	std::cerr << "Terminating" << std::endl;
	is_term = true;
pryanikov's avatar
pryanikov committed
244

pryanikov's avatar
pryanikov committed
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
	if (dbreader) {
		dbreader->Stop();
	}
}

}

// Loggers wrapping std::cout (ol) and std::cerr (el); created in openlogfile(),
// destroyed in closelogfile().
static replicator::Logger *ol, *el;
// Sinks the loggers had before being redirected to the log file; restored in
// closelogfile().
static std::streambuf *ol_sink, *el_sink;
// Currently open log file stream, or NULL when no log file is open.
static std::ofstream *flog;

// Effective log and pid file paths; main() overrides them from -l and -i.
static std::string log_filename(replicator::default_log_filename);
static std::string pid_filename(replicator::default_pid_filename);

/**
 * Write the current process id to pid_filename.
 *
 * A failure to open or write the file is reported on std::cerr but does not
 * stop the program.
 */
static void writepidtofile()
{
	std::ofstream fpid(pid_filename);
	if (!fpid) {
		std::cerr << "Can't open pid file " << pid_filename << " for writing" << std::endl;
		return;
	}

	fpid << getpid();
	fpid.flush();
	if (!fpid) {
		std::cerr << "Can't write pid file " << pid_filename << std::endl;
	}
	// the stream is closed by its destructor
}

/**
 * Delete the pid file named by pid_filename. Registered with atexit() in main();
 * the result of unlink() is ignored.
 */
static void removepidfile()
{
	const char *path = pid_filename.c_str();
	unlink(path);
}

/**
 * Open log_filename in append mode and route std::cout / std::cerr into it
 * through two Logger objects (level 'I' for cout, 'E' for cerr).
 *
 * The sinks the loggers start with are saved in ol_sink / el_sink so that
 * closelogfile() can put them back.
 */
static void openlogfile()
{
	flog = new std::ofstream(log_filename, std::ios::app);

	// wrap cout and cerr; the loggers append timestamps and log levels
	ol = new replicator::Logger(std::cout, 'I');
	el = new replicator::Logger(std::cerr, 'E');

	// remember where the loggers wrote before the redirection
	ol_sink = ol->rdsink();
	el_sink = el->rdsink();

	// from now on both loggers write into the log file
	auto *file_buf = flog->rdbuf();
	ol->rdsink(file_buf);
	el->rdsink(file_buf);
}

/**
 * Undo openlogfile(): restore the loggers' original sinks, destroy the loggers
 * and close the log file. Does nothing when no log file is open.
 *
 * The loggers are detached from the file buffer before the file stream is
 * destroyed, so they never hold a pointer to a deleted stream buffer.
 */
static void closelogfile()
{
	if (flog == NULL) {
		return;
	}

	// restore streams while the file buffer is still alive
	ol->rdsink(ol_sink);
	el->rdsink(el_sink);

	delete ol;
	delete el;

	ol = NULL;
	el = NULL;

	// nothing refers to the file any more; flush and release it
	flog->flush();
	flog->close();

	delete flog;
	flog = NULL;
}

/**
 * SIGHUP handler: close and reopen the log file, then log a notice.
 *
 * NOTE(review): closelogfile()/openlogfile() allocate and free memory and use
 * iostreams, none of which is async-signal-safe — consider setting a flag here
 * and reopening the log from a regular thread.
 */
static void sighup_handler(int /* sig */)
{
	// reopen the log under the same name
	closelogfile();
	openlogfile();

	std::cout << "Caught SIGHUP, continuing..." << std::endl;
}

int main(int argc, char** argv)
{
	bool print_usage = false;
	std::string config_name(replicator::default_config_filename);

	int c;
	while (-1 != (c = ::getopt(argc, argv, "c:l:i:zp")))
	{
		switch (c)
		{
			case 'c': config_name = optarg; break;
			case 'p': print_usage = true; break;
			case 'l': log_filename = optarg; break;
			case 'i': pid_filename = optarg; break;
			default: print_usage = true; break;
		}
	}

	if (print_usage) {
pryanikov's avatar
pryanikov committed
338
		std::cout
pryanikov's avatar
pryanikov committed
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
			<< "Usage: " << argv[0] << " [-c <config_name>]" << " [-l <log_name>]"<< " [-i <pid_name>]" << " [-p]" << std::endl
			<< " -c configuration file (" << config_name << ")" << std::endl
			<< " -p print usage" << std::endl
			<< " -l log filename (" << log_filename << ")" << std::endl
			<< " -i pid filename (" << pid_filename << ")" << std::endl
			;
		return 1;
	}

	writepidtofile();
	atexit(removepidfile);

	openlogfile();
	atexit(closelogfile);

pryanikov's avatar
pryanikov committed
354
	YAML::Node cfg;
pryanikov's avatar
pryanikov committed
355
356

	// Read the file. If there is an error, report it and exit.
pryanikov's avatar
pryanikov committed
357
	try {
pryanikov's avatar
pryanikov committed
358
		cfg = YAML::LoadFile(config_name.c_str());
pryanikov's avatar
pryanikov committed
359
	}
pryanikov's avatar
pryanikov committed
360
	catch(YAML::Exception& ex)
pryanikov's avatar
pryanikov committed
361
	{
pryanikov's avatar
pryanikov committed
362
		std::cerr << "Config error: " << ex.what() << std::endl;
pryanikov's avatar
pryanikov committed
363
364
365
		return EXIT_FAILURE;
	}

pryanikov's avatar
pryanikov committed
366
367
	signal(SIGINT, replicator::sigint_handler);
	signal(SIGTERM, replicator::sigint_handler);
pryanikov's avatar
pryanikov committed
368
369
370
371
	signal(SIGHUP, sighup_handler);

	replicator::init(cfg);

pryanikov's avatar
pryanikov committed
372
373
	std::thread t1(replicator::mysql_worker);
	std::thread t2(replicator::tpwriter_worker);
pryanikov's avatar
pryanikov committed
374

pryanikov's avatar
pryanikov committed
375
376
	t2.join();
	t1.join();
pryanikov's avatar
pryanikov committed
377

pryanikov's avatar
pryanikov committed
378
	replicator::shutdown();
pryanikov's avatar
pryanikov committed
379
380
381

	return 0;
}