提交 fb85dd28 编写于 作者: O obdev 提交者: wangzelin.wzl

Reply committed_info for fetch log request.

上级 c118ed16
......@@ -937,6 +937,31 @@ int LogEngine::submit_learner_keepalive_resp(const common::ObAddr &server,
return ret;
}
int LogEngine::submit_committed_info_req(
const common::ObAddr &server,
const int64_t &msg_proposal_id,
const int64_t prev_log_id,
const int64_t &prev_log_proposal_id,
const LSN &committed_end_lsn)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogEngine not init", K(ret), KPC(this));
} else if (OB_FAIL(log_net_service_.submit_committed_info_req(
server, msg_proposal_id,
prev_log_id, prev_log_proposal_id, committed_end_lsn))) {
PALF_LOG(ERROR, "LogNetService submit_committed_info_req failed", K(ret),
KPC(this), K(server),
K(prev_log_id), K(prev_log_proposal_id), K(committed_end_lsn));
} else {
PALF_LOG(TRACE, "submit_committed_info_req success", K(ret), KPC(this),
K(server), K(msg_proposal_id), K(prev_log_id),
K(prev_log_proposal_id), K(committed_end_lsn));
}
return ret;
}
LogMeta LogEngine::get_log_meta() const
{
ObSpinLockGuard guard(log_meta_lock_);
......
......@@ -320,7 +320,12 @@ public:
int submit_notify_rebuild_req(const ObAddr &server,
const LSN &base_lsn,
const LogInfo &base_prev_log_info);
int submit_committed_info_req(
const ObAddr &server,
const int64_t &msg_proposal_id,
const int64_t prev_log_id,
const int64_t &prev_log_proposal_id,
const LSN &committed_end_lsn);
// @brief: this function used to send committed_info to child replica
// @param[in] member_list: current paxos member list
// @param[in] msg_proposal_id: the current proposal_id
......
......@@ -94,6 +94,25 @@ int LogNetService::submit_push_log_req(
return ret;
}
int LogNetService::submit_committed_info_req(
const common::ObAddr &server,
const int64_t &msg_proposal_id,
const int64_t prev_log_id,
const int64_t &prev_log_proposal_id,
const LSN &committed_end_lsn)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogNetService has not inited!!!", K(ret));
} else {
CommittedInfo committed_info_req(msg_proposal_id, prev_log_id,
prev_log_proposal_id, committed_end_lsn);
ret = post_request_to_server_(server, committed_info_req);
}
return ret;
}
int LogNetService::submit_push_log_resp(
const ObAddr &server,
const int64_t &msg_proposal_id,
......
......@@ -188,7 +188,11 @@ public:
int submit_retire_child_req(const common::ObAddr &server, const LogLearner &parent_itself);
int submit_learner_keepalive_req(const common::ObAddr &server, const LogLearner &sender_itself);
int submit_learner_keepalive_resp(const common::ObAddr &server, const LogLearner &sender_itself);
int submit_committed_info_req(const common::ObAddr &server,
const int64_t &msg_proposal_id,
const int64_t prev_log_id,
const int64_t &prev_log_proposal_id,
const LSN &committed_end_lsn);
template<class List>
int submit_committed_info_req(
const List &member_list,
......
......@@ -3273,7 +3273,60 @@ int LogSlidingWindow::try_update_match_lsn_map_(const common::ObAddr &server, co
return ret;
}
int LogSlidingWindow::leader_broadcast_committed_info_(const LSN &committed_end_lsn)
int LogSlidingWindow::try_send_committed_info(const common::ObAddr &server,
const LSN &log_lsn,
const LSN &log_end_lsn,
const int64_t &log_proposal_id)
{
int ret = OB_SUCCESS;
LSN committed_end_lsn;
get_committed_end_lsn_(committed_end_lsn);
const int64_t curr_proposal_id = state_mgr_->get_proposal_id();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (!log_lsn.is_valid() || !log_end_lsn.is_valid() || INVALID_PROPOSAL_ID == log_proposal_id) {
ret = OB_INVALID_ARGUMENT;
} else if (state_mgr_->is_leader_active()) {
// Leader
int64_t last_log_id = OB_INVALID_LOG_ID;
int64_t last_log_proposal_id = INVALID_PROPOSAL_ID;
if (OB_FAIL(leader_get_committed_log_info_(committed_end_lsn, last_log_id, last_log_proposal_id))
|| OB_INVALID_LOG_ID == last_log_id) {
// no need send committed_info
} else if (OB_FAIL(log_engine_->submit_committed_info_req(server, curr_proposal_id,
last_log_id, log_proposal_id, committed_end_lsn))) {
PALF_LOG(WARN, "submit_committed_info_req failed", K(ret), K_(palf_id), K_(self), K(server));
}
} else {
// Follower
int64_t last_slide_log_id = OB_INVALID_LOG_ID;
int64_t last_slide_log_ts = OB_INVALID_TIMESTAMP;
LSN last_slide_lsn;
LSN last_slide_end_lsn;
int64_t last_slide_log_pid = INVALID_PROPOSAL_ID;
int64_t last_slide_accum_checksum = -1;
get_last_slide_log_info_(last_slide_log_id, last_slide_log_ts, last_slide_lsn, \
last_slide_end_lsn, last_slide_log_pid, last_slide_accum_checksum);
if (log_lsn == last_slide_lsn
&& log_proposal_id == last_slide_log_pid
&& committed_end_lsn == log_end_lsn) {
// If arg log does match with last slide log, follower can send committed_info to server.
OB_ASSERT(log_end_lsn == last_slide_end_lsn);
if (OB_FAIL(log_engine_->submit_committed_info_req(server, curr_proposal_id,
last_slide_log_id, log_proposal_id, committed_end_lsn))) {
PALF_LOG(WARN, "submit_committed_info_req failed", K(ret), K_(palf_id), K_(self), K(server));
} else {
PALF_LOG(TRACE, "follower try_send_committed_info success", K(ret), K_(palf_id), K_(self),
K(last_slide_log_id), K(log_proposal_id), K(committed_end_lsn));
}
}
}
return ret;
}
int LogSlidingWindow::leader_get_committed_log_info_(const LSN &committed_end_lsn,
int64_t &log_id,
int64_t &log_proposal_id)
{
int ret = OB_SUCCESS;
const int64_t max_log_id = get_max_log_id();
......@@ -3289,25 +3342,38 @@ int LogSlidingWindow::leader_broadcast_committed_info_(const LSN &committed_end_
// log_task is invalid or not freezed, that means there is maybe new log after committed_end_lsn.
// No need broadcast commonitted_info.
} else {
int64_t log_proposal_id = INVALID_PROPOSAL_ID;
LSN log_end_lsn;
log_task->lock();
log_proposal_id = log_task->get_proposal_id();
log_end_lsn = log_task->get_begin_lsn() + LogGroupEntryHeader::HEADER_SER_SIZE + log_task->get_data_len();
log_task->unlock();
if (log_end_lsn == committed_end_lsn) {
ObMemberList dst_member_list;
const int64_t curr_proposal_id = state_mgr_->get_proposal_id();
if (OB_FAIL(mm_->get_curr_member_list(dst_member_list))) {
PALF_LOG(WARN, "get_curr_member_list failed", K(ret), K_(palf_id), K_(self));
} else if (OB_FAIL(dst_member_list.remove_server(self_))) {
PALF_LOG(WARN, "dst_member_list remove_server failed", K(ret), K_(palf_id), K_(self));
} else if (dst_member_list.is_valid()
&& OB_FAIL(log_engine_->submit_committed_info_req(dst_member_list, curr_proposal_id,
max_log_id, log_proposal_id, committed_end_lsn))) {
} else {}
log_id = max_log_id;
}
PALF_LOG(TRACE, "leader_broadcast_committed_info_", K(ret), K_(palf_id), K_(self), K(max_log_id));
}
return ret;
}
int LogSlidingWindow::leader_broadcast_committed_info_(const LSN &committed_end_lsn)
{
int ret = OB_SUCCESS;
const int64_t curr_proposal_id = state_mgr_->get_proposal_id();
int64_t log_id = OB_INVALID_LOG_ID;
int64_t log_proposal_id = INVALID_PROPOSAL_ID;
ObMemberList dst_member_list;
if (OB_FAIL(leader_get_committed_log_info_(committed_end_lsn, log_id, log_proposal_id))
|| OB_INVALID_LOG_ID == log_id) {
// no need send committed_info
} else if (OB_FAIL(mm_->get_curr_member_list(dst_member_list))) {
PALF_LOG(WARN, "get_curr_member_list failed", K(ret), K_(palf_id), K_(self));
} else if (OB_FAIL(dst_member_list.remove_server(self_))) {
PALF_LOG(WARN, "dst_member_list remove_server failed", K(ret), K_(palf_id), K_(self));
} else if (dst_member_list.is_valid()
&& OB_FAIL(log_engine_->submit_committed_info_req(dst_member_list, curr_proposal_id,
log_id, log_proposal_id, committed_end_lsn))) {
PALF_LOG(WARN, "submit_committed_info_req failed", K(ret), K_(palf_id), K_(self), K(log_id));
} else {
PALF_LOG(TRACE, "leader_broadcast_committed_info_", K(ret), K_(palf_id), K_(self), K(log_id));
}
return ret;
}
......
......@@ -190,6 +190,10 @@ public:
virtual int set_location_cache_cb(PalfLocationCacheCb *lc_cb);
virtual int reset_location_cache_cb();
virtual int advance_reuse_lsn(const LSN &flush_log_end_lsn);
virtual int try_send_committed_info(const common::ObAddr &server,
const LSN &log_lsn,
const LSN &log_end_lsn,
const int64_t &log_proposal_id);
TO_STRING_KV(K_(palf_id), K_(self), K_(lsn_allocator), K_(group_buffer), \
K_(last_submit_lsn), K_(last_submit_end_lsn), K_(last_submit_log_id), K_(last_submit_log_pid), \
K_(max_flushed_lsn), K_(max_flushed_end_lsn), K_(max_flushed_log_pid), K_(committed_end_lsn), \
......@@ -303,6 +307,9 @@ private:
const char *buf,
const int64_t buf_len,
int64_t &min_log_ts_ns);
int leader_get_committed_log_info_(const LSN &committed_end_lsn,
int64_t &log_id,
int64_t &log_proposal_id);
int leader_broadcast_committed_info_(const LSN &committed_end_lsn);
int submit_push_log_resp_(const common::ObAddr &server, const int64_t &msg_proposal_id, const LSN &lsn);
inline int try_push_log_to_paxos_follower_(const int64_t curr_proposal_id,
......
......@@ -2760,13 +2760,36 @@ int PalfHandleImpl::fetch_log_from_storage(const common::ObAddr &server,
return ret;
}
int PalfHandleImpl::try_send_committed_info_(const ObAddr &server,
const LSN &log_lsn,
const LSN &log_end_lsn,
const int64_t &log_proposal_id)
{
int ret = OB_SUCCESS;
AccessMode access_mode;
if (!log_lsn.is_valid() || !log_end_lsn.is_valid() || INVALID_PROPOSAL_ID == log_proposal_id) {
ret = OB_INVALID_ARGUMENT;
} else if (OB_FAIL(mode_mgr_.get_access_mode(access_mode))) {
PALF_LOG(WARN, "get_access_mode failed", K(ret), KPC(this));
} else if (AccessMode::APPEND == access_mode) {
// No need send committed_info in APPEND mode, because leader will genenrate keeapAlive log periodically.
} else if (OB_FAIL(sw_.try_send_committed_info(server, log_lsn, log_end_lsn, log_proposal_id))) {
PALF_LOG(TRACE, "try_send_committed_info failed", K(ret), K_(palf_id), K_(self),
K(server), K(log_lsn), K(log_end_lsn), K(log_proposal_id));
} else {
PALF_LOG(TRACE, "try_send_committed_info_ success", K(ret), K_(palf_id), K_(self), K(server),
K(log_lsn), K(log_end_lsn), K(log_proposal_id));
}
return ret;
}
int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
const FetchLogType fetch_type,
const int64_t &msg_proposal_id,
const LSN &prev_lsn,
const LSN &fetch_start_lsn,
const int64_t fetch_log_size,
const int64_t fetch_log_count)
const FetchLogType fetch_type,
const int64_t &msg_proposal_id,
const LSN &prev_lsn,
const LSN &fetch_start_lsn,
const int64_t fetch_log_size,
const int64_t fetch_log_count)
{
int ret = OB_SUCCESS;
PalfGroupBufferIterator iterator;
......@@ -2822,6 +2845,7 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
bool is_reach_end = false;
int64_t fetched_count = 0;
LSN curr_log_end_lsn = curr_lsn + curr_group_entry.get_group_entry_size();
LSN prev_log_end_lsn;
int64_t prev_log_proposal_id = prev_log_info.log_proposal_id_;
while (OB_SUCC(ret) && !is_reach_size_limit && !is_reach_count_limit && !is_reach_end
&& OB_SUCC(iterator.next())) {
......@@ -2853,12 +2877,18 @@ int PalfHandleImpl::fetch_log_from_storage_(const common::ObAddr &server,
K(prev_log_proposal_id), K(fetch_end_lsn), K(curr_log_end_lsn), K(is_reach_size_limit),
K(fetch_log_size), K(fetched_count), K(is_reach_count_limit));
each_round_prev_lsn = curr_lsn;
prev_log_end_lsn = curr_log_end_lsn;
prev_log_proposal_id = curr_group_entry.get_header().get_log_proposal_id();
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
// try send committed_info to server
if (OB_SUCC(ret)) {
RLockGuard guard(lock_);
(void) try_send_committed_info_(server, each_round_prev_lsn, prev_log_end_lsn, prev_log_proposal_id);
}
}
if (OB_FAIL(ret) && OB_ERR_OUT_OF_LOWER_BOUND == ret) {
......
......@@ -867,6 +867,10 @@ private:
int construct_palf_base_info_(const LSN &max_committed_lsn,
PalfBaseInfo &palf_base_info);
int append_disk_log_to_sw_(const LSN &start_lsn);
int try_send_committed_info_(const common::ObAddr &server,
const LSN &log_lsn,
const LSN &log_end_lsn,
const int64_t &log_proposal_id);
int fetch_log_from_storage_(const common::ObAddr &server,
const FetchLogType fetch_type,
const int64_t &msg_proposal_id,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册