election_proposer.cpp 33.7 KB
Newer Older
W
wangzelin.wzl 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */


#include "logservice/palf/election/message/election_message.h"
O
obdev 已提交
15
#include "ob_role.h"
W
wangzelin.wzl 已提交
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278
#include "share/ob_occam_time_guard.h"
#include "election_proposer.h"
#include "common/ob_clock_generator.h"
#include "election_impl.h"
#include "lib/ob_errno.h"
#include "lib/string/ob_string_holder.h"
#include "logservice/palf/election/interface/election_msg_handler.h"
#include "logservice/palf/election/utils/election_common_define.h"
#include "logservice/palf/election/utils/election_member_list.h"
#include "logservice/palf/palf_callback.h"

namespace oceanbase
{
namespace palf
{
namespace election
{

struct ResponseChecker
{
  template <typename ResponseMsg>
  static bool check_ballot_and_restart_counter_valid_and_accepted(const ResponseMsg &msg,
                                                                  ElectionProposer *proposer,
                                                                  const LogPhase phase)
  {
    ELECT_TIME_GUARD(500_ms);
    #define PRINT_WRAPPER K(msg), K(*proposer)
    bool ret = false;
    // 1. 检查restart counter
    if (OB_UNLIKELY(msg.get_restart_counter() != proposer->restart_counter_)) {
      LOG_PHASE(INFO, phase, "receive old restart counter response");
      assert(msg.get_restart_counter() < proposer->restart_counter_);
    // 2. 检查ballot number
    } else if (OB_UNLIKELY(msg.get_ballot_number() < proposer->ballot_number_)) {
      LOG_PHASE(WARN, phase, "receive old ballot response");
    // 3. ballot number更大的时候更新ballot number,若自己仍然是leader,触发leader prepare
    } else if (msg.get_ballot_number() > proposer->ballot_number_) {
      assert(!msg.is_accepted());
      proposer->advance_ballot_number_and_reset_related_states_(msg.get_ballot_number(),
                                                                "receive reject message");
      if (proposer->check_leader()) {
        // 在check leader后可能卡住,做leader prepare时就已经不再是leader了
        // 但是没有关系,正确性是由prepare阶段保证的,check_leader的意义在于尽量避免无谓的leader prepare流程
        LOG_PHASE(WARN, phase, "leader message is rejected cause ballot number", K(msg.get_ballot_number()), K(proposer->prepare_success_ballot_));
        proposer->advance_ballot_number_and_reset_related_states_(proposer->ballot_number_ + 1,
                                                                  "retry leader prepare");
        proposer->prepare(ObRole::LEADER);
      }
    // 4. 检查消息是否因为其他原因被拒绝
    } else if (!msg.is_accepted()) {
      LOG_PHASE(WARN, phase, "request is rejected");
    } else {
      ret = true;
    }
    return ret;
    #undef PRINT_WRAPPER
  }
};

ElectionProposer::ElectionProposer(ElectionImpl *election)
:role_(ObRole::FOLLOWER),
ballot_number_(INVALID_VALUE),
prepare_success_ballot_(INVALID_VALUE),
switch_source_leader_ballot_(INVALID_VALUE),
restart_counter_(INVALID_VALUE),
p_election_(election),
last_do_prepare_ts_(INVALID_VALUE),
last_dump_proposer_info_ts_(INVALID_VALUE),
last_dump_election_msg_count_state_ts_(INVALID_VALUE),
record_lease_interval_(INVALID_VALUE) {}

int ElectionProposer::init(const int64_t restart_counter)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER K(*this)
  int ret = OB_SUCCESS;
  if (CLICK_FAIL(memberlist_with_states_.init())) {
    LOG_INIT(ERROR, "init memberlist with states failed");
  } else {
    restart_counter_ = restart_counter;
  }
  return ret;
  #undef PRINT_WRAPPER
}

int ElectionProposer::set_member_list(const MemberList &new_member_list)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER K(*this), K(new_member_list)
  int ret = OB_SUCCESS;
  // 检查旧的成员组的信息是否一致
  const MemberList &current_member_list = memberlist_with_states_.get_member_list();
  if (current_member_list.is_valid()) {
    if (new_member_list.get_membership_version() < current_member_list.get_membership_version()) {
      ret = OB_INVALID_ARGUMENT;
      LOG_SET_MEMBER(ERROR, "new memberlsit's membership version is not greater than current");
    } else if (check_leader()) {
      if (!memberlist_with_states_.is_synced_with_majority()) {
        ret = OB_OP_NOT_ALLOW;
        LOG_SET_MEMBER(WARN, "current membership version is not sync with majority yet, change member list not allowed now");
      }
    }
  }
  if (OB_SUCC(ret)) {
    MemberList old_list = memberlist_with_states_.get_member_list();
    if (CLICK_FAIL(memberlist_with_states_.set_member_list(new_member_list))) {
      LOG_SET_MEMBER(WARN, "set new member list failed");
    } else {
      if (old_list.get_addr_list().empty() && new_member_list.get_addr_list().count() == 1) {// 单副本第一次设置成员列表
        prepare(ObRole::FOLLOWER);
      }
      if (old_list.only_membership_version_different(new_member_list)) {
        LOG_SET_MEMBER(INFO, "advance membership version");
      } else {
        LOG_SET_MEMBER(INFO, "member list is changed");
        p_election_->event_recorder_.report_member_list_changed_event(old_list, memberlist_with_states_.get_member_list());
      }
    }
  }
  return ret;
  #undef PRINT_WRAPPER
}

int ElectionProposer::change_leader_to(const ObAddr &dest_addr)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER K(*this), K(dest_addr), K(redirect_addr)
  int ret = OB_SUCCESS;
  int idx = 0;
  ObAddr redirect_addr = dest_addr;
  const ObArray<ObAddr> &addr_list = memberlist_with_states_.get_member_list().get_addr_list();
  for (; idx < addr_list.count(); ++idx) {
    if (dest_addr == addr_list[idx]) {
      break;
    }
  }
  if (OB_UNLIKELY(idx == addr_list.count())) {
    LOG_CHANGE_LEADER(WARN, "the dest addr is not in member list, change leader to myself");
    redirect_addr = p_election_->get_self_addr();
  }
  if (OB_UNLIKELY(!check_leader())) {
    ret = OB_NOT_MASTER;
    LOG_CHANGE_LEADER(WARN, "follower cannot do change leader");
  } else {
    inner_change_leader_to(redirect_addr);
    LOG_CHANGE_LEADER(INFO, "direct change leader", K(lbt()));
  }
  return ret;
  #undef PRINT_WRAPPER
}

bool ElectionProposer::leader_revoke_if_lease_expired_(RoleChangeReason reason)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER K(*this)
  bool ret_bool = false;
  if (role_ == ObRole::LEADER && !check_leader()) {
    (p_election_->role_change_cb_)(p_election_, ObRole::LEADER, ObRole::FOLLOWER, reason);
    role_ = ObRole::FOLLOWER;
    memberlist_with_states_.clear_prepare_and_accept_states();
    leader_lease_and_epoch_.reset();
    ret_bool = true;
  }
  return ret_bool;
  #undef PRINT_WRAPPER
}

bool ElectionProposer::leader_takeover_if_lease_valid_(RoleChangeReason reason)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER K(*this)
  bool ret_bool = false;
  int ret = common::OB_SUCCESS;
  if (role_ != ObRole::LEADER && check_leader()) {
    (p_election_->role_change_cb_)(p_election_, ObRole::FOLLOWER, ObRole::LEADER, reason);
    role_ = ObRole::LEADER;
    propose();
    ret_bool = true;
  }
  return ret_bool;
  #undef PRINT_WRAPPER
}

void ElectionProposer::
     advance_ballot_number_and_reset_related_states_(const int64_t new_ballot_number,
                                                     const char *reason)
{
  ELECT_TIME_GUARD(500_ms);
  assert(new_ballot_number >= ballot_number_);
  ELECT_LOG(INFO, "advance ballot number", K(new_ballot_number), K(reason), K(*this));
  ballot_number_ = new_ballot_number;
  memberlist_with_states_.clear_prepare_and_accept_states();
  // 如果在切主过程中推大ballot number需要清理切主流程中的相关状态
  switch_source_leader_ballot_ = INVALID_VALUE;
  switch_source_leader_addr_.reset();
}

int ElectionProposer::register_renew_lease_task_()
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER KR(ret), K(*this)
  int ret = OB_SUCCESS;
  // 如果续约不够快很多case不能及时切主,所以当MAX_TST设置的比较大时,也让续约间隔设置的短一些
  if (CLICK_FAIL(p_election_->timer_->schedule_task_repeat(renew_lease_task_handle_,
                                                           std::min(int64_t(500_ms), CALCULATE_RENEW_LEASE_INTERVAL()),
                                                           [this]() {
    int ret = OB_SUCCESS;
    LockGuard lock_guard(p_election_->lock_);
    // 周期性打印选举的状态
    if (ObClockGenerator::getCurrentTime() > last_dump_proposer_info_ts_ + 3_s) {
      last_dump_proposer_info_ts_ = ObClockGenerator::getCurrentTime();
      ELECT_LOG(INFO, "dump proposer info", K(*this));
    }
    // 周期性打印选举的消息收发统计信息
    if (ObClockGenerator::getCurrentTime() > last_dump_election_msg_count_state_ts_ + 10_s) {
      last_dump_election_msg_count_state_ts_ = ObClockGenerator::getCurrentTime();
      char ls_id_buffer[32] = {0};
      auto pretend_to_be_ls_id = [ls_id_buffer](const int64_t id) mutable {
        int64_t pos = 0;
        databuff_printf(ls_id_buffer, 32, pos, "{id:%ld}", id);
        return ls_id_buffer;
      };// 在日志打印时与ls_id的格式保持一致
      ELECT_LOG(INFO, "dump message count", "ls_id", pretend_to_be_ls_id(p_election_->id_), "self_addr", p_election_->self_addr_, "state", p_election_->msg_counter_);
    }
    // 判断是否可以进行续约
    if (!p_election_->is_running_) {
      LOG_RENEW_LEASE(WARN, "election is stopped, this renew lease task should be stopped");
    } else if (role_ == ObRole::FOLLOWER) {
      // 在FOLLOWER上仍然需要执行该定时任务,以周期性打印选举状态
    } else if (role_ != ObRole::LEADER) {
      LOG_RENEW_LEASE(ERROR, "unexpected role status");
    } else if (OB_UNLIKELY(leader_revoke_if_lease_expired_(RoleChangeReason::LeaseExpiredToRevoke))) {
      LOG_RENEW_LEASE(ERROR, "leader lease expired, leader revoked");
    } else if (prepare_success_ballot_ != ballot_number_) {// 需要进行leader prepare推大用于续约的ballot number
      LOG_RENEW_LEASE(INFO, "prepare_success_ballot_ not same as ballot_number_, maybe in Leader Prepare phase, gie up renew lease this time");
    } else {
      propose();
    }
    return false;
  }))) {
    LOG_INIT(ERROR, "regist renew lease task failed");
  }
  return ret;
  #undef PRINT_WRAPPER
}

// 这个接口是在外部未加锁的状态下调用的
int ElectionProposer::reschedule_or_register_prepare_task_after_(const int64_t delay_us)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER KR(ret), K(delay_us), K(*this)
  int ret = OB_SUCCESS;
  CLICK();
  if (delay_us < 0) {
    ret = OB_INVALID_ARGUMENT;
    LOG_NONE(ERROR, "invalid argument");
  } else if (devote_task_handle_.is_running()) {
    if (CLICK_FAIL(devote_task_handle_.reschedule_after(delay_us))) {
      LOG_RENEW_LEASE(ERROR, "reschedule devote task failed");
    }
  } else if (CLICK_FAIL(p_election_->timer_->schedule_task_repeat_spcifiy_first_delay(devote_task_handle_,
                                                                                      delay_us,
                                                                                      CALCULATE_MAX_ELECT_COST_TIME(),
O
obdev 已提交
279 280
                                                                                      [this, delay_us]() {
    int ret = OB_SUCCESS;
W
wangzelin.wzl 已提交
281
    LockGuard lock_guard(p_election_->lock_);
O
obdev 已提交
282 283 284 285 286 287 288 289
    if (check_leader()) {// Leader不应该靠定时任务主动做Prepare,只能被动触发Prepare
      LOG_RENEW_LEASE(INFO, "leader not allow do prepare in timer task before lease expired, this log may printed when message delay too large");
    } else {
      if (role_ == ObRole::LEADER) {
        role_ = ObRole::FOLLOWER;
      }
      this->prepare(role_);// 只有Follower可以走到这里
    }
W
wangzelin.wzl 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
    return false;
  }))) {
    LOG_INIT(ERROR, "first time register devote task failed");
  }
  LOG_NONE(DEBUG, "call reschedule_or_register_prepare_task_after_");
  return ret;
  #undef PRINT_WRAPPER
}

int ElectionProposer::start()
{
  #define PRINT_WRAPPER K(*this)
  ELECT_TIME_GUARD(500_ms);
  int ret = OB_SUCCESS;
  if (CLICK_FAIL(reschedule_or_register_prepare_task_after_(3_s))) {
    LOG_INIT(ERROR, "first time register devote task failed");
  } else if (CLICK_FAIL(register_renew_lease_task_())) {
    LOG_INIT(ERROR, "first time register renew lease task failed");
  }
  return ret;
  #undef PRINT_WRAPPER
}

void ElectionProposer::stop()
{
  #define PRINT_WRAPPER K(*this)
  ELECT_TIME_GUARD(3_s);
  devote_task_handle_.stop_and_wait();
  renew_lease_task_handle_.stop_and_wait();
  LockGuard lock_guard(p_election_->lock_);
  leader_lease_and_epoch_.reset();
  if (leader_revoke_if_lease_expired_(RoleChangeReason::StopToRevoke)) {
    LOG_DESTROY(INFO, "leader revoke because election is stopped");
  }
   #undef PRINT_WRAPPER
}

void ElectionProposer::prepare(const ObRole role)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER KR(ret), K(role), K(*this)
  int ret = OB_SUCCESS;
  int64_t cur_ts = ObClockGenerator::getCurrentTime();
  LogPhase phase = role == ObRole::LEADER ? LogPhase::RENEW_LEASE : LogPhase::ELECT_LEADER;
  if (memberlist_with_states_.get_member_list().get_addr_list().empty()) {
    LOG_PHASE(INFO, phase, "memberlist is empty, give up do prepare this time");
  } else if (role == ObRole::FOLLOWER && cur_ts - last_do_prepare_ts_ < CALCULATE_MAX_ELECT_COST_TIME() / 2) {// 若这是一个一乎动作,且距离上一次一呼百应的时间点过近,该次一乎调度无效
    LOG_PHASE(INFO, phase, "the prepare action just happened, need wait next time");
  } else {
    last_do_prepare_ts_ = cur_ts;
    // 1. Leader prepare不推ballot number,Follower prepare需要推大自己的ballot number再进行
    if (role == ObRole::FOLLOWER) {
      (void) advance_ballot_number_and_reset_related_states_(ballot_number_ + 1, "do prepare");
      LOG_PHASE(INFO, phase, "do prepare");
    } else if (role == ObRole::LEADER) {
      LOG_PHASE(INFO, phase, "do leader prepare");
    } else {
      LOG_PHASE(ERROR, phase, "unexpected code path");
      abort();
    }
    // 2. 获取本地的优先级
    ElectionPrepareRequestMsg prepare_req(p_election_->id_,
                                          p_election_->get_self_addr(),
                                          restart_counter_,
                                          ballot_number_,
                                          p_election_->get_membership_version_());
    (void) p_election_->refresh_priority_();
    if (CLICK_FAIL(prepare_req.set(p_election_->get_priority_(),
                                role))) {
      LOG_PHASE(ERROR, phase, "create prepare request failed");
    // 3. 广播消息
    } else if (!is_self_in_memberlist_()) {
      LOG_PHASE(INFO, phase, "self is not in memberlist, give up do prepare");
    } else if (CLICK_FAIL(p_election_->broadcast_(prepare_req,
                                              memberlist_with_states_.
                                              get_member_list().
                                              get_addr_list()))) {
      LOG_PHASE(ERROR, phase, "broadcast prepare request failed");
    } else {
      LOG_PHASE(INFO, phase, "broadcast prepare request");
    }
  }
  #undef PRINT_WRAPPER
}

void ElectionProposer::on_prepare_request(const ElectionPrepareRequestMsg &prepare_req,
                                          bool *need_register_devote_task)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER KR(ret), K(prepare_req), K(*this)
  int ret = OB_SUCCESS;
O
obdev 已提交
381 382 383 384 385 386 387 388
  // 0. 拒绝旧消息、过滤本轮次消息、根据新消息推大轮次
  if (prepare_req.get_ballot_number() <= ballot_number_) {
    if (prepare_req.get_ballot_number() < ballot_number_) {// 对于旧消息发送拒绝响应
      ElectionPrepareResponseMsg prepare_res_reject(p_election_->get_self_addr(),
                                                    prepare_req);
      prepare_res_reject.set_rejected(ballot_number_);
      if (CLICK_FAIL(p_election_->send_(prepare_res_reject))) {
        LOG_ELECT_LEADER(ERROR, "create prepare request failed");
W
wangzelin.wzl 已提交
389
      } else {
O
obdev 已提交
390
        LOG_ELECT_LEADER(INFO, "send reject response cause prepare message ballot too small");
W
wangzelin.wzl 已提交
391
      }
O
obdev 已提交
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
    } else {// 对于本轮次消息,需要过滤,否则无限循环
      LOG_ELECT_LEADER(INFO, "has been send prepare request in this ballot, give up this time");
    }
  } else {// 对于新的消息,推大本机选举轮次
    LOG_ELECT_LEADER(INFO, "receive bigger ballot prepare request");
    (void) advance_ballot_number_and_reset_related_states_(prepare_req.get_ballot_number(),
                                                           "receive bigger ballot prepare request");
      // 1. 忽略leader prepare消息,不触发一呼百应
    if (static_cast<ObRole>(prepare_req.get_role()) == ObRole::LEADER) {
      LOG_ELECT_LEADER(INFO, "proposer ignore leader prepare");
    } else if (static_cast<ObRole>(prepare_req.get_role()) != ObRole::FOLLOWER) {
      // 非candidate prepare是非预期的
      LOG_ELECT_LEADER(ERROR, "unexpected code path");
    // 2. 尝试一呼百应
    } else if (memberlist_with_states_.get_member_list().get_addr_list().empty()) {
      LOG_ELECT_LEADER(INFO, "memberlist is empty, give up do prepare this time");
W
wangzelin.wzl 已提交
408 409 410 411 412 413 414 415 416 417 418 419 420 421
    } else {
      (void) p_election_->refresh_priority_();
      ElectionPrepareRequestMsg prepare_followed_req(p_election_->id_,
                                                     p_election_->get_self_addr(),
                                                     restart_counter_,
                                                     ballot_number_,
                                                     p_election_->get_membership_version_());
      if (CLICK_FAIL(prepare_followed_req.set(p_election_->get_priority_(),
                                              role_))) {
        LOG_ELECT_LEADER(ERROR, "create prepare request failed");
      } else if (!is_self_in_memberlist_()) {
        LOG_ELECT_LEADER(INFO, "self is not in memberlist, give up do prepare");
      } else if (CLICK_FAIL(p_election_->broadcast_(prepare_followed_req,
                                                    memberlist_with_states_.get_member_list()
O
obdev 已提交
422
                                                                            .get_addr_list()))) {
W
wangzelin.wzl 已提交

        LOG_ELECT_LEADER(ERROR, "broadcast prepare request failed");
      } else {
        last_do_prepare_ts_ = ObClockGenerator::getCurrentTime();
        if (role_ == ObRole::LEADER) {
          LOG_ELECT_LEADER(INFO, "join elect leader phase as leader");
        } else if (role_ == ObRole::FOLLOWER) {
          LOG_ELECT_LEADER(INFO, "join elect leader phase as follower");
        }
        *need_register_devote_task = true;
      }
    }
  }
  #undef PRINT_WRAPPER
}

void ElectionProposer::on_prepare_response(const ElectionPrepareResponseMsg &prepare_res)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER KR(ret), K(prepare_res), K(*this)
  int ret = OB_SUCCESS;
  LogPhase phase = role_ == ObRole::LEADER ? LogPhase::RENEW_LEASE : LogPhase::ELECT_LEADER;
  LOG_PHASE(INFO, phase, "handle prepare response");
  const Lease &res_lease = prepare_res.get_lease();
  // 1. 检查消息的有效性
  if (!ResponseChecker::check_ballot_and_restart_counter_valid_and_accepted(prepare_res, this, phase)) {
    LOG_PHASE(WARN, phase, "receive invalid or rejected response");
  // 2. 检查是否具备进入accept阶段的条件
  } else if (!(res_lease.is_empty() ||// 1. 对方目前没有lease
              res_lease.get_owner() == p_election_->get_self_addr() ||// 2. lease是本proposer发出的
              (res_lease.get_owner() == switch_source_leader_addr_ &&// 3. 对方的lease是切主源端的
              res_lease.get_ballot_number() == switch_source_leader_ballot_))) {// 处于旧主的ballot内
    LOG_PHASE(INFO, phase, "peer lease still valid, not count");
  // 3. 记录对方的应答,计入多数派中
  } else if (CLICK_FAIL(memberlist_with_states_.record_prepare_ok(prepare_res))) {
    if (OB_ENTRY_NOT_EXIST == ret) {
      LOG_PHASE(WARN, phase, "peer maybe not in memberlist anymore");
    } else if (OB_ELECTION_BELOW_MAJORITY == ret) {
      LOG_PHASE(INFO, phase, "response not reach majority, waiting more...");
    } else if (OB_ELECTION_OVER_MAJORITY == ret) {
      LOG_PHASE(INFO, phase, "has been send accept message this ballot");
    } else {
      LOG_PHASE(ERROR, phase, "unexpected error code");
    }
  // 4. 若应答刚好达到多数派,进入accept阶段
  } else {
    prepare_success_ballot_ = max(ballot_number_, prepare_success_ballot_);
    propose();
    LOG_PHASE(INFO, phase, "do propose");
  }
  #undef PRINT_WRAPPER
}

void ElectionProposer::propose()
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER KR(ret), K(*this)
  int ret = OB_SUCCESS;
  int64_t current_ts = get_monotonic_ts();
  int64_t new_lease_interval = CALCULATE_LEASE_INTERVAL();
  LogPhase phase = role_ == ObRole::LEADER ? LogPhase::RENEW_LEASE : LogPhase::ELECT_LEADER;
  if (new_lease_interval != record_lease_interval_) {
    LOG_PHASE(INFO, phase, "lease interval changed", K(record_lease_interval_), K(new_lease_interval));
    record_lease_interval_ = new_lease_interval;
  }
  ElectionAcceptRequestMsg accept_req(p_election_->id_,
                                      p_election_->get_self_addr(),
                                      restart_counter_,
                                      prepare_success_ballot_,
                                      current_ts,
                                      new_lease_interval,
                                      memberlist_with_states_.get_member_list()
                                                             .get_membership_version());
  if (CLICK_FAIL(p_election_->broadcast_(accept_req, memberlist_with_states_.get_member_list()
                                                                            .get_addr_list()))) {
    LOG_PHASE(ERROR, phase, "broadcast accept request failed", K(accept_req));
  }
  #undef PRINT_WRAPPER
}

void ElectionProposer::on_accept_response(const ElectionAcceptResponseMsg &accept_res)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER KR(ret), K(accept_res), K(new_lease_end), K(*this)
  int ret = OB_SUCCESS;
  int64_t new_lease_end = 0;
  LogPhase phase = role_ == ObRole::LEADER ? LogPhase::RENEW_LEASE : LogPhase::ELECT_LEADER;
  // 1. 检查消息的有效性
  if (!ResponseChecker::check_ballot_and_restart_counter_valid_and_accepted(accept_res, this, LogPhase::RENEW_LEASE)) {
    LOG_PHASE(WARN, phase, "receive invalid or rejected response");
  // 2. 记录多数派
  } else if (CLICK_FAIL(memberlist_with_states_.record_accept_ok(accept_res))) {
    if (OB_ENTRY_NOT_EXIST == ret) {
      LOG_PHASE(WARN, phase, "peer maybe not in memberlist anymore");
    } else if (OB_ELECTION_BELOW_MAJORITY == ret) {
      LOG_PHASE(INFO, phase, "response not reach majority, waiting more...");
    } else {
      LOG_PHASE(ERROR, phase, "unexpected error code");
    }
  // 3. 拿到新的lease
  } else if (CLICK_FAIL(memberlist_with_states_.get_majority_promised_not_vote_ts(new_lease_end))) {
    LOG_PHASE(ERROR, phase, "get majority promised not vote ts failed");
  // 4. 更新lease
  } else {
    // 4.1 如果当前lease已经失效,中间可能出现过其他的leader,而本副本还未感知,需要先卸任
    if (OB_UNLIKELY(leader_revoke_if_lease_expired_(RoleChangeReason::LeaseExpiredToRevoke))) {
      LOG_PHASE(WARN, phase, "lease is expired, need revoke before takeover");
    }
    // 4.2 更新lease和允许成员变更的版本号
    int64_t record_leader_lease_end;
    int64_t exposed_epoch;// not used
    leader_lease_and_epoch_.get(record_leader_lease_end, exposed_epoch);
    if (new_lease_end > record_leader_lease_end) {// 需要更新
      leader_lease_and_epoch_.set_lease_and_epoch_if_lease_expired_or_just_set_lease(new_lease_end, prepare_success_ballot_);
    }
    // 如果4.1步骤检查lease没有过期,但是在4.3步骤之前lease刚好过期,是有可能发生的,但是如果4.3步骤执行成功
    // 可以推出,4.1-4.3步骤之间不存在其他leader上任,因为在4.3步骤上任成功之前多数派都是被lease锁住的
    // 4.3 若之前的角色是Follower且当前lease有效则尝试上任
    if (switch_source_leader_ballot_ != INVALID_VALUE) {
      if (switch_source_leader_ballot_ != ballot_number_ - 1) {
        LOG_CHANGE_LEADER(WARN, "self ballot number is advanced, change leader failed");
      } else if (leader_takeover_if_lease_valid_(RoleChangeReason::ChangeLeaderToBeLeader)) {
        LOG_CHANGE_LEADER(INFO, "change leader, new leader takeover success");
        switch_source_leader_ballot_ = INVALID_VALUE;
        switch_source_leader_addr_.reset();
      }
    } else {
      if (leader_takeover_if_lease_valid_(RoleChangeReason::DevoteToBeLeader)) {
        LOG_ELECT_LEADER(INFO, "decentralized voting, leader takeover success");
      }
    }
  }
  // 5. 检查follower的优先级是否高于Leader,尝试触发切主
  if (role_ == ObRole::LEADER && accept_res.get_sender() != p_election_->self_addr_) {
    ObStringHolder reason;
    (void) p_election_->refresh_priority_();
    ElectionAcceptResponseMsg mock_self_accept_response_msg(p_election_->self_addr_,
                                                            p_election_->get_membership_version_(),
                                                            ElectionAcceptRequestMsg(p_election_->id_,
                                                                                     p_election_->self_addr_,
                                                                                     restart_counter_,
                                                                                     ballot_number_,
                                                                                     0,
                                                                                     record_lease_interval_,
                                                                                     p_election_->get_membership_version_()));
    if (CLICK_FAIL(mock_self_accept_response_msg.set_accepted(ballot_number_, p_election_->priority_))) {
      LOG_CHANGE_LEADER(ERROR, "construct mock acceptor response failed");
    } else if (p_election_->is_rhs_message_higher_(mock_self_accept_response_msg,
                                                   accept_res,
                                                   reason,
                                                   false,
                                                   LogPhase::RENEW_LEASE)) {
      if (!p_election_->prepare_change_leader_cb_.is_valid()) {
        LOG_CHANGE_LEADER(ERROR, "prepare_change_leader_cb_ is not valid", K(reason));
      } else {
        p_election_->event_recorder_.report_prepare_change_leader_event(accept_res.get_sender(), reason);
        if (CLICK_FAIL(p_election_->prepare_change_leader_cb_(p_election_->id_, accept_res.get_sender()))) {
          LOG_CHANGE_LEADER(ERROR, "commit change leader task failed", K(reason));
        } else {
          LOG_CHANGE_LEADER(INFO, "commit change leader task success", K(reason));
        }
      }
    }
  }
  #undef PRINT_WRAPPER
}

void ElectionProposer::inner_change_leader_to(const ObAddr &dst)
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER KR(ret), K(change_leader_msg), K(dst), K(*this)
  int ret = OB_SUCCESS;
  // 发出切主消息要带上旧主的ballot number
  int64_t switch_source_leader_ballot = ballot_number_;
  // 旧主要推高自己的ballot number,把飘在空中的accept_res都挡住,否则有正确性问题
  advance_ballot_number_and_reset_related_states_(ballot_number_ + 1,
                                                  "change leader");
  leader_lease_and_epoch_.reset();
  ElectionChangeLeaderMsg change_leader_msg(p_election_->id_,
                                            p_election_->get_self_addr(),
                                            restart_counter_,
                                            ballot_number_,
                                            switch_source_leader_ballot,
                                            p_election_->get_membership_version_());
  if (OB_LIKELY(leader_revoke_if_lease_expired_(RoleChangeReason::ChangeLeaderToRevoke))) {
    LOG_CHANGE_LEADER(INFO, "change leader, old leader revoke");
    p_election_->event_recorder_.report_change_leader_to_revoke_event(dst);
  } else {
    LOG_CHANGE_LEADER(ERROR, "change leader, old leader revoke failed, which should not happen");
  }
  change_leader_msg.set_receiver(dst);
  if (CLICK_FAIL(p_election_->send_(change_leader_msg))) {
    LOG_CHANGE_LEADER(ERROR, "send change leader msg failed");
  } else {
    LOG_CHANGE_LEADER(INFO, "change leader, old leader revoke");
  }
  #undef PRINT_WRAPPER
}

void ElectionProposer::on_change_leader(const ElectionChangeLeaderMsg &change_leader_msg)
{
  ELECT_TIME_GUARD(500_ms);
  // 新主收到切主消息后,进行一次Leader Prepare
  #define PRINT_WRAPPER K(change_leader_msg), K(*this)
  LOG_CHANGE_LEADER(INFO, "handle change leader message");
  bool accept = false;
  if (change_leader_msg.get_sender() == p_election_->get_self_addr()) {// 自己切给自己的
    if (change_leader_msg.get_ballot_number() == ballot_number_) {
      accept = true;
    } else {
      LOG_CHANGE_LEADER(WARN, "change leader to self msg's ballot number not expected");
    }
  } else {// 别人切给自己的
    if (change_leader_msg.get_ballot_number() > ballot_number_) {
      accept = true;
    } else {
      LOG_CHANGE_LEADER(WARN, "change leader msg's ballot number is too small");
    }
  }
  if (!accept) {
    LOG_CHANGE_LEADER(WARN, "change leader msg not accepted");
  } else if (change_leader_msg.get_membership_version() > memberlist_with_states_.
                                                     get_member_list().
                                                     get_membership_version()) {
    LOG_CHANGE_LEADER(WARN, "change leader msg's membership version is larger than self");
  } else {
    advance_ballot_number_and_reset_related_states_(change_leader_msg.get_ballot_number(),
                                                    "receive change leader message");
    switch_source_leader_ballot_ = change_leader_msg.get_old_ballot_number();
    switch_source_leader_addr_ = change_leader_msg.get_sender();
    prepare(ObRole::LEADER);
    LOG_CHANGE_LEADER(INFO, "receive change leader msg, do leader prepare");
  }
  #undef PRINT_WRAPPER
}

int64_t ElectionProposer::to_string(char *buf, const int64_t buf_len) const
{
  int64_t pos = 0;
  if (OB_NOT_NULL(p_election_)) {
    common::databuff_printf(buf, buf_len, pos, "{ls_id:{id:%ld}", p_election_->id_);
    common::databuff_printf(buf, buf_len, pos, ", addr:%s", to_cstring(p_election_->get_self_addr()));
  }
  common::databuff_printf(buf, buf_len, pos, ", role:%s", obj_to_string(role_));
  common::databuff_printf(buf, buf_len, pos, ", ballot_number:%ld", ballot_number_);
  if (prepare_success_ballot_ != INVALID_VALUE) {
    common::databuff_printf(buf, buf_len, pos, ", prepare_success_ballot:%ld", prepare_success_ballot_);
  }
  int64_t lease = record_lease_interval_;
  if (lease > 1_s) {
    common::databuff_printf(buf, buf_len, pos, ", lease_interval:%.2lfs",lease * 1.0 / 1_s);
  } else if (lease > 1_ms) {
    common::databuff_printf(buf, buf_len, pos, ", lease_interval:%.2lfms", lease * 1.0 / 1_ms);
  } else {
    common::databuff_printf(buf, buf_len, pos, ", lease_interval:%ldus", lease);
  }
  common::databuff_printf(buf, buf_len, pos, ", memberlist_with_states:%s",
                                                to_cstring(memberlist_with_states_));
  if (leader_lease_and_epoch_.is_valid()) {// 非有效的leader不打印lease信息
    common::databuff_printf(buf, buf_len, pos, ", lease_and_epoch:%s",
                                                  to_cstring(leader_lease_and_epoch_));
  }
  if (switch_source_leader_ballot_ != INVALID_VALUE) {// 该变量与切主相关,只在切主过程中打印该变量
    common::databuff_printf(buf, buf_len, pos, ", switch_source_leader_ballot:%ld",
                                                  switch_source_leader_ballot_);
  }
  if (switch_source_leader_addr_.is_valid()) {// 该变量与切主相关,只在切主过程中打印该变量
    common::databuff_printf(buf, buf_len, pos, ", switch_source_leader_addr:%s",
                                                  to_cstring(switch_source_leader_addr_));
  }
  common::databuff_printf(buf, buf_len, pos, ", restart_counter:%ld", restart_counter_);
  common::databuff_printf(buf, buf_len, pos, ", last_do_prepare_ts:%s", ObTime2Str::ob_timestamp_str_range<YEAR, USECOND>(last_do_prepare_ts_));
  if (OB_NOT_NULL(p_election_)) {
    common::databuff_printf(buf, buf_len, pos, ", self_priority:%s", p_election_->priority_ == nullptr ? "NULL" : to_cstring(*(p_election_->priority_)));
  }
  common::databuff_printf(buf, buf_len, pos, ", p_election:0x%lx}", (unsigned long)p_election_);
  return pos;
}

O
obdev 已提交
701
int ElectionProposer::revoke(const RoleChangeReason &reason)
W
wangzelin.wzl 已提交
702 703 704 705 706 707 708 709 710
{
  ELECT_TIME_GUARD(500_ms);
  #define PRINT_WRAPPER K(*this)
  int ret = OB_SUCCESS;
  if (!check_leader()) {
    ret = OB_NOT_MASTER;
    LOG_NONE(WARN, "i am not leader, but someone ask me to revoke", K(lbt()));
  }
  leader_lease_and_epoch_.reset();
O
obdev 已提交
711
  if (!leader_revoke_if_lease_expired_(reason)) {
W
wangzelin.wzl 已提交
712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733
    LOG_NONE(WARN, "somethig wrong when revoke", K(lbt()));
  }
  return ret;
  #undef PRINT_WRAPPER
}

bool ElectionProposer::is_self_in_memberlist_() const
{
  bool ret = false;
  const ObArray<ObAddr> &addr_list = memberlist_with_states_.get_member_list().get_addr_list();
  for (int64_t idx = 0; idx < addr_list.count(); ++idx) {
    if (addr_list[idx] == p_election_->self_addr_) {
      ret = true;
      break;
    }
  }
  return ret;
}

}
}
}