tpwriter.cpp 13.1 KB
Newer Older
pryanikov's avatar
pryanikov committed
1
2
#include <iostream>
#include <sys/time.h>
pryanikov's avatar
pryanikov committed
3
4
5
6
7

#include <tarantool/tarantool.h>
#include <tarantool/tnt_net.h>
#include <tarantool/tnt_opt.h>
#include <msgpuck.h>
pryanikov's avatar
pryanikov committed
8

pryanikov's avatar
pryanikov committed
9
#include <boost/any.hpp>
pryanikov's avatar
pryanikov committed
10
11
12
13
14
15
16
17

#include "tpwriter.h"
#include "serializable.h"

namespace replicator {

const std::string TPWriter::empty_call("");

pryanikov's avatar
pryanikov committed
18
19
20
21
22
23
24
TPWriter::TPWriter(
	const std::string &host,
	const std::string &user,
	const std::string &password,
	uint32_t binlog_key_space,
	uint32_t binlog_key,
	unsigned connect_retry,
pryanikov's avatar
pryanikov committed
25
	unsigned sync_retry
pryanikov's avatar
pryanikov committed
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
) :
	host(host),
	user(user),
	password(password),
	binlog_key_space(binlog_key_space),
	binlog_key(binlog_key),
	binlog_name(""),
	binlog_pos(0),
	seconds_behind_master(0),
	connect_retry(connect_retry),
	sync_retry(sync_retry),
	next_connect_attempt(0),
	next_sync_attempt(0),
	next_ping_attempt(0),
	reply_server_code(0),
	reply_error_msg("")
pryanikov's avatar
pryanikov committed
42
{
pryanikov's avatar
pryanikov committed
43
44
45
46
47
48
49
50
51
52
53
54
	::tnt_net(&sess);

	::tnt_set(&sess, ::TNT_OPT_URI, host.c_str());
	::tnt_set(&sess, ::TNT_OPT_SEND_BUF, 0);
	::tnt_set(&sess, ::TNT_OPT_RECV_BUF, 16 * 1024 * 1024);

	::timeval tmout;
	auto make_timeout = [&tmout] (const unsigned t) -> ::timeval* {
		tmout.tv_sec  =  t / 1000;
		tmout.tv_usec = (t % 1000) * 1000;
		return &tmout;
	};
pryanikov's avatar
pryanikov committed
55

pryanikov's avatar
pryanikov committed
56
57
	::tnt_set(&sess, ::TNT_OPT_TMOUT_RECV, make_timeout(100));
	::tnt_set(&sess, ::TNT_OPT_TMOUT_SEND, make_timeout(10000));
pryanikov's avatar
pryanikov committed
58
59
60
61
62
63
64
65
66
}

bool TPWriter::Connect()
{
	// connect to tarantool
	if (::time(NULL) < next_connect_attempt) {
		::sleep(1);
		return false;
	}
pryanikov's avatar
pryanikov committed
67
	if (::tnt_connect(&sess) < 0) {
pryanikov's avatar
pryanikov committed
68
		std::cerr << "Could not connect to Tarantool: " << ::tnt_strerror(&sess) << std::endl;
pryanikov's avatar
pryanikov committed
69
		::tnt_close(&sess);
pryanikov's avatar
pryanikov committed
70
71
72
73
		next_connect_attempt = ::time(NULL) + connect_retry;
		return false;
	}

pryanikov's avatar
pryanikov committed
74
	std::cout << "Connected to Tarantool at " << host << std::endl;
pryanikov's avatar
pryanikov committed
75
76
77
78
79
80
81
	next_sync_attempt = 0;

	return true;
}

TPWriter::~TPWriter()
{
pryanikov's avatar
pryanikov committed
82
	::tnt_stream_free(&sess);
pryanikov's avatar
pryanikov committed
83
84
85
86
87
}

bool TPWriter::ReadBinlogPos(std::string &binlog_name, unsigned long &binlog_pos)
{
	// read initial binlog pos
pryanikov's avatar
pryanikov committed
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
	int64_t sync;
	{
		__tnt_object key;
		::tnt_object_add_array(&key, 1);
		::tnt_object_add_uint(&key, binlog_key);
		::tnt_object_container_close(&key);

		__tnt_request req;
		::tnt_request_select(&req);
		::tnt_request_set_space(&req, binlog_key_space);
		::tnt_request_set_limit(&req, 1);
		::tnt_request_set_key(&req, &key);

		sync = Send(&req);
	}
pryanikov's avatar
pryanikov committed
103

pryanikov's avatar
pryanikov committed
104
105
106
107
108
109
110
	__tnt_reply re;
	do {
		const int r = Recv(&re);
		if (r == 0) {
			break;
		}
		else if (r < 0 && reply_server_code) {
pryanikov's avatar
fixes    
pryanikov committed
111
			std::cerr << "ReadBinlogPos Tarantool error: " << reply_error_msg << " (code: " << reply_server_code << ")" << std::endl;
pryanikov's avatar
pryanikov committed
112
113
114
115
116
117
118
			return false;
		}
		else {
			std::cerr << "ReadBinlogPos error: no replies, weird" << std::endl;
			return false;
		}
	} while (true);
pryanikov's avatar
pryanikov committed
119

pryanikov's avatar
pryanikov committed
120
121
	if ((&re)->sync != sync) {
		std::cerr << "ReadBinlogPos error: not requested reply" << std::endl;
pryanikov's avatar
pryanikov committed
122
123
124
		return false;
	}

pryanikov's avatar
pryanikov committed
125
126
127
128
129
130
131
132
133
134
135
	do {
		const char *data = (&re)->data;

		// result rows
		if (mp_unlikely(mp_typeof(*data) != MP_ARRAY)) break;
		if (mp_unlikely(mp_decode_array(&data) == 0)) {
			// no binlog created yet
			this->binlog_name = binlog_name = "";
			this->binlog_pos = binlog_pos = 0;
			return true;
		}
pryanikov's avatar
pryanikov committed
136

pryanikov's avatar
pryanikov committed
137
138
139
		// row
		if (mp_unlikely(mp_typeof(*data) != MP_ARRAY)) break;
		if (mp_unlikely(mp_decode_array(&data) != 3)) break;
pryanikov's avatar
pryanikov committed
140

pryanikov's avatar
pryanikov committed
141
142
143
		// binlog_key
		if (mp_unlikely(mp_typeof(*data) != MP_UINT)) break;
		if (mp_unlikely(mp_decode_uint(&data) != binlog_key)) break;;
pryanikov's avatar
pryanikov committed
144

pryanikov's avatar
pryanikov committed
145
146
147
		if (mp_unlikely(mp_typeof(*data) != MP_STR)) break;
		uint32_t _binlog_name_len;
		const char *_binlog_name = mp_decode_str(&data, &_binlog_name_len);
pryanikov's avatar
pryanikov committed
148

pryanikov's avatar
pryanikov committed
149
150
		if (mp_unlikely(mp_typeof(*data) != MP_UINT)) break;
		uint64_t _binlog_pos = mp_decode_uint(&data);
pryanikov's avatar
pryanikov committed
151

pryanikov's avatar
pryanikov committed
152
153
		this->binlog_name = binlog_name = std::string(_binlog_name, _binlog_name_len);
		this->binlog_pos = binlog_pos = _binlog_pos;
pryanikov's avatar
pryanikov committed
154

pryanikov's avatar
pryanikov committed
155
		next_ping_attempt = Milliseconds() + TPWriter::PING_TIMEOUT;
pryanikov's avatar
pryanikov committed
156

pryanikov's avatar
pryanikov committed
157
158
		return true;
	} while (0);
pryanikov's avatar
pryanikov committed
159

pryanikov's avatar
pryanikov committed
160
161
162
	std::cerr << "binlog record format error" << std::endl;
	this->binlog_name = binlog_name = "";
	this->binlog_pos = binlog_pos = 0;
pryanikov's avatar
pryanikov committed
163
164
165
166
167
	return true;
}

void TPWriter::Disconnect()
{
pryanikov's avatar
pryanikov committed
168
	::tnt_close(&sess);
pryanikov's avatar
pryanikov committed
169
170
}

pryanikov's avatar
pryanikov committed
171
inline void TPWriter::Ping()
pryanikov's avatar
pryanikov committed
172
{
pryanikov's avatar
pryanikov committed
173
174
175
	__tnt_request req;
	::tnt_request_ping(&req);
	Send(&req);
pryanikov's avatar
pryanikov committed
176
177
}

pryanikov's avatar
pryanikov committed
178
179
180
181
182
183
184
185
186
void TPWriter::AddTable(
	const std::string &db,
	const std::string &table,
	const unsigned space,
	const Tuple &keys,
	const std::string &insert_call,
	const std::string &update_call,
	const std::string &delete_call
) {
pryanikov's avatar
pryanikov committed
187
188
189
190
191
192
193
194
195
	TableMap &d = dbs[db];
	TableSpace &s = d[table];
	s.space = space;
	s.keys = keys;
	s.insert_call = insert_call;
	s.update_call = update_call;
	s.delete_call = delete_call;
}

pryanikov's avatar
pryanikov committed
196
inline void TPWriter::SaveBinlogPos()
pryanikov's avatar
pryanikov committed
197
{
pryanikov's avatar
pryanikov committed
198
199
200
201
202
203
204
205
206
207
208
209
210
	__tnt_object tuple;
	::tnt_object_add_array(&tuple, 3);
	::tnt_object_add_uint(&tuple, binlog_key);
	::tnt_object_add_str(&tuple, binlog_name.c_str(), binlog_name.length());
	::tnt_object_add_uint(&tuple, binlog_pos);
	::tnt_object_container_close(&tuple);

	__tnt_request req;
	::tnt_request_replace(&req);
	::tnt_request_set_space(&req, binlog_key_space);
	::tnt_request_set_tuple(&req, &tuple);

	Send(&req);
pryanikov's avatar
pryanikov committed
211
212
}

pryanikov's avatar
pryanikov committed
213
bool TPWriter::BinlogEventCallback(const SerializableBinlogEvent& ev)
pryanikov's avatar
pryanikov committed
214
215
216
{
	// spacial case event "IGNORE", which only updates binlog position
	// but doesn't modify any table data
pryanikov's avatar
pryanikov committed
217
218
	if (ev.event == "IGNORE")
		return false;
pryanikov's avatar
pryanikov committed
219

pryanikov's avatar
pryanikov committed
220
221
222
223
224
	if (ev.binlog_name != "") {
		binlog_name = ev.binlog_name;
		binlog_pos = ev.binlog_pos;
	}

pryanikov's avatar
pryanikov committed
225
226
227
	const auto idb = dbs.find(ev.database);
	if (idb == dbs.end())
		return false;
pryanikov's avatar
pryanikov committed
228

pryanikov's avatar
pryanikov committed
229
230
231
232
	const TableMap &tm = idb->second;
	const auto itm = tm.find(ev.table);
	if (itm == tm.end())
		return false;
pryanikov's avatar
pryanikov committed
233

pryanikov's avatar
pryanikov committed
234
	const TableSpace &ts = itm->second;
pryanikov's avatar
pryanikov committed
235

pryanikov's avatar
fix    
pryanikov committed
236
237
238
239
240
	const auto irn = replace_null.find(ts.space);
	const std::map<unsigned, SerializableValue>* replace_null_;
	if (irn != replace_null.end()) {
		replace_null_ = &irn->second;
	} else {
pryanikov's avatar
pryanikov committed
241
		replace_null_ = nullptr;
pryanikov's avatar
fix    
pryanikov committed
242
243
	}

pryanikov's avatar
pryanikov committed
244
245
246
	const auto add_nil_with_replace = [&] (struct ::tnt_stream *o, const unsigned index) -> void {
		do {
			if (replace_null_ == nullptr) break;
pryanikov's avatar
fix    
pryanikov committed
247
			const auto irnv = replace_null_->find(index);
pryanikov's avatar
pryanikov committed
248
249
250
251
252
253
254
255
256
257
258
259
			if (irnv == irn->second.end()) break;

			const SerializableValue& v = irnv->second;
			if (v.is<std::string>()) {
				const std::string s = v.as<std::string>();
				::tnt_object_add_str(o, s.c_str(), s.length());
			} else if (v.is<long long>()) {
				::tnt_object_add_int(o, v.as<long long>());
			} else if (v.is<unsigned long long>()) {
				::tnt_object_add_uint(o, v.as<unsigned long long>());
			} else {
				throw std::range_error(std::string("Typecasting error for non-null value for column: " + index));
pryanikov's avatar
pryanikov committed
260
			}
pryanikov's avatar
pryanikov committed
261
262
263
			return;

		} while (false);
pryanikov's avatar
pryanikov committed
264
265
266
267

		::tnt_object_add_nil(o);
	};

pryanikov's avatar
pryanikov committed
268
	const auto add_value = [&] (struct ::tnt_stream *o, const unsigned index, const SerializableValue &v) -> void {
pryanikov's avatar
pryanikov committed
269
		try {
pryanikov's avatar
pryanikov committed
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
			if (v.is<std::string>()) {
				const std::string s = v.as<std::string>();
				::tnt_object_add_str(o, s.c_str(), s.length());
			} else if (v.is<char>()) {
				::tnt_object_add_int(o, v.as<char>());
			} else if (v.is<unsigned char>()) {
				::tnt_object_add_uint(o, v.as<unsigned char>());
			} else if (v.is<short>()) {
				::tnt_object_add_int(o, v.as<short>());
			} else if (v.is<unsigned short>()) {
				::tnt_object_add_uint(o, v.as<unsigned short>());
			} else if (v.is<int>()) {
				::tnt_object_add_int(o, v.as<int>());
			} else if (v.is<unsigned int>()) {
				::tnt_object_add_uint(o, v.as<unsigned int>());
			} else if (v.is<long>()) {
				::tnt_object_add_int(o, v.as<long>());
			} else if (v.is<unsigned long>()) {
				::tnt_object_add_uint(o, v.as<unsigned long>());
			} else if (v.is<long long>()) {
				::tnt_object_add_int(o, v.as<long long>());
			} else if (v.is<unsigned long long>()) {
				::tnt_object_add_uint(o, v.as<unsigned long long>());
			} else if (v.is<float>()) {
				::tnt_object_add_float(o, v.as<float>());
			} else if (v.is<double>()) {
				::tnt_object_add_double(o, v.as<double>());
pryanikov's avatar
pryanikov committed
297
			} else {
pryanikov's avatar
pryanikov committed
298
				add_nil_with_replace(o, index);
pryanikov's avatar
pryanikov committed
299
			}
pryanikov's avatar
pryanikov committed
300
301
302
303
304
		}
		catch (boost::bad_any_cast &ex) {
			throw std::range_error(std::string("Typecasting error for column: ") + ex.what());
		}
	};
pryanikov's avatar
pryanikov committed
305

pryanikov's avatar
pryanikov committed
306
	const auto add_key = [&] (struct ::tnt_stream *o) -> void {
pryanikov's avatar
pryanikov committed
307
308
		::tnt_object_add_array(o, ts.keys.size());
		for (const auto i : ts.keys) {
pryanikov's avatar
pryanikov committed
309
			add_value(o, i, ev.row.at(i));
pryanikov's avatar
pryanikov committed
310
311
312
313
		}
		::tnt_object_container_close(o);
	};

pryanikov's avatar
pryanikov committed
314
	const auto add_tuple = [&] (struct ::tnt_stream *o) -> void {
pryanikov's avatar
pryanikov committed
315
316
317
318
319
320
		::tnt_object_add_array(o, space_last_id[ts.space] + 1);
		unsigned i_nil = 0;
		// ev.row may have gaps, since it's not an array but a map
		// so fill the gaps to match columns count
		for (auto it = ev.row.begin(), end = ev.row.end(); it != end; ++it) {
			// fill gaps
pryanikov's avatar
pryanikov committed
321
			for (; i_nil < it->first; ++i_nil) add_nil_with_replace(o, i_nil);
pryanikov's avatar
pryanikov committed
322

pryanikov's avatar
pryanikov committed
323
			add_value(o, it->first, it->second);
pryanikov's avatar
pryanikov committed
324
			i_nil = it->first + 1;
pryanikov's avatar
pryanikov committed
325
		}
pryanikov's avatar
pryanikov committed
326
		// fill gaps
pryanikov's avatar
pryanikov committed
327
		for (; i_nil <= space_last_id[ts.space]; ++i_nil) add_nil_with_replace(o, i_nil);
pryanikov's avatar
pryanikov committed
328
329
330
331

		::tnt_object_container_close(o);
	};

pryanikov's avatar
pryanikov committed
332
333
	const auto add_ops = [&] (struct ::tnt_stream *o) -> void {
		::tnt_update_container_reset(o);
pryanikov's avatar
pryanikov committed
334
335
		for (auto it = ev.row.begin(), end = ev.row.end(); it != end; ++it) {
			__tnt_object sval;
pryanikov's avatar
pryanikov committed
336
			add_value(&sval, it->first, it->second);
pryanikov's avatar
pryanikov committed
337
338
			::tnt_update_assign(o, it->first, &sval);
		}
pryanikov's avatar
pryanikov committed
339
340
341
342
343
344
345
346
347
348
349
		::tnt_update_container_close(o);
	};

	const auto make_call = [&] (const std::string& func_name) -> void {
		__tnt_object args;
		::tnt_object_add_array(&args, 1);
		::tnt_object_add_map(&args, ev.row.size());

		for (auto it = ev.row.begin(), end = ev.row.end(); it != end; ++it) {
			::tnt_object_add_uint(&args, it->first);
			add_value(&args, it->first, it->second);
pryanikov's avatar
fixes    
pryanikov committed
350
		}
pryanikov's avatar
pryanikov committed
351
352
353
354
355
356
357
358
359
		::tnt_object_container_close(&args);
		::tnt_object_container_close(&args);

		__tnt_request req;
		::tnt_request_call(&req);
		::tnt_request_set_tuple(&req, &args);
		::tnt_request_set_func(&req, func_name.c_str(), func_name.length());

		Send(&req);
pryanikov's avatar
pryanikov committed
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
	};

	// add Tarantool request
	if (ev.event == "DELETE") {
		if (ts.delete_call.empty()) {
			__tnt_object key;
			add_key(&key);

			__tnt_request req;
			::tnt_request_delete(&req);
			::tnt_request_set_space(&req, ts.space);
			::tnt_request_set_key(&req, &key);

			Send(&req);
		} else {
pryanikov's avatar
pryanikov committed
375
			make_call(ts.delete_call);
pryanikov's avatar
pryanikov committed
376
377
378
		}
	} else if (ev.event == "INSERT") {
		if (ts.insert_call.empty()) {
379
380
381
382
383
384
385
386
			// __tnt_object tuple;
			// add_tuple(&tuple);

			// __tnt_request req;
			// ::tnt_request_replace(&req);
			// ::tnt_request_set_space(&req, ts.space);
			// ::tnt_request_set_tuple(&req, &tuple);

pryanikov's avatar
pryanikov committed
387
388
389
			__tnt_object tuple;
			add_tuple(&tuple);

390
391
392
			__tnt_object ops;
			add_ops(&ops);

pryanikov's avatar
pryanikov committed
393
			__tnt_request req;
394
			::tnt_request_upsert(&req);
pryanikov's avatar
pryanikov committed
395
396
			::tnt_request_set_space(&req, ts.space);
			::tnt_request_set_tuple(&req, &tuple);
397
			::tnt_request_set_ops(&req, &ops);
pryanikov's avatar
pryanikov committed
398
399
400

			Send(&req);
		} else {
pryanikov's avatar
pryanikov committed
401
			make_call(ts.insert_call);
pryanikov's avatar
pryanikov committed
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
		}
	} else if (ev.event == "UPDATE") {
		if (ts.update_call.empty()) {
			__tnt_object key;
			add_key(&key);

			__tnt_object ops;
			add_ops(&ops);

			__tnt_request req;
			::tnt_request_update(&req);
			::tnt_request_set_space(&req, ts.space);
			::tnt_request_set_key(&req, &key);
			::tnt_request_set_ops(&req, &ops);

			Send(&req);
		} else {
pryanikov's avatar
pryanikov committed
419
			make_call(ts.update_call);
420
		}
pryanikov's avatar
pryanikov committed
421
422
	} else {
		throw std::range_error("Uknown binlog event: " + ev.event);
pryanikov's avatar
pryanikov committed
423
424
	}

pryanikov's avatar
pryanikov committed
425
426
427
428
429
430
431
432
433
434
	// check if doing same key as last time
	// and do sync, if true
	static std::string prev_key;
	std::string curnt_key;

	for (const auto i : ts.keys)
		curnt_key += ev.row.at(i).to_string();

	if (prev_key == curnt_key)
		return true;
pryanikov's avatar
pryanikov committed
435

pryanikov's avatar
pryanikov committed
436
	prev_key = curnt_key;
pryanikov's avatar
pryanikov committed
437
438
439
440
	return false;
}

// blocking send
pryanikov's avatar
pryanikov committed
441
int64_t TPWriter::Send(struct ::tnt_request *req)
pryanikov's avatar
pryanikov committed
442
{
pryanikov's avatar
pryanikov committed
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
	struct ::tnt_stream sbuf;
	::tnt_buf(&sbuf);
	const int64_t sync = ::tnt_request_compile(&sbuf, req);

	size_t len = TNT_SBUF_SIZE(&sbuf);
	char *buf = TNT_SBUF_DATA(&sbuf);

	while (len > 0) {
		const ssize_t r = sess.write(&sess, buf, len);
		if (r < 0) {
			const int _errno = ::tnt_errno(&sess);
			if (_errno == EWOULDBLOCK || _errno == EAGAIN) {
				continue;
			}
			::tnt_stream_free(&sbuf);
			throw std::runtime_error("Send failed: " + std::string(::tnt_strerror(&sess)));
pryanikov's avatar
pryanikov committed
459
		}
pryanikov's avatar
pryanikov committed
460
461
		len -= r;
		buf += r;
pryanikov's avatar
pryanikov committed
462
463
	}

pryanikov's avatar
pryanikov committed
464
465
	::tnt_stream_free(&sbuf);
	return sync;
pryanikov's avatar
pryanikov committed
466
467
468
469
470
471
472
473
474
475
476
477
478
}

bool TPWriter::Sync(bool force)
{
	if (next_ping_attempt == 0 || Milliseconds() > next_ping_attempt) {
		force = true;
		next_ping_attempt = Milliseconds() + TPWriter::PING_TIMEOUT;
		Ping();
	}
	if (force || next_sync_attempt == 0 || Milliseconds() >= next_sync_attempt) {
		SaveBinlogPos();
		next_sync_attempt = Milliseconds() + sync_retry;
	}
pryanikov's avatar
pryanikov committed
479
	return true;
pryanikov's avatar
pryanikov committed
480
481
}

pryanikov's avatar
pryanikov committed
482
483
// non-blocking receive
int TPWriter::Recv(struct ::tnt_reply *re)
pryanikov's avatar
pryanikov committed
484
{
pryanikov's avatar
pryanikov committed
485
486
487
488
489
490
	const int r = sess.read_reply(&sess, re);

	if (r == 0) {
		if (re->code) {
			reply_server_code = re->code;
			reply_error_msg = std::move(std::string(re->error, re->error_end - re->error));
pryanikov's avatar
pryanikov committed
491
			return -1;
pryanikov's avatar
pryanikov committed
492
493
494
		} else if (reply_server_code) {
			reply_server_code = 0;
			reply_error_msg = "";
pryanikov's avatar
pryanikov committed
495
		}
pryanikov's avatar
pryanikov committed
496
		return r;
pryanikov's avatar
pryanikov committed
497
	}
pryanikov's avatar
pryanikov committed
498
499
500
501
	if (r < 0) {
		const int _errno = ::tnt_errno(&sess);
		if (_errno == EWOULDBLOCK || _errno == EAGAIN) {
			return r;
pryanikov's avatar
pryanikov committed
502
		}
pryanikov's avatar
pryanikov committed
503
		throw std::runtime_error("Recv failed: " + std::string(::tnt_strerror(&sess)));
pryanikov's avatar
pryanikov committed
504
	}
pryanikov's avatar
pryanikov committed
505
506
	return r;
}
pryanikov's avatar
pryanikov committed
507

pryanikov's avatar
pryanikov committed
508
509
510
int TPWriter::ReadReply()
{
	int r;
pryanikov's avatar
pryanikov committed
511
512
513
514
	do {
		__tnt_reply re;
		r = Recv(&re);
	}
pryanikov's avatar
pryanikov committed
515
516
	while (r == 0);
	return r;
pryanikov's avatar
pryanikov committed
517
518
}

pryanikov's avatar
pryanikov committed
519
uint64_t TPWriter::GetReplyCode() const
pryanikov's avatar
pryanikov committed
520
521
522
523
{
	return reply_server_code;
}

pryanikov's avatar
pryanikov committed
524
const std::string& TPWriter::GetReplyErrorMessage() const
pryanikov's avatar
pryanikov committed
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
{
	return reply_error_msg;
}

uint64_t TPWriter::Milliseconds()
{
	struct timeval tp;
	::gettimeofday( &tp, NULL );
	if (!secbase) {
		secbase = tp.tv_sec;
		return tp.tv_usec / 1000;
	}
	return (uint64_t)(tp.tv_sec - secbase)*1000 + tp.tv_usec / 1000;
}

}