未验证 提交 513dde92 编写于 作者: F Fankux 提交者: GitHub

Merge pull request #10 from fankux/master

legacy v1 protocol && enrich metric logs
......@@ -49,7 +49,7 @@ message(STATUS "DEP_VAR: ${DEP_VAR}")
message(STATUS "JAVA_HOME: ${JAVA_HOME}")
# compiler
find_program(CC NAMES gcc PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${DEP_VAR}/usr/local/gcc-5.2.0/bin/ /usr/bin/ NO_DEFAULT_PATH)
find_program(CC NAMES gcc PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${DEP_VAR}/usr/local/gcc-5.2.0/bin/ /usr/bin/ NO_DEFAULT_PATH)
find_program(CXX NAMES g++ PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${DEP_VAR}/usr/local/gcc-5.2.0/bin/ /usr/bin/ NO_DEFAULT_PATH)
find_program(AR NAMES gcc-ar ar PATHS ${DEP_VAR}/usr/local/oceanbase/devtools/bin/ ${DEP_VAR}/usr/local/gcc-5.2.0/bin/ /usr/bin/ NO_DEFAULT_PATH)
SET(CMAKE_C_COMPILER ${CC})
......@@ -150,7 +150,11 @@ endforeach ()
add_library(PROTO_OBJS OBJECT ${PROTO_SRCS} ${PROTO_HDRS})
message("protoc: ${PROTOBUF_PROTOC_EXECUTABLE}, proto srcs : ${PROTO_SRCS}")
# obliboblog
# oblogmsg
include(oblogmsg)
SET(OBLOGMSG_MAPPING "")
# oblog
if (FIND_LIBOBLOG)
find_path(LIBOBLOG_INCLUDE_PATH NAMES liboblog.h)
find_library(LIBOBLOG_LIBRARIES NAMES liboblog.so)
......@@ -160,11 +164,6 @@ if (FIND_LIBOBLOG)
endif ()
GET_FILENAME_COMPONENT(LIBOBLOG_LIB_DIR ${LIBOBLOG_LIBRARIES} DIRECTORY)
# oblogmsg
include(oblogmsg)
SET(OBLOGMSG_MAPPING "")
message("oblogmsg: ${OBLOGMSG_INCLUDE_DIR}, ${OBLOGMSG_LIBRARIES}")
message("liboblog: ${LIBOBLOG_INCLUDE_PATH}, ${LIBOBLOG_LIB_DIR}")
......
......@@ -28,8 +28,6 @@ ExternalProject_Add(
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
-DCMAKE_BUILD_TYPE=Debug
-DCMAKE_PREFIX_PATH=${prefix_path}
-DTEST=OFF
-DUSE_CXX11_ABI=${USE_CXX11_ABI}
INSTALL_COMMAND mkdir -p ${OBLOGMSG_INSTALL_DIR}/lib ${OBLOGMSG_INSTALL_DIR}/include COMMAND cp -r ${OBLOGMSG_SOURCES_DIR}/src/extern_oblogmsg/src/${OBLOGMSG_LIBRARIES} ${OBLOGMSG_INSTALL_DIR}/lib/ COMMAND cp -r ${OBLOGMSG_SOURCES_DIR}/src/extern_oblogmsg/include ${OBLOGMSG_INSTALL_DIR}/ COMMAND cp -r ${OBLOGMSG_SOURCES_DIR}/src/extern_oblogmsg/src/${OBLOGMSG_LIBRARIES} ${LIBRARY_OUTPUT_PATH}/
)
......
syntax = "proto3";
package oceanbase.logproxy.legacy;
option java_package = "com.oceanbase.clogproxy.common.packet.protocol";
option java_outer_classname = "LogProxyProto";
message PbPacket {
int32 type = 1; // HeaderType
int32 compress_type = 2; // CompressType
// resevered for other options
bytes payload = 100;
}
message ClientHandShake {
int32 log_type = 1; // LogType
string client_ip = 2;
string client_id = 3;
string client_version = 4;
bool enable_monitor = 5;
string configuration = 6;
}
message RuntimeStatus {
string ip = 1;
int32 port = 2;
int32 stream_count = 3;
int32 worker_count = 4;
}
\ No newline at end of file
......@@ -10,29 +10,92 @@
* See the Mulan PubL v2 for more details.
*/
#include "common/guard.hpp"
#include "communication/channel.h"
#include "codec/decoder.h"
#include "legacy.pb.h"
namespace oceanbase {
namespace logproxy {
static PacketError decode_v1(Channel* ch, Message*& message)
{
// V1 is protobuf handshake packet:
// [4] pb packet length
// [pb packet length] pb buffer
uint32_t payload_size = 0;
if (ch->readn((char*)(&payload_size), 4) != OMS_OK) {
OMS_ERROR << "Failed to read message payload size, ch:" << ch->peer().id() << ", error:" << strerror(errno);
return PacketError::NETWORK_ERROR;
}
payload_size = be_to_cpu<uint32_t>(payload_size);
// FIXME.. use an mem pool
char* payload_buf = (char*)malloc(payload_size);
if (nullptr == payload_buf) {
OMS_ERROR << "Failed to malloc memory for message data. size:" << payload_size << ", ch:" << ch->peer().id();
return PacketError::OUT_OF_MEMORY;
}
FreeGuard<char*> payload_buf_guard(payload_buf);
if (ch->readn(payload_buf, payload_size) != 0) {
OMS_ERROR << "Failed to read message. ch:" << ch->peer().id() << ", error:" << strerror(errno);
return PacketError::NETWORK_ERROR;
}
payload_buf_guard.release();
legacy::PbPacket pb_packet;
bool ret = pb_packet.ParseFromArray(payload_buf, payload_size);
if (!ret) {
OMS_ERROR << "Failed to parse payload, ch:" << ch->peer().id();
return PacketError::PROTOCOL_ERROR;
}
if ((MessageType)pb_packet.type() != MessageType::HANDSHAKE_REQUEST_CLIENT) {
OMS_ERROR << "Invalid packet type:" << pb_packet.type() << ", ch:" << ch->peer().id();
return PacketError::PROTOCOL_ERROR;
}
legacy::ClientHandShake handshake;
ret = handshake.ParseFromString(pb_packet.payload());
if (!ret) {
OMS_ERROR << "Failed to parse handshake, ch:" << ch->peer().id();
return PacketError::PROTOCOL_ERROR;
}
ClientHandshakeRequestMessage* msg = new (std::nothrow) ClientHandshakeRequestMessage;
msg->log_type = handshake.log_type();
msg->ip = handshake.client_ip();
msg->id = handshake.client_id();
msg->version = handshake.client_version();
msg->configuration = handshake.configuration();
message = msg;
return PacketError::SUCCESS;
}
/*
* =========== Message Header ============
* [4] type
*/
PacketError LegacyDecoder::decode(Channel* ch, MessageVersion version, Message*& message)
{
OMS_DEBUG << "Legacy decode with, version: " << (int)version;
if (version == MessageVersion::V1) {
return decode_v1(ch, message);
}
// type
int32_t type = -1;
if (ch->readn((char*)&type, 4) != OMS_OK) {
OMS_ERROR << "Failed to read message header, ch:" << ch->peer().id() << ", error:" << strerror(errno);
return PacketError::NETWORK_ERROR;
}
type = be_to_cpu<int8_t>(type);
type = be_to_cpu<int32_t>(type);
if (!is_type_available(type)) {
OMS_ERROR << "Invalid packet type:" << type << ", ch:" << ch->peer().id();
return PacketError::PROTOCOL_ERROR;
}
OMS_DEBUG << "Legacy message type:" << type;
int ret = OMS_OK;
switch ((MessageType)type) {
......@@ -41,7 +104,7 @@ PacketError LegacyDecoder::decode(Channel* ch, MessageVersion version, Message*&
break;
default:
// We don not care other request type as a server decoder
break;
return PacketError::IGNORE;
}
return ret == OMS_OK ? PacketError::SUCCESS : PacketError::PROTOCOL_ERROR;
......@@ -55,10 +118,13 @@ static int read_varstr(Channel* ch, std::string& val)
}
len = be_to_cpu<uint32_t>(len);
val.reserve(len);
if (ch->readn((char*)val.data(), len) != OMS_OK) {
char* buf = (char*)malloc(len);
FreeGuard<char*> ff(buf);
if (ch->readn(buf, len) != OMS_OK) {
return OMS_FAILED;
}
val.assign(buf, len);
return OMS_OK;
}
......@@ -76,22 +142,32 @@ int LegacyDecoder::decode_handshake_request(Channel* ch, Message*& message)
OMS_ERROR << "Failed to read message log_type, ch:" << ch->peer().id() << ", error:" << strerror(errno);
return OMS_FAILED;
}
OMS_DEBUG << "log type:" << (int)msg->log_type;
if (read_varstr(ch, msg->ip) != OMS_OK) {
OMS_ERROR << "Failed to read message Client IP, ch:" << ch->peer().id() << ", error:" << strerror(errno);
return OMS_FAILED;
}
OMS_DEBUG << "client ip: " << msg->ip;
if (read_varstr(ch, msg->id) != OMS_OK) {
OMS_ERROR << "Failed to read message Client IP, ch:" << ch->peer().id() << ", error:" << strerror(errno);
return OMS_FAILED;
}
OMS_DEBUG << "client id: " << msg->id;
if (read_varstr(ch, msg->version) != OMS_OK) {
OMS_ERROR << "Failed to read message Client IP, ch:" << ch->peer().id() << ", error:" << strerror(errno);
return OMS_FAILED;
}
OMS_DEBUG << "client version: " << msg->version;
if (read_varstr(ch, msg->configuration) != OMS_OK) {
OMS_ERROR << "Failed to read message Client IP, ch:" << ch->peer().id() << ", error:" << strerror(errno);
return OMS_FAILED;
}
OMS_DEBUG << "configuration: " << msg->configuration;
message = msg;
return OMS_OK;
}
......
......@@ -23,7 +23,7 @@ static Config& _s_config = Config::instance();
LegacyEncoder::LegacyEncoder()
{
/*
* [4] reponse code
* [4] response code
* [1+varstr] Server IP
* [1+varstr] Server Version
*/
......@@ -35,12 +35,12 @@ LegacyEncoder::LegacyEncoder()
OMS_ERROR << "Failed to encode handshake request due to failed to alloc memory";
return OMS_FAILED;
}
// response code
size_t offset = 0;
// Response code
memset(buf, 0, 4);
offset += 4;
// Server IP
size_t offset = 4;
uint8_t varlen = msg.ip.size();
memcpy(buf + offset, &varlen, 1);
offset += 1;
......@@ -77,46 +77,34 @@ LegacyEncoder::LegacyEncoder()
size_t size = 0;
// got independ address
const char* log_record_buffer = record->getFormatedString(&size);
if (nullptr == log_record_buffer) {
const char* logmsg_buf = record->getFormatedString(&size);
if (logmsg_buf == nullptr) {
OMS_ERROR << "Failed to serialize log record";
return OMS_FAILED;
}
const MsgHeader* header = (const MsgHeader*)(log_record_buffer);
if (_s_config.verbose_packet.val()) {
const MsgHeader* header = (const MsgHeader*)(logmsg_buf);
OMS_DEBUG << "Encode LogMessage Header, type: " << header->m_msgType << ", version: " << header->m_version
<< ", size: " << header->m_size;
}
size_t calc_size = header->m_size + sizeof(MsgHeader);
if (calc_size != size) {
if (calc_size > size) {
OMS_FATAL << "LogMessage Invalid, header calc size:" << calc_size << " > buffer size:" << size;
return OMS_FAILED;
}
if (_s_config.verbose_packet.val()) {
OMS_WARN << "LogMessage header size:" << calc_size << " != toString size:" << size
<< ". adjust to header size";
}
size = calc_size;
}
total_size += calc_size;
uint32_t seq_be = cpu_to_be<uint32_t>(i);
uint32_t size_be = cpu_to_be<uint32_t>(size);
buffer.push_back_copy((char*)&seq_be, 4);
buffer.push_back_copy((char*)&size_be, 4);
buffer.push_back((char*)logmsg_buf, size, false);
uint32_t seq = cpu_to_be<uint32_t>(i);
uint32_t len = cpu_to_be<uint32_t>(calc_size);
buffer.push_back_copy((char*)&seq, 4);
buffer.push_back_copy((char*)&len, 4);
buffer.push_back((char*)log_record_buffer, calc_size, false);
total_size += (size + 8);
}
uint32_t packet_len = cpu_to_be<uint32_t>(total_size + 7);
uint32_t packet_len_be = cpu_to_be<uint32_t>(total_size + 9);
total_size = cpu_to_be<uint32_t>(total_size);
char* buf = (char*)malloc(13);
memcpy(buf, &packet_len, sizeof(packet_len));
char* buf = (char*)malloc(4 + 1 + 4 + 4);
memcpy(buf, &packet_len_be, 4);
memset(buf + 4, 0, 1); // CompressType::PLAIN
memcpy(buf + 5, &total_size, sizeof(total_size));
memcpy(buf + 9, &total_size, sizeof(total_size));
memcpy(buf + 5, &total_size, 4);
memcpy(buf + 9, &total_size, 4);
buffer.push_front(buf, 13);
return OMS_OK;
});
......@@ -130,23 +118,17 @@ LegacyEncoder::LegacyEncoder()
_funcs.emplace((int8_t)MessageType::ERROR_RESPONSE, [](const Message& in_msg, MessageBuffer& buffer) {
const ErrorMessage& msg = (const ErrorMessage&)in_msg;
size_t len = 4 + 4 + msg.message.size();
size_t len = 4 + msg.message.size();
char* buf = (char*)malloc(len);
if (buf == nullptr) {
OMS_ERROR << "Failed to encode error message due to failed to alloc memory";
return OMS_FAILED;
}
// response code
int code = cpu_to_be<int>(msg.code);
memcpy(buf, &code, 4);
size_t offset = 4;
// Error message
uint32_t varlen = cpu_to_be<uint32_t>(msg.message.size());
memcpy(buf + offset, &varlen, 4);
offset += 4;
memcpy(buf + offset, msg.message.c_str(), varlen);
memcpy(buf, &varlen, 4);
memcpy(buf + 4, msg.message.c_str(), varlen);
buffer.push_back(buf, len);
return OMS_OK;
......@@ -155,7 +137,24 @@ LegacyEncoder::LegacyEncoder()
int LegacyEncoder::encode(const Message& msg, MessageBuffer& buffer)
{
return _funcs[(int8_t)msg.type()](msg, buffer);
int ret = _funcs[(int8_t)msg.type()](msg, buffer);
if (ret == OMS_FAILED) {
return OMS_FAILED;
}
// append header
size_t len = 2 + 4;
char* buf = (char*)malloc(len);
// version code
memset(buf, 0, 2);
// response type code
uint32_t msg_type_be = cpu_to_be<uint32_t>((uint32_t)msg.type());
memcpy(buf + 2, &msg_type_be, 4);
buffer.push_front(buf, len);
return ret;
}
} // namespace logproxy
......
......@@ -58,6 +58,7 @@ enum class CompressType {
enum class PacketError {
SUCCESS,
IGNORE,
OUT_OF_MEMORY,
PROTOCOL_ERROR,
NETWORK_ERROR,
......@@ -115,7 +116,7 @@ public:
~ClientHandshakeRequestMessage() override = default;
OMS_MF_DFT(int, log_type, -1);
OMS_MF_DFT(uint8_t, log_type, 0);
OMS_MF(std::string, id);
OMS_MF(std::string, ip);
OMS_MF(std::string, version);
......@@ -164,7 +165,7 @@ public:
~RecordDataMessage() override;
size_t count() const
inline size_t count() const
{
return records.size();
}
......
......@@ -60,6 +60,7 @@ PacketError ProtobufDecoder::decode(Channel* ch, MessageVersion version, Message
return PacketError::PROTOCOL_ERROR;
}
// FIXME.. use an mem pool
char* payload_buf = (char*)malloc(payload_size);
if (nullptr == payload_buf) {
OMS_ERROR << "Failed to malloc memory for message data. size:" << payload_size << ", ch:" << ch->peer().id();
......
......@@ -32,29 +32,42 @@ void Counter::run()
while (is_run()) {
_timer.reset();
this->sleep();
int64_t interval = _timer.elapsed() / 1000;
uint64_t rcount = _read_count.fetch_and(0);
uint64_t wcount = _write_count.fetch_and(0);
uint64_t rio = _read_io.fetch_and(0);
uint64_t wio = _write_io.fetch_and(0);
uint64_t rtps = rcount / (interval / 1000);
uint64_t wtps = wcount / (interval / 1000);
uint64_t rios = rio / (interval / 1000);
uint64_t wios = wio / (interval / 1000);
int delay = (int)time(nullptr) - _timestamp;
int chk_delay = time(nullptr) - _checkpoint;
int64_t interval_ms = _timer.elapsed() / 1000;
int64_t interval_s = interval_ms == 0 ? 0 : (interval_ms / 1000);
uint64_t rcount = _read_count.load();
uint64_t wcount = _write_count.load();
uint64_t rio = _read_io.load();
uint64_t wio = _write_io.load();
uint64_t rtps = interval_s == 0 ? rcount : (rcount / interval_s);
uint64_t wtps = interval_s == 0 ? wcount : (wcount / interval_s);
uint64_t rios = interval_s == 0 ? rio : (rio / interval_s);
uint64_t wios = interval_s == 0 ? wio : (wio / interval_s);
int nowtm = time(nullptr);
int delay = nowtm - _timestamp;
int chk_delay = nowtm - _checkpoint;
// TODO... bytes rate
ss.str("");
ss << "Counter:[Interval:" << interval << "ms][Delay:" << delay << "," << chk_delay << "][Read:" << rcount
<< "][RTPS:" << rtps << "][RIOS:" << rios << "][Write:" << wcount << "][WTPS:" << wtps << "][WIOS:" << wios
ss << "Counter:[Span:" << interval_ms << "ms][Delay:" << delay << "," << chk_delay << "][RCNT:" << rcount
<< "][RTPS:" << rtps << "][RIOS:" << rios << "][WCNT:" << wcount << "][WTPS:" << wtps << "][WIOS:" << wios
<< "]";
for (auto& count : _counts) {
uint64_t c = count.count.load();
ss << "[" << count.name << ":" << c << "]";
count.count.fetch_sub(c);
}
for (auto& entry : _gauges) {
ss << "[" << entry.first << ":" << entry.second() << "]";
}
OMS_INFO << ss.str();
// sub count that logged
_read_count.fetch_sub(rcount);
_write_count.fetch_sub(wcount);
_read_io.fetch_sub(rio);
_write_io.fetch_sub(wio);
}
OMS_INFO << "#### Counter thread stop, tid: " << tid();
......@@ -85,6 +98,11 @@ void Counter::count_write_io(int bytes)
_write_io.fetch_add(bytes);
}
void Counter::count_key(Counter::CountKey key, uint64_t count)
{
_counts[key].count.fetch_add(count);
}
void Counter::mark_timestamp(int timestamp)
{
_timestamp = timestamp;
......
......@@ -46,6 +46,16 @@ public:
void count_write_io(int bytes);
// MUST BE as same order as _counts
enum CountKey {
READER_FETCH_US = 0,
READER_OFFER_US = 1,
SENDER_POLL_US = 2,
SENDER_SEND_US = 3,
};
void count_key(CountKey key, uint64_t count);
void mark_timestamp(int timestamp);
void mark_checkpoint(uint64_t checkpoint);
......@@ -54,6 +64,14 @@ private:
void sleep();
private:
struct CountItem {
const char* name;
std::atomic<uint64_t> count{0};
CountItem(const char* n) : name(n)
{}
};
Timer _timer;
std::atomic<uint64_t> _read_count{0};
......@@ -63,6 +81,8 @@ private:
volatile int _timestamp = time(nullptr);
volatile int _checkpoint = time(nullptr);
CountItem _counts[4]{{"RFETCH"}, {"ROFFER"}, {"SPOLL"}, {"SSEND"}};
std::map<std::string, std::function<int64_t()>> _gauges;
std::mutex _sleep_cv_lk;
......
......@@ -12,6 +12,8 @@
#pragma once
#include <stdlib.h>
namespace oceanbase {
namespace logproxy {
......@@ -24,7 +26,7 @@ public:
~FreeGuard()
{
if (_own) {
free(_ptr);
::free(_ptr);
_ptr = nullptr;
}
}
......
......@@ -37,9 +37,7 @@ public:
inline void reset()
{
struct timeval tm;
gettimeofday(&tm, nullptr);
_start_time = tm.tv_sec * 1000000 + tm.tv_usec;
_start_time = now();
}
inline void reset(uint64_t us)
......
......@@ -401,7 +401,11 @@ PacketError Communicator::receive_message(Channel* ch, Message*& msg)
return PacketError::PROTOCOL_ERROR;
}
return _decoders[version]->decode(ch, (MessageVersion)version, msg);
auto ret = _decoders[version]->decode(ch, (MessageVersion)version, msg);
if (msg != nullptr) {
msg->set_version((MessageVersion)version);
}
return ret;
}
void Communicator::on_event(int fd, short event, void* arg)
......@@ -419,7 +423,7 @@ void Communicator::on_event(int fd, short event, void* arg)
Communicator& c = *ch->get_communicator();
OMS_INFO << "On event fd: " << fd << " got channel, peer: " << ch->peer().to_string();
EventResult er = EventResult::ER_SUCCESS;
EventResult err = EventResult::ER_SUCCESS;
if ((event & EV_FINALIZE) || (event & EV_CLOSED)) {
OMS_WARN << "On event close, peer: " << ch->peer().to_string();
......@@ -429,13 +433,13 @@ void Communicator::on_event(int fd, short event, void* arg)
if (event & EV_WRITE) {
event_del(&ch->_write_event);
}
er = EventResult::ER_CLOSE_CHANNEL;
err = EventResult::ER_CLOSE_CHANNEL;
} else {
if ((event & EV_WRITE) && ch->_write_msg != nullptr) {
OMS_INFO << "On event about to write message: " << ch->peer().to_string();
if (c.write_message(ch, *ch->_write_msg) != OMS_OK) {
er = EventResult::ER_CLOSE_CHANNEL;
err = EventResult::ER_CLOSE_CHANNEL;
}
ch->set_write_msg(nullptr);
event_del(&ch->_write_event); // one-shot
......@@ -444,20 +448,20 @@ void Communicator::on_event(int fd, short event, void* arg)
if (event & EV_READ) {
Message* msg = nullptr;
PacketError result = c.receive_message(ch, msg);
if (result != PacketError::SUCCESS) {
if (result == PacketError::SUCCESS) {
auto event_callback = c._read_callback.load();
if (event_callback != nullptr) {
err = (*event_callback)(ch->_peer, *msg);
}
} else if (result == PacketError::IGNORE) {
// do nothing
} else {
OMS_ERROR << "Failed to handle receive message, ret: " << (int)result;
auto error_callback = c._error_callback.load();
if (error_callback != nullptr) {
er = (*error_callback)(ch->_peer, result);
err = (*error_callback)(ch->_peer, result);
} else {
er = EventResult::ER_CLOSE_CHANNEL;
}
} else {
auto event_callback = c._read_callback.load();
if (event_callback != nullptr) {
er = (*event_callback)(ch->_peer, *msg);
err = EventResult::ER_CLOSE_CHANNEL;
}
}
delete msg;
......@@ -465,7 +469,7 @@ void Communicator::on_event(int fd, short event, void* arg)
}
ch->put();
switch (er) {
switch (err) {
case EventResult::ER_CLOSE_CHANNEL:
c.remove_channel(ch->_peer);
break;
......
......@@ -10,6 +10,8 @@
* See the Mulan PubL v2 for more details.
*/
#include "MsgHeader.h"
#include "common/log.h"
#include "common/config.h"
#include "common/counter.h"
......
......@@ -47,9 +47,15 @@ void ReaderRoutine::run()
return;
}
Counter& counter = Counter::instance();
Timer stage_tm;
while (is_run()) {
stage_tm.reset();
ILogRecord* record = nullptr;
int ret = _oblog.fetch(record, _s_config.read_timeout_us.val());
int64_t fetch_us = stage_tm.elapsed();
if (ret == OB_TIMEOUT && record == nullptr) {
OMS_INFO << "fetch liboblog timeout, nothing incoming...";
continue;
......@@ -60,12 +66,16 @@ void ReaderRoutine::run()
continue;
}
stage_tm.reset();
while (!_queue.offer(record, _s_config.read_timeout_us.val())) {
OMS_WARN << "reader transfer queue full(" << _queue.size(false) << "), retry...";
}
int64_t offer_us = stage_tm.elapsed();
Counter::instance().count_read_io(record->getRealSize());
Counter::instance().count_read(1);
counter.count_key(Counter::READER_FETCH_US, fetch_us);
counter.count_key(Counter::READER_OFFER_US, offer_us);
counter.count_read_io(record->getRealSize());
counter.count_read(1);
}
ObLogReader::instance().stop();
......
......@@ -69,16 +69,13 @@ void SenderRoutine::stop()
void SenderRoutine::run()
{
#ifndef NEED_MAPPING_CLASS
LogMsgLocalInit();
#else
LMLocalinit();
#endif
std::vector<ILogRecord*> records;
records.reserve(_s_config.read_wait_num.val());
while (is_run()) {
if (!_s_config.readonly.val()) {
int ret = _comm.poll();
if (ret != OMS_OK) {
......@@ -88,10 +85,13 @@ void SenderRoutine::run()
}
}
_stage_timer.reset();
records.clear();
while (!_rqueue.poll(records, _s_config.read_timeout_us.val()) || records.empty()) {
OMS_INFO << "send transfer queue empty, retry...";
}
int64_t poll_us = _stage_timer.elapsed();
Counter::instance().count_key(Counter::SENDER_POLL_US, poll_us);
for (size_t i = 0; i < records.size(); ++i) {
ILogRecord* record = records[i];
......@@ -100,14 +100,14 @@ void SenderRoutine::run()
if (_s_config.verbose_packet.val()) {
OMS_INFO << "fetch a records(" << (i + 1) << "/" << records.size() << ") from liboblog: "
<< "size:" << record->getRealSize() << ", record_type:" << record->recordType()
<< ", timestamp:" << record->getTimestamp() << ", checkpoint:" << record->getFileNameOffset()
<< ", timestamp:" << record->getTimestamp() << ", checkpoint:" << record->getCheckpoint1()
<< ", dbname:" << record->dbname() << ", tbname:" << record->tbname()
<< ", queue size:" << _rqueue.size(false);
}
if (_s_config.readonly.val()) {
Counter::instance().count_write(1);
Counter::instance().mark_timestamp(record->getTimestamp());
Counter::instance().mark_checkpoint(record->getFileNameOffset());
Counter::instance().mark_checkpoint(record->getCheckpoint1());
_oblog.release(record);
}
}
......@@ -122,7 +122,12 @@ void SenderRoutine::run()
for (i = 0; i < records.size(); ++i) {
ILogRecord* r = records[i];
size_t size = 0;
r->toString(&size, true);
const char* rbuf = r->toString(&size, true);
if (rbuf == nullptr) {
OMS_ERROR << "failed parse logmsg Record, !!!EXIT!!!";
stop();
break;
}
if (packet_size + size > _s_config.max_packet_bytes.val()) {
if (packet_size == 0) {
......@@ -163,11 +168,7 @@ void SenderRoutine::run()
}
}
#ifndef NEED_MAPPING_CLASS
LogMsgLocalDestroy();
#else
LMLocaldestroy();
#endif
ObLogReader::instance().stop();
}
......@@ -176,15 +177,19 @@ int SenderRoutine::do_send(const std::vector<ILogRecord*>& records, size_t offse
if (_s_config.verbose.val()) {
OMS_DEBUG << "send record range[" << offset << ", " << offset + count << ")";
}
_stage_timer.reset();
RecordDataMessage msg(records, offset, count);
msg.set_version(_packet_version);
msg.compress_type = CompressType::PLAIN;
int ret = _comm.send_message(_client_peer, msg, true);
Counter::instance().count_key(Counter::SENDER_SEND_US, _stage_timer.elapsed());
if (ret == OMS_OK) {
ILogRecord* last = records[offset + count - 1];
Counter::instance().count_write(count);
Counter::instance().mark_timestamp(last->getTimestamp());
Counter::instance().mark_checkpoint(last->getFileNameOffset());
Counter::instance().mark_checkpoint(last->getCheckpoint1());
} else {
OMS_WARN << "Failed to send record data message to client. peer=" << _client_peer.id();
}
......
......@@ -13,6 +13,7 @@
#pragma once
#include "common/thread.h"
#include "common/timer.h"
#include "common/blocking_queue.hpp"
#include "oblogreader/oblog_access.h"
......@@ -41,6 +42,8 @@ private:
MessageVersion _packet_version;
PeerInfo _client_peer;
Timer _stage_timer;
};
} // namespace logproxy
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册