提交 2379905f 编写于 作者: F Fankux

message seq for legacy protocol

上级 2da1935f
......@@ -73,7 +73,7 @@ public:
_comm.clear_channels();
_comm.stop();
ObLogReader& reader = ObLogReader::instance();
ObLogReader reader;
OblogConfig oblog_config(_config);
if (!oblog_config.sys_user.empty()) {
oblog_config.user.set(oblog_config.sys_user.val());
......
......@@ -60,14 +60,15 @@ static int compress_data(const RecordDataMessage& msg, MsgBuf& buffer)
return OMS_FAILED;
}
uint32_t idx = msg.idx;
size_t offset = 0;
for (size_t i = 0; i < ptrs.size(); ++i) {
size_t block_size = ptrs[i].second;
uint32_t seq_be = cpu_to_be<uint32_t>(i);
for (auto& ptr : ptrs) {
size_t block_size = ptr.second;
uint32_t seq_be = cpu_to_be<uint32_t>(idx++);
uint32_t size_be = cpu_to_be<uint32_t>(block_size);
memcpy(raw + offset, &seq_be, 4);
memcpy(raw + offset + 4, &size_be, 4);
memcpy(raw + offset + 8, ptrs[i].first, block_size);
memcpy(raw + offset + 8, ptr.first, block_size);
offset += (block_size + 8);
}
......@@ -153,10 +154,9 @@ LegacyEncoder::LegacyEncoder()
return compress_data(msg, buffer);
}
uint32_t idx = msg.idx;
uint32_t total_size = 0;
for (size_t i = 0; i < msg.records.size(); ++i) {
ILogRecord* record = msg.records[i];
for (auto record : msg.records) {
size_t size = 0;
// got independ address
const char* logmsg_buf = record->getFormatedString(&size);
......@@ -170,7 +170,7 @@ LegacyEncoder::LegacyEncoder()
<< ", size: " << header->m_size;
}
uint32_t seq_be = cpu_to_be<uint32_t>(i);
uint32_t seq_be = cpu_to_be<uint32_t>(idx++);
uint32_t size_be = cpu_to_be<uint32_t>(size);
buffer.push_back_copy((char*)&seq_be, 4);
buffer.push_back_copy((char*)&size_be, 4);
......
......@@ -243,6 +243,9 @@ protected:
public:
CompressType compress_type = CompressType::PLAIN;
std::vector<ILogRecord*> records;
// index to count message seq for a client session
uint32_t idx = 0;
};
} // namespace logproxy
......
......@@ -23,8 +23,6 @@ namespace oceanbase {
namespace logproxy {
class ObLogReader {
OMS_SINGLETON(ObLogReader);
OMS_AVOID_COPY(ObLogReader);
public:
virtual ~ObLogReader();
......@@ -41,8 +39,8 @@ private:
OblogAccess _oblog;
BlockingQueue<ILogRecord*> _queue{Config::instance().record_queue_size.val()};
ReaderRoutine _reader{_oblog, _queue};
SenderRoutine _sender{_oblog, _queue};
ReaderRoutine _reader{*this, _oblog, _queue};
SenderRoutine _sender{*this, _oblog, _queue};
};
} // namespace logproxy
......
......@@ -23,8 +23,8 @@ namespace logproxy {
static Config& _s_config = Config::instance();
ReaderRoutine::ReaderRoutine(OblogAccess& oblog, BlockingQueue<ILogRecord*>& q)
: Thread("ReaderRoutine"), _oblog(oblog), _queue(q)
ReaderRoutine::ReaderRoutine(ObLogReader& reader, OblogAccess& oblog, BlockingQueue<ILogRecord*>& q)
: Thread("ReaderRoutine"), _reader(reader), _oblog(oblog), _queue(q)
{}
int ReaderRoutine::init(const OblogConfig& config)
......@@ -80,7 +80,7 @@ void ReaderRoutine::run()
counter.count_read(1);
}
ObLogReader::instance().stop();
_reader.stop();
}
} // namespace logproxy
......
......@@ -20,9 +20,11 @@
namespace oceanbase {
namespace logproxy {
class ObLogReader;
class ReaderRoutine : public Thread {
public:
ReaderRoutine(OblogAccess&, BlockingQueue<ILogRecord*>&);
ReaderRoutine(ObLogReader&, OblogAccess&, BlockingQueue<ILogRecord*>&);
int init(const OblogConfig& config);
......@@ -32,6 +34,7 @@ private:
void run() override;
private:
ObLogReader& _reader;
OblogAccess& _oblog;
BlockingQueue<ILogRecord*>& _queue;
......
......@@ -28,8 +28,8 @@ namespace logproxy {
static Config& _s_config = Config::instance();
SenderRoutine::SenderRoutine(OblogAccess& oblog, BlockingQueue<ILogRecord*>& rqueue)
: Thread("SenderRoutine"), _oblog(oblog), _rqueue(rqueue)
SenderRoutine::SenderRoutine(ObLogReader& reader, OblogAccess& oblog, BlockingQueue<ILogRecord*>& rqueue)
: Thread("SenderRoutine"), _reader(reader), _oblog(oblog), _rqueue(rqueue)
{}
int SenderRoutine::init(MessageVersion packet_version, Channel* ch)
......@@ -170,7 +170,7 @@ void SenderRoutine::run()
}
LogMsgLocalDestroy();
ObLogReader::instance().stop();
_reader.stop();
}
int SenderRoutine::do_send(const std::vector<ILogRecord*>& records, size_t offset, size_t count)
......@@ -183,9 +183,12 @@ int SenderRoutine::do_send(const std::vector<ILogRecord*>& records, size_t offse
RecordDataMessage msg(records, offset, count);
msg.set_version(_packet_version);
msg.compress_type = CompressType::LZ4;
msg.idx = _msg_seq;
int ret = _comm.send_message(_client_peer, msg, true);
Counter::instance().count_key(Counter::SENDER_SEND_US, _stage_timer.elapsed());
_msg_seq += count;
if (ret == OMS_OK) {
ILogRecord* last = records[offset + count - 1];
Counter::instance().count_write(count);
......
......@@ -20,9 +20,11 @@
namespace oceanbase {
namespace logproxy {
class ObLogReader;
class SenderRoutine : public Thread {
public:
SenderRoutine(OblogAccess&, BlockingQueue<ILogRecord*>&);
SenderRoutine(ObLogReader&, OblogAccess&, BlockingQueue<ILogRecord*>&);
int init(MessageVersion packet_version, Channel* ch);
......@@ -34,6 +36,7 @@ private:
int do_send(const std::vector<ILogRecord*>& records, size_t offset, size_t count);
private:
ObLogReader& _reader;
OblogAccess& _oblog;
BlockingQueue<ILogRecord*>& _rqueue;
......@@ -44,6 +47,8 @@ private:
PeerInfo _client_peer;
Timer _stage_timer;
uint32_t _msg_seq = 0;
};
} // namespace logproxy
......
......@@ -30,7 +30,7 @@ int run(const std::string& cluster_url, const std::string& user, const std::stri
OblogConfig oblog_config(config_str);
OMS_INFO << "OB Log Config: " << oblog_config.debug_str(true);
ObLogReader& reader = ObLogReader::instance();
ObLogReader reader;
PeerInfo peer(0);
ChannelFactory channel_factory;
int ret = channel_factory.init(Config::instance());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册