From a97444e77c6449bc9e766fae621a01a6fcad1cbf Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 28 Oct 2022 08:37:39 +0000 Subject: [PATCH] [server net keepalive] sync serialization size --- deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp | 80 ++++++++++++++++++- 1 file changed, 76 insertions(+), 4 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp index cb3229bf13..da318dbd89 100644 --- a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp @@ -28,10 +28,13 @@ #include "lib/utility/ob_defer.h" #include "lib/thread/ob_thread_name.h" #include "lib/time/ob_time_utility.h" +#include "lib/utility/serialization.h" +#include "lib/utility/utility.h" #include "rpc/frame/ob_net_easy.h" #include "io/easy_negotiation.h" using namespace oceanbase::common; +using namespace oceanbase::common::serialization; using namespace oceanbase::lib; using namespace oceanbase::rpc::frame; namespace oceanbase @@ -44,6 +47,43 @@ namespace obrpc #define WINDOW_MAX_FAILS 4 // 4 times #define MAX_CREDIBLE_WINDOW 10 * 1000 * 1000 // 10s +constexpr int32_t KP_MAGIC = 0x2c15c364; +struct Header +{ +public: + Header(int32_t data_len = 0) + : magic_(KP_MAGIC), data_len_(data_len) {} + int encode(char *buf, const int64_t buf_len, int64_t &pos) + { + int ret = OB_SUCCESS; + if (OB_FAIL(encode_i32(buf, buf_len, pos, magic_))) { + _LOG_WARN("encode magic failed, ret: %d, pos: %ld", ret, pos); + } else if (OB_FAIL(encode_i32(buf, buf_len, pos, data_len_))) { + _LOG_WARN("encode data len failed, ret: %d, pos: %ld", ret, pos); + } + return ret; + } + int decode(const char *buf, const int64_t buf_len, int64_t &pos) + { + int ret = OB_SUCCESS; + if (OB_FAIL(decode_i32(buf, buf_len, pos, &magic_))) { + _LOG_WARN("decode magic failed, ret: %d, pos: %ld", ret, pos); + } else if (magic_ != KP_MAGIC) { + ret = OB_ERR_UNEXPECTED; + _LOG_WARN("unexpected magic, magic: %d", magic_); + } else if (OB_FAIL(decode_i32(buf, buf_len, pos, &data_len_))) { + _LOG_WARN("decode data len failed, ret: %d, pos: %ld", ret, pos); + } + return ret; + } + int32_t get_encoded_size() const + { + return encoded_length_i32(magic_) + encoded_length_i32(data_len_); + } + int32_t magic_; + int32_t data_len_; +}; + enum { UNCONNECT = 0, CONNECTING, @@ -333,6 +373,7 @@ void ObNetKeepAlive::do_server_loop() for (int i = 0; i < cnt; i++) { struct server *s = (struct server *)events[i].data.ptr; int ev_fd = NULL == s? pipefd_ : s->fd_; + bool need_disconn = false; if (NULL == s) { struct server *s = (struct server *)ob_malloc(sizeof(struct server), "KeepAliveServer"); if (NULL == s) { @@ -383,10 +424,26 @@ void ObNetKeepAlive::do_server_loop() char data = PROTOCOL_DATA; while ((n = read(ev_fd, &data, sizeof data)) < 0 && errno == EINTR); if (n <= 0) break; - while ((n = write(ev_fd, &PROTOCOL_DATA, sizeof PROTOCOL_DATA)) < 0 && errno == EINTR); + char buf[128]; + const int64_t buf_len = sizeof buf; + int32_t data_len = 1; + Header header(data_len); + int tmp_ret = OB_SUCCESS; + int64_t pos = 0; + if (OB_SUCCESS != (tmp_ret = header.encode(buf, buf_len, pos))) { + _LOG_WARN("encode failed, ret: %d, pos: %ld", tmp_ret, pos); + } else if (FALSE_IT(pos += data_len)/*TODO: encode data*/) { + _LOG_WARN("encode data failed: %d", data_len); + } else { + while ((n = write(ev_fd, buf, pos)) < 0 && errno == EINTR); + need_disconn = n < pos; + } } } - if (events[i].events & (EPOLLRDHUP | EPOLLHUP)) { + if (!need_disconn) { + need_disconn = events[i].events & (EPOLLRDHUP | EPOLLHUP); + } + if (need_disconn) { _LOG_INFO("server connection closed, fd: %d, addr: %s", ev_fd, NULL == s? "" : addr_to_string(s->cli_addr_)); epoll_ctl(epfd, EPOLL_CTL_DEL, ev_fd, NULL); close(ev_fd); @@ -464,9 +521,24 @@ void ObNetKeepAlive::do_client_loop() _LOG_DEBUG("update read ts, addr: %s, fd: %d, ts: %ld", addr_to_string(rs->svr_addr_), ev_fd, rs->last_read_ts_); for (;;) { ssize_t n = -1; - char data = PROTOCOL_DATA; - while ((n = read(ev_fd, &data, sizeof data)) < 0 && errno == EINTR); + char buf[128]; + Header header; + int32_t read_len = header.get_encoded_size(); + while ((n = read(ev_fd, buf, read_len)) < 0 && errno == EINTR); if (n <= 0) break; + int tmp_ret = OB_SUCCESS; + int64_t pos = 0; + if (OB_SUCCESS != (tmp_ret = header.decode(buf, read_len, pos))) { + _LOG_WARN("decode failed, ret: %d, pos: %ld", tmp_ret, pos); + } else { + char data[512]; // TODO + if (header.data_len_ > sizeof data) { + tmp_ret = OB_BUF_NOT_ENOUGH; + _LOG_WARN("data buf not enough: %d", header.data_len_); + } else { + while ((n = read(ev_fd, data, header.data_len_)) < 0 && errno == EINTR); + } + } do_rpin(client2rs(c)); } } -- GitLab