提交 2ebaaf7b 编写于 作者: O obdev 提交者: ob-robot

fix dtl first buffer not first error

上级 8b26e8db
......@@ -392,7 +392,8 @@ int ObDtlBasicChannel::mock_eof_buffer(int64_t timeout_ts)
return ret;
}
int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_buffer_cached)
int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_buffer_cached,
bool inc_recv_buf_cnt)
{
int ret = OB_SUCCESS;
ObDtlMsgHeader header;
......@@ -405,7 +406,9 @@ int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_b
LOG_WARN("failed to deserialize msg header", K(ret));
} else if (header.is_drain()) {
alloc_buffer_count();
inc_recv_buffer_cnt();
if (inc_recv_buf_cnt) {
inc_recv_buffer_cnt();
}
dfc_->set_drain(this);
LOG_TRACE("transmit receive drain cmd", KP(linked_buffer), K(this), KP(id_), KP(peer_id_),
K(recv_list_.is_empty()));
......@@ -424,7 +427,9 @@ int ObDtlBasicChannel::attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_b
linked_buffer = nullptr;
// 将attach收到的是自己申请的,因为释放权交给了当前channel,所以认为这次申请也是自己,与free保持一致,方便统计
alloc_buffer_count();
inc_recv_buffer_cnt();
if (inc_recv_buf_cnt) {
inc_recv_buffer_cnt();
}
if (is_data_msg) {
metric_.mark_first_in();
if (is_eof) {
......
......@@ -357,7 +357,8 @@ public:
virtual int send(const ObDtlMsg &msg, int64_t timeout_ts,
ObEvalCtx *eval_ctx = nullptr, bool is_eof = false) override;
virtual int feedup(ObDtlLinkedBuffer *&buffer) override;
virtual int attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_buffer_cached = false);
virtual int attach(ObDtlLinkedBuffer *&linked_buffer, bool is_first_buffer_cached = false,
bool inc_recv_buf_cnt = true);
// don't call send&flush in different threads.
virtual int flush(bool wait=true, bool wait_response = true) override;
......
......@@ -108,7 +108,8 @@ public:
virtual int send(const ObDtlMsg &msg, int64_t timeout_ts,
ObEvalCtx *eval_ctx = nullptr, bool is_eof = false) = 0;
virtual int feedup(ObDtlLinkedBuffer *&buffer) = 0;
virtual int attach(ObDtlLinkedBuffer *&linked_buffer, bool is_firt_buffer_cached = false) = 0;
virtual int attach(ObDtlLinkedBuffer *&linked_buffer, bool is_firt_buffer_cached = false,
bool inc_recv_buf_cnt = true) = 0;
virtual int flush(bool wait=true, bool wait_response = true) = 0;
virtual bool is_empty() const = 0;
......
......@@ -176,7 +176,7 @@ ObDtlRpcChannel::ObDtlRpcChannel(
const uint64_t tenant_id,
const uint64_t id,
const ObAddr &peer)
: ObDtlBasicChannel(tenant_id, id, peer), recv_mock_eof_cnt_(0)
: ObDtlBasicChannel(tenant_id, id, peer)
{}
ObDtlRpcChannel::ObDtlRpcChannel(
......@@ -184,7 +184,7 @@ ObDtlRpcChannel::ObDtlRpcChannel(
const uint64_t id,
const ObAddr &peer,
const int64_t hash_val)
: ObDtlBasicChannel(tenant_id, id, peer, hash_val), recv_mock_eof_cnt_(0)
: ObDtlBasicChannel(tenant_id, id, peer, hash_val)
{}
ObDtlRpcChannel::~ObDtlRpcChannel()
......@@ -233,11 +233,10 @@ int ObDtlRpcChannel::feedup(ObDtlLinkedBuffer *&buffer)
LOG_TRACE("DTL feedup a new msg to msg loop", K(buffer->size()), KP(id_), K(peer_));
ObDtlLinkedBuffer::assign(*buffer, linked_buffer);
if (1 == linked_buffer->seq_no() && linked_buffer->is_data_msg()
&& 0 != get_recv_buffer_cnt() &&
!(1 == get_recv_buffer_cnt() && 1 == recv_mock_eof_cnt_)) {
&& 0 != get_recv_buffer_cnt()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("first buffer is not first", K(ret), K(get_id()), K(get_peer_id()),
K(get_recv_buffer_cnt()), K(recv_mock_eof_cnt_));
K(get_recv_buffer_cnt()), K(linked_buffer->seq_no()));
free_buf(linked_buffer);
linked_buffer = nullptr;
} else if (OB_FAIL(block_on_increase_size(linked_buffer->size()))) {
......@@ -250,9 +249,6 @@ int ObDtlRpcChannel::feedup(ObDtlLinkedBuffer *&buffer)
linked_buffer = nullptr;
} else if (FALSE_IT(inc_recv_buffer_cnt())) {
} else {
if (1 == buffer->seq_no() && buffer->is_eof() && 0 == buffer->pos()) {
recv_mock_eof_cnt_++;
}
if (buffer->is_data_msg()) {
metric_.mark_first_in();
if (buffer->is_eof()) {
......
......@@ -101,8 +101,6 @@ public:
virtual int feedup(ObDtlLinkedBuffer *&buffer) override;
virtual int send_message(ObDtlLinkedBuffer *&buf);
private:
int64_t recv_mock_eof_cnt_;
};
} // dtl
......
......@@ -317,6 +317,8 @@ int ObFastInitSqcReportQCMessageCall::mock_sqc_finish_msg()
buffer->set_data_msg(false);
buffer->timeout_ts() = timeout_ts_;
buffer->set_msg_type(dtl::ObDtlMsgType::FINISH_SQC_RESULT);
const bool is_first_buffer_cached = false;
const bool inc_recv_buf_cnt = false;
if (OB_FAIL(common::serialization::encode(buf, size, pos, header))) {
LOG_WARN("fail to encode buffer", K(ret));
} else if (OB_FAIL(common::serialization::encode(buf, size, pos, finish_msg))) {
......@@ -324,7 +326,7 @@ int ObFastInitSqcReportQCMessageCall::mock_sqc_finish_msg()
} else if (FALSE_IT(buffer->size() = pos)) {
} else if (FALSE_IT(pos = 0)) {
} else if (FALSE_IT(buffer->tenant_id() = ch->get_tenant_id())) {
} else if (OB_FAIL(ch->attach(buffer))) {
} else if (OB_FAIL(ch->attach(buffer, is_first_buffer_cached, inc_recv_buf_cnt))) {
LOG_WARN("fail to feedup buffer", K(ret));
} else if (FALSE_IT(ch->free_buffer_count())) {
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册