提交 49b53c45 编写于 作者: M ml0 提交者: wangzelin.wzl

A couple of bug-fixes for opensource branch.

上级 192f6aee
...@@ -156,7 +156,7 @@ void ObNetEasy::update_eio_sql_tcp_keepalive(easy_io_t* eio, int64_t user_timeou ...@@ -156,7 +156,7 @@ void ObNetEasy::update_eio_sql_tcp_keepalive(easy_io_t* eio, int64_t user_timeou
eio->tcp_keepintvl = max(tcp_keepintvl / 1000000, 1); eio->tcp_keepintvl = max(tcp_keepintvl / 1000000, 1);
eio->tcp_keepcnt = tcp_keepcnt; eio->tcp_keepcnt = tcp_keepcnt;
eio->conn_timeout = user_timeout / 1000; eio->conn_timeout = user_timeout / 1000;
eio->ack_timeout = user_timeout / 1000; eio->ack_timeout = 0;
} }
} }
......
...@@ -32,6 +32,8 @@ int ObReqProcessor::run() ...@@ -32,6 +32,8 @@ int ObReqProcessor::run()
run_timestamp_ = ObTimeUtility::current_time(); run_timestamp_ = ObTimeUtility::current_time();
if (OB_FAIL(check_timeout())) { if (OB_FAIL(check_timeout())) {
LOG_WARN("req timeout", K(ret)); LOG_WARN("req timeout", K(ret));
} else if (OB_FAIL(check_cluster_id())) {
LOG_WARN("checking cluster ID failed", K(ret));
} else if (OB_FAIL(deserialize())) { } else if (OB_FAIL(deserialize())) {
before_process_ret_ = ret; before_process_ret_ = ret;
deseri_succ = false; deseri_succ = false;
...@@ -80,6 +82,11 @@ int ObReqProcessor::run() ...@@ -80,6 +82,11 @@ int ObReqProcessor::run()
return ret; return ret;
} }
int ObReqProcessor::check_cluster_id()
{
return OB_SUCCESS;
}
int ObReqProcessor::after_process() int ObReqProcessor::after_process()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
......
...@@ -108,6 +108,7 @@ protected: ...@@ -108,6 +108,7 @@ protected:
{ {
return common::OB_SUCCESS; return common::OB_SUCCESS;
} }
virtual int check_cluster_id();
virtual int deserialize() = 0; virtual int deserialize() = 0;
virtual int serialize() = 0; virtual int serialize() = 0;
......
...@@ -59,6 +59,7 @@ int async_cb(easy_request_t* r) ...@@ -59,6 +59,7 @@ int async_cb(easy_request_t* r)
// 3. destination responses but can't fulfilled as a single packet until timeout // 3. destination responses but can't fulfilled as a single packet until timeout
// We set easy error so that return EASY_ERROR to easy. // We set easy error so that return EASY_ERROR to easy.
easy_err = cb->get_error(); easy_err = cb->get_error();
cb->set_error(easy_err);
ret = cb->on_error(easy_err); ret = cb->on_error(easy_err);
if (OB_ERROR == ret) { if (OB_ERROR == ret) {
/* /*
...@@ -72,10 +73,22 @@ int async_cb(easy_request_t* r) ...@@ -72,10 +73,22 @@ int async_cb(easy_request_t* r)
} else if (OB_FAIL(cb->decode(r->ipacket))) { } else if (OB_FAIL(cb->decode(r->ipacket))) {
cb->on_invalid(); cb->on_invalid();
LOG_DEBUG("decode failed", K(ret)); LOG_DEBUG("decode failed", K(ret));
} else if (OB_PACKET_CLUSTER_ID_NOT_MATCH == cb->get_rcode()) {
LOG_ERROR("wrong cluster id", K(ret));
cb->set_error(EASY_ERROR);
ret = cb->on_error(EASY_ERROR);
if (OB_ERROR == ret) {
/*
* The derived classe has not overwrite thie own on_error callback. We still use
* on_timeout for For backward compatibility.
*/
cb->on_timeout();
}
} else { } else {
after_decode_time = ObTimeUtility::current_time(); after_decode_time = ObTimeUtility::current_time();
ObRpcPacket* pkt = reinterpret_cast<ObRpcPacket*>(r->ipacket); ObRpcPacket* pkt = reinterpret_cast<ObRpcPacket*>(r->ipacket);
pcode = pkt->get_pcode(); pcode = pkt->get_pcode();
bool cb_cloned = cb->get_cloned();
EVENT_INC(RPC_PACKET_IN); EVENT_INC(RPC_PACKET_IN);
EVENT_ADD(RPC_PACKET_IN_BYTES, pkt->get_clen() + pkt->get_header_size() + OB_NET_HEADER_LENGTH); EVENT_ADD(RPC_PACKET_IN_BYTES, pkt->get_clen() + pkt->get_header_size() + OB_NET_HEADER_LENGTH);
...@@ -83,6 +96,11 @@ int async_cb(easy_request_t* r) ...@@ -83,6 +96,11 @@ int async_cb(easy_request_t* r)
if (OB_FAIL(cb->process())) { if (OB_FAIL(cb->process())) {
LOG_DEBUG("process failed", K(ret)); LOG_DEBUG("process failed", K(ret));
} }
if (cb_cloned) {
LOG_DEBUG("reset rcode", K(cb_cloned));
cb->reset_rcode();
}
} }
} else { } else {
// async rpc without callback // async rpc without callback
...@@ -117,11 +135,6 @@ int async_cb(easy_request_t* r) ...@@ -117,11 +135,6 @@ int async_cb(easy_request_t* r)
return EASY_OK; return EASY_OK;
} }
int ObReqTransport::AsyncCB::get_error() const
{
return NULL != req_ ? ((easy_session_t*)req_->ms)->error : EASY_ERROR;
}
int ObReqTransport::AsyncCB::on_error(int) int ObReqTransport::AsyncCB::on_error(int)
{ {
/* /*
...@@ -198,6 +211,10 @@ int ObReqTransport::create_session(easy_session_t*& session, const ObAddr& addr, ...@@ -198,6 +211,10 @@ int ObReqTransport::create_session(easy_session_t*& session, const ObAddr& addr,
session->r.user_data = *newcb; session->r.user_data = *newcb;
if (!*newcb) { if (!*newcb) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
if (cb != *newcb) {
(*newcb)->set_cloned(true);
}
} }
} }
if (NULL == cb) { if (NULL == cb) {
......
...@@ -54,7 +54,9 @@ public: ...@@ -54,7 +54,9 @@ public:
// been called after easy has detected the response packet. // been called after easy has detected the response packet.
class AsyncCB { class AsyncCB {
public: public:
AsyncCB() : dst_(), timeout_(0), tenant_id_(0), req_(NULL), send_ts_(0), payload_(0) AsyncCB()
: dst_(), timeout_(0), tenant_id_(0),
err_(0), req_(NULL), send_ts_(0), payload_(0)
{} {}
virtual ~AsyncCB() virtual ~AsyncCB()
{} {}
...@@ -65,6 +67,10 @@ public: ...@@ -65,6 +67,10 @@ public:
{} {}
virtual int decode(void* pkt) = 0; virtual int decode(void* pkt) = 0;
virtual int process() = 0; virtual int process() = 0;
virtual void reset_rcode() = 0;
virtual void set_cloned(bool cloned) = 0;
virtual bool get_cloned() = 0;
virtual int get_rcode() = 0;
// invoke when get a valid packet on protocol level, but can't decode it. // invoke when get a valid packet on protocol level, but can't decode it.
virtual void on_invalid() virtual void on_invalid()
...@@ -77,7 +83,8 @@ public: ...@@ -77,7 +83,8 @@ public:
RPC_FRAME_LOG(DEBUG, "packet timeout"); RPC_FRAME_LOG(DEBUG, "packet timeout");
} }
virtual int on_error(int err); virtual int on_error(int err);
int get_error() const; void set_error(int err) { err_ = err; }
int get_error() const { return err_; }
void set_dst(const ObAddr& dst) void set_dst(const ObAddr& dst)
{ {
...@@ -118,7 +125,8 @@ public: ...@@ -118,7 +125,8 @@ public:
ObAddr dst_; ObAddr dst_;
int64_t timeout_; int64_t timeout_;
uint64_t tenant_id_; uint64_t tenant_id_;
const easy_request_t* req_; int err_;
const easy_request_t *req_;
int64_t send_ts_; int64_t send_ts_;
int64_t payload_; int64_t payload_;
}; };
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "rpc/obrpc/ob_rpc_stat.h" #include "rpc/obrpc/ob_rpc_stat.h"
#include "rpc/obrpc/ob_irpc_extra_payload.h" #include "rpc/obrpc/ob_irpc_extra_payload.h"
#include "rpc/obrpc/ob_rpc_processor_base.h" #include "rpc/obrpc/ob_rpc_processor_base.h"
#include "rpc/obrpc/ob_rpc_net_handler.h"
using namespace oceanbase::common; using namespace oceanbase::common;
...@@ -49,6 +50,25 @@ ObRpcProcessorBase::~ObRpcProcessorBase() ...@@ -49,6 +50,25 @@ ObRpcProcessorBase::~ObRpcProcessorBase()
} }
} }
int ObRpcProcessorBase::check_cluster_id()
{
int ret = OB_SUCCESS;
if (OB_ISNULL(rpc_pkt_)) {
ret = OB_ERR_UNEXPECTED;
RPC_OBRPC_LOG(ERROR, "rpc_pkt_ should not be NULL", K(ret));
} else if (INVALID_CLUSTER_ID != ObRpcNetHandler::CLUSTER_ID
&& INVALID_CLUSTER_ID != rpc_pkt_->get_dst_cluster_id()
&& ObRpcNetHandler::CLUSTER_ID != rpc_pkt_->get_dst_cluster_id()) {
// The verification is turned on locally and does not match the received pkt dst_cluster_id
ret = OB_PACKET_CLUSTER_ID_NOT_MATCH;
if (REACH_TIME_INTERVAL(500 * 1000)) {
RPC_OBRPC_LOG(WARN, "packet dst_cluster_id not match", K(ret), "self.dst_cluster_id", ObRpcNetHandler::CLUSTER_ID,
"pkt.dst_cluster_id", rpc_pkt_->get_dst_cluster_id(), "pkt", *rpc_pkt_);
}
}
return ret;
}
int ObRpcProcessorBase::deserialize() int ObRpcProcessorBase::deserialize()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
......
...@@ -67,6 +67,7 @@ public: ...@@ -67,6 +67,7 @@ public:
return send_timestamp_; return send_timestamp_;
} }
virtual int check_cluster_id();
int64_t get_src_cluster_id() const int64_t get_src_cluster_id() const
{ {
int64_t cluster_id = common::OB_INVALID_CLUSTER_ID; int64_t cluster_id = common::OB_INVALID_CLUSTER_ID;
......
...@@ -90,7 +90,22 @@ public: ...@@ -90,7 +90,22 @@ public:
using Response = typename pcodeStruct::Response; using Response = typename pcodeStruct::Response;
public: public:
int decode(void* pkt); AsyncCB() { cloned_ = false; }
virtual ~AsyncCB() { reset_rcode(); }
int decode(void *pkt);
void reset_rcode()
{
rcode_.reset();
}
void set_cloned(bool cloned)
{
cloned_ = cloned;
}
bool get_cloned()
{
return cloned_;
}
int get_rcode();
virtual void do_first(); virtual void do_first();
virtual void set_args(const Request& arg) = 0; virtual void set_args(const Request& arg) = 0;
...@@ -101,6 +116,15 @@ public: ...@@ -101,6 +116,15 @@ public:
void check_request_rt(const bool force_print = false); void check_request_rt(const bool force_print = false);
protected: protected:
/*
* When the variable 'clone_' is true, it indicates that the derived class of AsyncCB realloctes
* new memory and clone itself in its overwriten 'clone' virtual function. But in some cases, the
* derived class reuses its original memory which is maintained by up-layer modules, and the value
* of 'clone_' is false. Further, rcode_.warnings_ may reallocate and enlarge it internal memory
* space when rpc packets deserealized. When clone_is false, the relocated memory in rcode_.warnings_
* has to be freed in the destructor of class AsyncCB.
*/
bool cloned_;
Response result_; Response result_;
ObRpcResultCode rcode_; ObRpcResultCode rcode_;
}; };
......
...@@ -245,6 +245,12 @@ int ObRpcProxy::AsyncCB<pcodeStruct>::decode(void* pkt) ...@@ -245,6 +245,12 @@ int ObRpcProxy::AsyncCB<pcodeStruct>::decode(void* pkt)
return ret; return ret;
} }
template <class pcodeStruct>
int ObRpcProxy::AsyncCB<pcodeStruct>::get_rcode()
{
return rcode_.rcode_;
}
template <class pcodeStruct> template <class pcodeStruct>
void ObRpcProxy::AsyncCB<pcodeStruct>::check_request_rt(const bool force_print) void ObRpcProxy::AsyncCB<pcodeStruct>::check_request_rt(const bool force_print)
{ {
......
...@@ -629,22 +629,6 @@ int ObVirtualRpcProtocolProcessor::decode_net_rpc_packet( ...@@ -629,22 +629,6 @@ int ObVirtualRpcProtocolProcessor::decode_net_rpc_packet(
// decode packet header fail // decode packet header fail
pkt = NULL; pkt = NULL;
LOG_WARN("decode packet fail, close connection", K(ret)); LOG_WARN("decode packet fail, close connection", K(ret));
} else if (INVALID_CLUSTER_ID != ObRpcNetHandler::CLUSTER_ID && INVALID_CLUSTER_ID != pkt->get_dst_cluster_id() &&
ObRpcNetHandler::CLUSTER_ID != pkt->get_dst_cluster_id()) {
// The verification is turned on locally and does not match the received pkt dst_cluster_id
ret = OB_PACKET_CLUSTER_ID_NOT_MATCH;
if (REACH_TIME_INTERVAL(500 * 1000)) {
LOG_WARN("packet dst_cluster_id not match",
K(ret),
"self.dst_cluster_id",
ObRpcNetHandler::CLUSTER_ID,
"pkt.dst_cluster_id",
pkt->get_dst_cluster_id(),
"pkt",
*pkt);
}
pkt = NULL; // If the verification fails, you need to set the packet to NULL, so that io/easy_connection.c can
// be sensed
} }
timeguard.click(); timeguard.click();
} }
...@@ -719,24 +703,6 @@ int ObVirtualRpcProtocolProcessor::decode_raw_net_rpc_packet( ...@@ -719,24 +703,6 @@ int ObVirtualRpcProtocolProcessor::decode_raw_net_rpc_packet(
if (OB_FAIL(pkt->decode(pbuf, plen))) { if (OB_FAIL(pkt->decode(pbuf, plen))) {
// decode packet header fail // decode packet header fail
LOG_WARN("decode packet fail, close connection", K(ret), "ms status", ms->status); LOG_WARN("decode packet fail, close connection", K(ret), "ms status", ms->status);
} else if (INVALID_CLUSTER_ID != ObRpcNetHandler::CLUSTER_ID &&
INVALID_CLUSTER_ID != pkt->get_dst_cluster_id() &&
ObRpcNetHandler::CLUSTER_ID != pkt->get_dst_cluster_id()) {
// The verification is turned on locally and does not match the received pkt dst_cluster_id
ms->status = EASY_ERROR;
ret = OB_PACKET_CLUSTER_ID_NOT_MATCH;
if (REACH_TIME_INTERVAL(500 * 1000)) {
LOG_WARN("packet dst_cluster_id not match",
K(ret),
"self.dst_cluster_id",
ObRpcNetHandler::CLUSTER_ID,
"pkt.dst_cluster_id",
pkt->get_dst_cluster_id(),
"pkt",
*pkt);
}
pkt = NULL; // If the verification fails, you need to set the packet to NULL, so that
// io/easy_connection.c can be sensed
} }
in_data += full_demanded_len; in_data += full_demanded_len;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册