提交 e9d6a79f 编写于 作者: O obdev 提交者: ob-robot

Fix palf fs_cb dead lock bug.

上级 134dfc14
......@@ -305,6 +305,26 @@ int LogEngine::submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx,
return ret;
}
int LogEngine::submit_sliding_cb_task(const int cb_pool_tg_id, const LogSlidingCbCtx &sliding_cb_ctx)
{
int ret = OB_SUCCESS;
LogSlidingCbTask *sliding_cb_task = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogEngine not inited!!!");
} else if (0 >= cb_pool_tg_id || !sliding_cb_ctx.is_valid()) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invalid argument!!!", K(cb_pool_tg_id), K(sliding_cb_ctx));
} else if (OB_FAIL(generate_sliding_cb_task_(sliding_cb_ctx, sliding_cb_task))) {
PALF_LOG(ERROR, "generate_sliding_cb_task_ failed", K(sliding_cb_ctx));
} else if (OB_FAIL(push_task_into_cb_thread_pool(cb_pool_tg_id, sliding_cb_task))) {
PALF_LOG(ERROR, "submit sliding_cb_task failed");
} else {
PALF_LOG(TRACE, "submit_sliding_cb_task success", K(ret), K(cb_pool_tg_id), K(sliding_cb_ctx));
}
return ret;
}
int LogEngine::submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx,
const LogPrepareMeta &prepare_meta)
{
......@@ -1192,6 +1212,23 @@ int LogEngine::generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx,
return ret;
}
int LogEngine::generate_sliding_cb_task_(const LogSlidingCbCtx &sliding_cb_ctx,
LogSlidingCbTask *&sliding_cb_task)
{
int ret = OB_SUCCESS;
// Be careful to handle the duration of this pointer
sliding_cb_task = NULL;
if (!sliding_cb_ctx.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else if (NULL == (sliding_cb_task = alloc_mgr_->alloc_log_sliding_cb_task(palf_id_, palf_epoch_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(ERROR, "alloc_log_sliding_cb_task failed");
} else if (OB_FAIL(sliding_cb_task->init(sliding_cb_ctx))) {
PALF_LOG(ERROR, "init LogSlidingCbTask failed");
} else {
}
return ret;
}
int LogEngine::generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx,
LogIOTruncateLogTask *&truncate_log_task)
{
......
......@@ -36,10 +36,12 @@ class LogIOWorker;
class PalfHandleImpl;
class LogIOTask;
class LogIOFlushLogTask;
class LogSlidingCbTask;
class LogIOTruncateLogTask;
class LogIOFlushMetaTask;
class LogIOTruncatePrefixBlocksTask;
class FlushLogCbCtx;
class LogSlidingCbCtx;
class TruncateLogCbCtx;
class FlushMetaCbCtx;
class TruncatePrefixBlocksCbCtx;
......@@ -113,6 +115,8 @@ public:
const int64_t buf_len);
virtual int submit_flush_log_task(const FlushLogCbCtx &flush_log_cb_ctx, const LogWriteBuf &write_buf);
int submit_sliding_cb_task(const int cb_pool_tg_id,
const LogSlidingCbCtx &sliding_cb_ctx);
int submit_flush_prepare_meta_task(const FlushMetaCbCtx &flush_meta_cb_ctx,
const LogPrepareMeta &prepare_meta);
......@@ -400,6 +404,8 @@ private:
int generate_flush_log_task_(const FlushLogCbCtx &flush_log_cb_ctx,
const LogWriteBuf &write_buf,
LogIOFlushLogTask *&flush_log_task);
int generate_sliding_cb_task_(const LogSlidingCbCtx &sliding_cb_ctx,
LogSlidingCbTask *&sliding_cb_task);
int generate_truncate_log_task_(const TruncateLogCbCtx &truncate_log_cb_ctx,
LogIOTruncateLogTask *&truncate_log_task);
......
......@@ -218,6 +218,77 @@ void LogIOFlushLogTask::free_this_(IPalfEnvImpl *palf_env_impl)
palf_env_impl->get_log_allocator()->free_log_io_flush_log_task(this);
}
LogSlidingCbTask::LogSlidingCbTask(const int64_t palf_id, const int64_t palf_epoch)
: LogIOTask(palf_id, palf_epoch), sliding_cb_ctx_(), is_inited_(false)
{}
LogSlidingCbTask::~LogSlidingCbTask()
{
destroy();
}
int LogSlidingCbTask::init(const LogSlidingCbCtx &sliding_cb_ctx)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
PALF_LOG(ERROR, "LogSlidingCbTask has been inited");
} else if (!sliding_cb_ctx.is_valid()) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invaild arguments!!!", K(palf_id_), K(palf_epoch_), K(sliding_cb_ctx));
} else {
sliding_cb_ctx_ = sliding_cb_ctx;
is_inited_ = true;
PALF_LOG(TRACE, "LogSlidingCbTask init success", K(ret), K(sliding_cb_ctx));
}
return ret;
}
void LogSlidingCbTask::destroy()
{
if (IS_INIT) {
is_inited_ = false;
sliding_cb_ctx_.reset();
PALF_LOG(TRACE, "LogSlidingCbTask destroy", KP(this));
}
}
int LogSlidingCbTask::do_task_(int tg_id, IPalfEnvImpl *palf_env_impl)
{
UNUSED(tg_id);
UNUSED(palf_env_impl);
return OB_SUCCESS;
}
// NB: the memory of 'this' will be release
int LogSlidingCbTask::after_consume_(IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
int64_t palf_epoch = -1;
IPalfHandleImplGuard guard;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(ERROR, "LogIOFlusLoghTask not inited");
} else if (OB_FAIL(palf_env_impl->get_palf_handle_impl(palf_id_, guard))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_handle_impl failed", K(palf_id_));
} else if (OB_FAIL(guard.get_palf_handle_impl()->get_palf_epoch(palf_epoch))) {
PALF_LOG(WARN, "IPalfEnvImpl get_palf_epoch failed", K(palf_id_));
} else if (palf_epoch != palf_epoch_) {
PALF_LOG(WARN, "palf_epoch has changed, drop task", K(palf_id_), K(palf_epoch),
K_(sliding_cb_ctx));
} else if (OB_FAIL(guard.get_palf_handle_impl()->file_size_cb(sliding_cb_ctx_))) {
PALF_LOG(ERROR, "file_size_cb failed", K_(sliding_cb_ctx));
} else {}
// free this object
palf_env_impl->get_log_allocator()->free_log_sliding_cb_task(this);
return ret;
}
void LogSlidingCbTask::free_this_(IPalfEnvImpl *palf_env_impl)
{
return;
}
LogIOTruncateLogTask::LogIOTruncateLogTask(const int64_t palf_id, const int64_t palf_epoch)
: LogIOTask(palf_id, palf_epoch), truncate_log_cb_ctx_(), is_inited_(false)
{}
......
......@@ -32,7 +32,8 @@ enum class LogIOTaskType
FLUSH_META_TYPE = 2,
TRUNCATE_PREFIX_TYPE = 3,
TRUNCATE_LOG_TYPE = 4,
FLASHBACK_LOG_TYPE = 5
FLASHBACK_LOG_TYPE = 5,
SLIDING_CB_TYPE = 6
};
class LogIOTask;
......@@ -209,6 +210,25 @@ private:
int64_t palf_id_;
bool is_inited_;
};
class LogSlidingCbTask : public LogIOTask {
public:
LogSlidingCbTask(const int64_t palf_id,const int64_t palf_epoch);
~LogSlidingCbTask();
public:
int init(const LogSlidingCbCtx &sliding_cb_ctx);
void destroy();
int do_task_(int tg_id, IPalfEnvImpl *palf_env_impl) override final;
// IO cb thread will call this function to call fs cb
int after_consume_(IPalfEnvImpl *palf_env_impl) override final;
LogIOTaskType get_io_task_type_() const override final { return LogIOTaskType::SLIDING_CB_TYPE; }
void free_this_(IPalfEnvImpl *palf_env_impl) override final;
INHERIT_TO_STRING_KV("LogSlidingCbTask", LogIOTask, K_(is_inited), K_(sliding_cb_ctx));
private:
LogSlidingCbCtx sliding_cb_ctx_;
bool is_inited_;
};
} // end namespace palf
} // end namespace oceanbase
......
......@@ -71,6 +71,45 @@ FlushLogCbCtx& FlushLogCbCtx::operator=(const FlushLogCbCtx &arg)
return *this;
}
LogSlidingCbCtx::LogSlidingCbCtx()
: palf_id_(INVALID_PALF_ID),
log_end_lsn_(),
log_proposal_id_(INVALID_PROPOSAL_ID)
{}
LogSlidingCbCtx::LogSlidingCbCtx(const int64_t palf_id,
const LSN &log_end_lsn,
const int64_t log_proposal_id)
: palf_id_(palf_id),
log_end_lsn_(log_end_lsn),
log_proposal_id_(log_proposal_id)
{}
LogSlidingCbCtx::~LogSlidingCbCtx()
{
reset();
}
bool LogSlidingCbCtx::is_valid() const
{
return (is_valid_palf_id(palf_id_) && log_end_lsn_.is_valid() && INVALID_PROPOSAL_ID != log_proposal_id_);
}
void LogSlidingCbCtx::reset()
{
palf_id_ = INVALID_PALF_ID;
log_end_lsn_.reset();
log_proposal_id_ = INVALID_PROPOSAL_ID;
}
LogSlidingCbCtx& LogSlidingCbCtx::operator=(const LogSlidingCbCtx &arg)
{
palf_id_ = arg.palf_id_;
log_end_lsn_ = arg.log_end_lsn_;
log_proposal_id_ = arg.log_proposal_id_;
return *this;
}
TruncateLogCbCtx::TruncateLogCbCtx()
: lsn_()
{
......
......@@ -46,6 +46,20 @@ struct FlushLogCbCtx
int64_t begin_ts_;
};
struct LogSlidingCbCtx
{
LogSlidingCbCtx();
LogSlidingCbCtx(const int64_t palf_id, const LSN &log_end_lsn, const int64_t log_proposal_id);
~LogSlidingCbCtx();
bool is_valid() const;
void reset();
LogSlidingCbCtx &operator=(const LogSlidingCbCtx &flush_log_cb_ctx);
TO_STRING_KV(K_(palf_id), K_(log_end_lsn), K_(log_proposal_id));
int64_t palf_id_;
LSN log_end_lsn_;
int64_t log_proposal_id_;
};
struct TruncateLogCbCtx {
TruncateLogCbCtx(const LSN &lsn);
TruncateLogCbCtx();
......
......@@ -112,7 +112,6 @@ LogSlidingWindow::LogSlidingWindow()
is_rebuilding_(false),
last_rebuild_lsn_(),
last_record_end_lsn_(PALF_INITIAL_LSN_VAL),
fs_cb_cost_stat_("[PALF STAT FS CB]", 2 * 1000 * 1000),
log_life_time_stat_("[PALF STAT LOG LIFETIME]", 2 * 1000 * 1000),
log_submit_wait_stat_("[PALF STAT LOG SUBMIT WAIT]", 2 * 1000 * 1000),
log_submit_to_slide_cost_stat_("[PALF STAT LOG SLIDE WAIT]", 2 * 1000 * 1000),
......@@ -121,6 +120,7 @@ LogSlidingWindow::LogSlidingWindow()
accum_group_log_size_(0),
last_record_group_log_id_(FIRST_VALID_LOG_ID - 1),
freeze_mode_(FEEDBACK_FREEZE_MODE),
cb_pool_tg_id_(-1),
is_inited_(false)
{}
......@@ -140,6 +140,7 @@ void LogSlidingWindow::destroy()
log_engine_ = NULL;
mm_ = NULL;
mode_mgr_ = NULL;
cb_pool_tg_id_ = -1;
}
int LogSlidingWindow::flashback(const PalfBaseInfo &palf_base_info, const int64_t palf_id, common::ObILogAllocator *alloc_mgr)
......@@ -201,7 +202,8 @@ int LogSlidingWindow::init(const int64_t palf_id,
palf::PalfFSCbWrapper *palf_fs_cb,
common::ObILogAllocator *alloc_mgr,
const PalfBaseInfo &palf_base_info,
const bool is_normal_replica)
const bool is_normal_replica,
const int cb_pool_tg_id)
{
int ret = OB_SUCCESS;
const LogInfo &prev_log_info = palf_base_info.prev_log_info_;
......@@ -214,10 +216,11 @@ int LogSlidingWindow::init(const int64_t palf_id,
|| NULL == mm
|| NULL == mode_mgr
|| NULL == log_engine
|| NULL == palf_fs_cb) {
|| NULL == palf_fs_cb
|| 0 >= cb_pool_tg_id) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid argumetns", K(ret), K(palf_id), K(self), K(palf_base_info),
KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb));
KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb), K(cb_pool_tg_id));
} else if (is_normal_replica && OB_FAIL(do_init_mem_(palf_id, palf_base_info, alloc_mgr))) {
PALF_LOG(WARN, "do_init_mem_ failed", K(ret), K(palf_id));
} else {
......@@ -249,6 +252,7 @@ int LogSlidingWindow::init(const int64_t palf_id,
MEMSET(append_cnt_array_, 0, APPEND_CNT_ARRAY_SIZE * sizeof(int64_t));
cb_pool_tg_id_ = cb_pool_tg_id;
is_inited_ = true;
LogGroupEntryHeader group_header;
LogEntryHeader log_header;
......@@ -1951,51 +1955,41 @@ int LogSlidingWindow::sliding_cb(const int64_t sn, const FixedSlidingWindowSlot
const int64_t log_submit_ts = log_task->get_submit_ts();
log_task->unlock();
int tmp_ret = OB_SUCCESS;
const int64_t fs_cb_begin_ts = ObTimeUtility::current_time();
if (OB_SUCCESS != (tmp_ret = palf_fs_cb_->update_end_lsn(palf_id_, log_end_lsn, log_proposal_id))) {
if (OB_EAGAIN == tmp_ret) {
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
PALF_LOG(WARN, "update_end_lsn eagain", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
}
} else {
PALF_LOG(WARN, "update_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self), K(log_id), KPC(log_task));
}
LogSlidingCbCtx sliding_cb_ctx(palf_id_, log_end_lsn, log_proposal_id);
if (OB_FAIL(log_engine_->submit_sliding_cb_task(cb_pool_tg_id_, sliding_cb_ctx))) {
PALF_LOG(ERROR, "submit_sliding_cb_task failed", K_(palf_id), K_(self), K(log_id), KPC(log_task));
}
const int64_t fs_cb_cost = ObTimeUtility::current_time() - fs_cb_begin_ts;
fs_cb_cost_stat_.stat(fs_cb_cost);
if (fs_cb_cost > 1 * 1000) {
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fs_cb->update_end_lsn() cost too much time", K(tmp_ret), K_(palf_id), K_(self),
K(fs_cb_cost), K(log_id), K(log_begin_lsn), K(log_end_lsn), K(log_proposal_id));
}
const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts;
log_life_time_stat_.stat(log_life_time);
log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts);
log_submit_to_slide_cost_stat_.stat(fs_cb_begin_ts - log_submit_ts);
if (log_life_time > 100 * 1000) {
if (palf_reach_time_interval(100 * 1000, log_life_long_warn_time_)) {
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "log_task life cost too much time", K_(palf_id), K_(self), K(log_id), KPC(log_task),
K(fs_cb_begin_ts), K(log_life_time));
if (OB_SUCC(ret)) {
const int64_t log_life_time = fs_cb_begin_ts - log_gen_ts;
log_life_time_stat_.stat(log_life_time);
log_submit_wait_stat_.stat(log_submit_ts - log_gen_ts);
log_submit_to_slide_cost_stat_.stat(fs_cb_begin_ts - log_submit_ts);
if (log_life_time > 100 * 1000) {
if (palf_reach_time_interval(100 * 1000, log_life_long_warn_time_)) {
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "log_task life cost too much time", K_(palf_id), K_(self), K(log_id), KPC(log_task),
K(fs_cb_begin_ts), K(log_life_time));
}
}
}
if (OB_FAIL(checksum_.verify_accum_checksum(log_task_header.data_checksum_,
log_task_header.accum_checksum_))) {
PALF_LOG(ERROR, "verify_accum_checksum failed", K_(palf_id), K_(self), K(ret), K(log_id), KPC(log_task));
} else {
// update last_slide_lsn_
(void) try_update_last_slide_log_info_(log_id, log_max_scn, log_begin_lsn, log_end_lsn, \
log_proposal_id, log_accum_checksum);
}
if (OB_FAIL(checksum_.verify_accum_checksum(log_task_header.data_checksum_,
log_task_header.accum_checksum_))) {
PALF_LOG(ERROR, "verify_accum_checksum failed", K_(palf_id), K_(self), K(log_id), KPC(log_task));
} else {
// update last_slide_lsn_
(void) try_update_last_slide_log_info_(log_id, log_max_scn, log_begin_lsn, log_end_lsn, \
log_proposal_id, log_accum_checksum);
}
MEM_BARRIER(); // ensure last_slide_log_info_ has been updated before fetch log streamingly
MEM_BARRIER(); // ensure last_slide_log_info_ has been updated before fetch log streamingly
if (OB_SUCC(ret)
&& (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) {
// Check if need fetch log streamingly,
try_fetch_log_streamingly_(log_end_lsn);
if (OB_SUCC(ret)
&& (FOLLOWER == state_mgr_->get_role() || state_mgr_->is_leader_reconfirm())) {
// Check if need fetch log streamingly,
try_fetch_log_streamingly_(log_end_lsn);
}
}
}
if (0 == log_id % 100) {
......
......@@ -53,6 +53,7 @@ class LogModeMgr;
class LogTask;
class LogGroupEntry;
class TruncateLogCbCtx;
class LogIOTask;
enum FetchTriggerType
{
......@@ -131,6 +132,8 @@ private:
common::ObMemberList lagged_list_;
};
extern int push_task_into_cb_thread_pool(const int64_t tg_id, LogIOTask *io_task);
class LogSlidingWindow : public ISlidingCallBack
{
public:
......@@ -148,7 +151,8 @@ public:
palf::PalfFSCbWrapper *palf_fs_cb,
common::ObILogAllocator *alloc_mgr,
const PalfBaseInfo &palf_base_info,
const bool is_normal_replica);
const bool is_normal_replica,
const int cb_pool_tg_id);
virtual int sliding_cb(const int64_t sn, const FixedSlidingWindowSlot *data);
virtual int64_t get_max_log_id() const;
virtual const share::SCN get_max_scn() const;
......@@ -501,7 +505,6 @@ private:
bool is_rebuilding_;
LSN last_rebuild_lsn_;
LSN last_record_end_lsn_;
ObMiniStat::ObStatItem fs_cb_cost_stat_;
ObMiniStat::ObStatItem log_life_time_stat_;
ObMiniStat::ObStatItem log_submit_wait_stat_;
ObMiniStat::ObStatItem log_submit_to_slide_cost_stat_;
......@@ -511,6 +514,7 @@ private:
int64_t last_record_group_log_id_;
int64_t append_cnt_array_[APPEND_CNT_ARRAY_SIZE];
FreezeMode freeze_mode_;
int cb_pool_tg_id_;
bool is_inited_;
private:
DISALLOW_COPY_AND_ASSIGN(LogSlidingWindow);
......
......@@ -364,7 +364,7 @@ int PalfEnvImpl::create_palf_handle_impl_(const int64_t palf_id,
PALF_LOG(WARN, "alloc palf_handle_impl failed", K(ret));
} else if (OB_FAIL(palf_handle_impl->init(palf_id, access_mode, palf_base_info, replica_type,
&fetch_log_engine_, base_dir, log_alloc_mgr_, log_block_pool_, &log_rpc_, &log_io_worker_,
this, self_, &election_timer_, palf_epoch))) {
this, self_, &election_timer_, palf_epoch, cb_thread_pool_.get_tg_id()))) {
PALF_LOG(ERROR, "IPalfHandleImpl init failed", K(ret), K(palf_id));
// NB: always insert value into hash map finally.
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, palf_handle_impl))) {
......@@ -917,6 +917,7 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id)
LSKey hash_map_key(palf_id);
bool is_integrity = true;
const int64_t palf_epoch = ATOMIC_AAF(&last_palf_epoch_, 1);
const int cb_pool_tg_id = cb_thread_pool_.get_tg_id();
if (0 > (pret = snprintf(base_dir, MAX_PATH_SIZE, "%s/%ld", log_dir_, palf_id))) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(WARN, "snprint failed", K(ret), K(pret), K(palf_id));
......@@ -924,7 +925,7 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id)
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(WARN, "alloc ipalf_handle_impl failed", K(ret));
} else if (OB_FAIL(tmp_palf_handle_impl->load(palf_id, &fetch_log_engine_, base_dir, log_alloc_mgr_,
log_block_pool_, &log_rpc_, &log_io_worker_, this, self_, &election_timer_, palf_epoch, is_integrity))) {
log_block_pool_, &log_rpc_, &log_io_worker_, this, self_, &election_timer_, palf_epoch, cb_pool_tg_id, is_integrity))) {
PALF_LOG(ERROR, "PalfHandleImpl init failed", K(ret), K(palf_id));
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, tmp_palf_handle_impl))) {
PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id), K(tmp_palf_handle_impl));
......
......@@ -69,6 +69,7 @@ PalfHandleImpl::PalfHandleImpl()
palf_env_impl_(NULL),
append_cost_stat_("[PALF STAT WRITE LOG]", 2 * 1000 * 1000),
flush_cb_cost_stat_("[PALF STAT FLUSH CB]", 2 * 1000 * 1000),
fs_cb_cost_stat_("[PALF STAT FS CB]", 2 * 1000 * 1000),
replica_meta_lock_(),
rebuilding_lock_(),
config_change_lock_(),
......@@ -106,7 +107,8 @@ int PalfHandleImpl::init(const int64_t palf_id,
IPalfEnvImpl *palf_env_impl,
const common::ObAddr &self,
common::ObOccamTimer *election_timer,
const int64_t palf_epoch)
const int64_t palf_epoch,
const int cb_pool_tg_id)
{
int ret = OB_SUCCESS;
int pret = 0;
......@@ -128,11 +130,12 @@ int PalfHandleImpl::init(const int64_t palf_id,
|| NULL == palf_env_impl
|| false == self.is_valid()
|| NULL == election_timer
|| palf_epoch < 0) {
|| palf_epoch < 0
|| cb_pool_tg_id <= 0) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(palf_base_info), K(replica_type),
K(access_mode), K(log_dir), K(alloc_mgr), K(log_block_pool), K(log_rpc),
K(log_io_worker), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch));
K(log_io_worker), K(palf_env_impl), K(self), K(election_timer), K(palf_epoch), K(cb_pool_tg_id));
} else if (OB_FAIL(log_meta.generate_by_palf_base_info(palf_base_info, access_mode, replica_type))) {
PALF_LOG(WARN, "generate_by_palf_base_info failed", K(ret), K(palf_id), K(palf_base_info), K(access_mode), K(replica_type));
} else if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) {
......@@ -143,11 +146,11 @@ int PalfHandleImpl::init(const int64_t palf_id,
PALF_LOG(WARN, "LogEngine init failed", K(ret), K(palf_id), K(log_dir), K(alloc_mgr),
K(log_rpc), K(log_io_worker));
} else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_meta, log_dir, self, fetch_log_engine,
alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer))) {
alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer, cb_pool_tg_id))) {
PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id));
} else {
PALF_EVENT("PalfHandleImpl init success", palf_id_, K(ret), K(self), K(access_mode), K(palf_base_info),
K(replica_type), K(log_dir), K(log_meta), K(palf_epoch));
K(replica_type), K(log_dir), K(log_meta), K(palf_epoch), K(cb_pool_tg_id));
}
return ret;
}
......@@ -168,6 +171,7 @@ int PalfHandleImpl::load(const int64_t palf_id,
const common::ObAddr &self,
common::ObOccamTimer *election_timer,
const int64_t palf_epoch,
const int cb_pool_tg_id,
bool &is_integrity)
{
int ret = OB_SUCCESS;
......@@ -200,7 +204,7 @@ int PalfHandleImpl::load(const int64_t palf_id,
} else if (OB_FAIL(construct_palf_base_info_(max_committed_end_lsn, palf_base_info))) {
PALF_LOG(WARN, "construct_palf_base_info_ failed", K(ret), K(palf_id), K(entry_header), K(palf_base_info));
} else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_engine_.get_log_meta(), log_dir, self,
fetch_log_engine, alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer))) {
fetch_log_engine, alloc_mgr, log_rpc, log_io_worker, palf_env_impl, election_timer, cb_pool_tg_id))) {
PALF_LOG(WARN, "PalfHandleImpl do_init_mem_ failed", K(ret), K(palf_id));
} else if (OB_FAIL(append_disk_log_to_sw_(max_committed_end_lsn))) {
PALF_LOG(WARN, "append_disk_log_to_sw_ failed", K(ret), K(palf_id));
......@@ -2258,7 +2262,8 @@ int PalfHandleImpl::do_init_mem_(
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
IPalfEnvImpl *palf_env_impl,
common::ObOccamTimer *election_timer)
common::ObOccamTimer *election_timer,
const int cb_pool_tg_id)
{
int ret = OB_SUCCESS;
int pret = -1;
......@@ -2276,8 +2281,8 @@ int PalfHandleImpl::do_init_mem_(
if ((pret = snprintf(log_dir_, MAX_PATH_SIZE, "%s", log_dir)) && false) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "error unexpected", K(ret), K(palf_id));
} else if (OB_FAIL(sw_.init(palf_id, self, &state_mgr_, &config_mgr_, &mode_mgr_,
&log_engine_, &fs_cb_wrapper_, alloc_mgr, palf_base_info, is_normal_replica))) {
} else if (OB_FAIL(sw_.init(palf_id, self, &state_mgr_, &config_mgr_, &mode_mgr_, &log_engine_,
&fs_cb_wrapper_, alloc_mgr, palf_base_info, is_normal_replica, cb_pool_tg_id))) {
PALF_LOG(WARN, "sw_ init failed", K(ret), K(palf_id));
} else if (OB_FAIL(election_.init_and_start(palf_id,
election_timer,
......@@ -4064,6 +4069,23 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn)
return ret;
}
int PalfHandleImpl::file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx)
{
int ret = OB_SUCCESS;
const int64_t fs_cb_begin_ts = ObTimeUtility::current_time();
if (OB_FAIL(fs_cb_wrapper_.update_end_lsn(sliding_cb_ctx.palf_id_, \
sliding_cb_ctx.log_end_lsn_, sliding_cb_ctx.log_proposal_id_))) {
PALF_LOG(WARN, "fs_cb_wrapper_.update_end_lsn failed", KPC(this), K(sliding_cb_ctx));
}
const int64_t fs_cb_cost = ObTimeUtility::current_time() - fs_cb_begin_ts;
fs_cb_cost_stat_.stat(fs_cb_cost);
if (fs_cb_cost > 1 * 1000) {
PALF_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fs_cb_wrapper_.update_end_lsn() cost too much time",
K_(palf_id), K_(self), K(fs_cb_cost), K(sliding_cb_ctx));
}
return ret;
}
PalfStat::PalfStat()
: self_(),
palf_id_(INVALID_PALF_ID),
......
......@@ -572,6 +572,7 @@ public:
virtual int get_palf_epoch(int64_t &palf_epoch) const = 0;
virtual int diagnose(PalfDiagnoseInfo &diagnose_info) const = 0;
virtual int update_palf_stat() = 0;
virtual int file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx) = 0;
DECLARE_PURE_VIRTUAL_TO_STRING;
};
......@@ -594,7 +595,8 @@ public:
IPalfEnvImpl *palf_env_impl,
const common::ObAddr &self,
common::ObOccamTimer *election_timer,
const int64_t palf_epoch);
const int64_t palf_epoch,
const int cb_pool_tg_id);
bool check_can_be_used() const override final;
// 重启接口
// 1. 生成迭代器,定位meta_storage和log_storage的终点;
......@@ -612,6 +614,7 @@ public:
const common::ObAddr &self,
common::ObOccamTimer *election_timer,
const int64_t palf_epoch,
const int cb_pool_tg_id,
bool &is_integrity);
void destroy();
int start();
......@@ -842,6 +845,7 @@ public:
const int64_t timeout_us) override final;
int diagnose(PalfDiagnoseInfo &diagnose_info) const;
int update_palf_stat() override final;
int file_size_cb(const LogSlidingCbCtx &sliding_cb_ctx);
TO_STRING_KV(K_(palf_id), K_(self), K_(has_set_deleted));
private:
int do_init_mem_(const int64_t palf_id,
......@@ -854,7 +858,8 @@ private:
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
IPalfEnvImpl *palf_env_impl,
common::ObOccamTimer *election_timer);
common::ObOccamTimer *election_timer,
const int cb_pool_tg_id);
int after_flush_prepare_meta_(const int64_t &proposal_id);
int after_flush_config_change_meta_(const int64_t proposal_id, const LogConfigVersion &config_version);
int after_flush_mode_meta_(const int64_t proposal_id,
......@@ -1034,6 +1039,7 @@ private:
bool diskspace_enough_;
ObMiniStat::ObStatItem append_cost_stat_;
ObMiniStat::ObStatItem flush_cb_cost_stat_;
ObMiniStat::ObStatItem fs_cb_cost_stat_;
// a spin lock for read/write replica_meta mutex
SpinLock replica_meta_lock_;
SpinLock rebuilding_lock_;
......
......@@ -29,6 +29,7 @@ namespace common
ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
: tenant_id_(tenant_id), total_limit_(INT64_MAX), pending_replay_mutator_size_(0),
LOG_IO_FLUSH_LOG_TASK_SIZE(sizeof(palf::LogIOFlushLogTask)),
LOG_SLIDING_CB_TASK_SIZE(sizeof(palf::LogSlidingCbTask)),
LOG_IO_TRUNCATE_LOG_TASK_SIZE(sizeof(palf::LogIOTruncateLogTask)),
LOG_IO_FLUSH_META_TASK_SIZE(sizeof(palf::LogIOFlushMetaTask)),
LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE(sizeof(palf::LogIOTruncatePrefixBlocksTask)),
......@@ -43,6 +44,7 @@ ObTenantMutilAllocator::ObTenantMutilAllocator(uint64_t tenant_id)
inner_table_replay_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_ENGINE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, inner_table_replay_blk_alloc_),
user_table_replay_task_alloc_(ObMemAttr(tenant_id, ObModIds::OB_LOG_REPLAY_ENGINE), ObVSliceAlloc::DEFAULT_BLOCK_SIZE, user_table_replay_blk_alloc_),
log_io_flush_log_task_alloc_(LOG_IO_FLUSH_LOG_TASK_SIZE, ObMemAttr(tenant_id, "FlushLog"), choose_blk_size(LOG_IO_FLUSH_LOG_TASK_SIZE), clog_blk_alloc_, this),
log_sliding_cb_task_alloc_(LOG_SLIDING_CB_TASK_SIZE, ObMemAttr(tenant_id, "SlidingCb"), choose_blk_size(LOG_SLIDING_CB_TASK_SIZE), clog_blk_alloc_, this),
log_io_truncate_log_task_alloc_(LOG_IO_TRUNCATE_LOG_TASK_SIZE, ObMemAttr(tenant_id, "TruncateLog"), choose_blk_size(LOG_IO_TRUNCATE_LOG_TASK_SIZE), clog_blk_alloc_, this),
log_io_flush_meta_task_alloc_(LOG_IO_FLUSH_META_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_FLUSH_META_TASK_SIZE), clog_blk_alloc_, this),
log_io_truncate_prefix_blocks_task_alloc_(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE, ObMemAttr(tenant_id, "FlushMeta"), choose_blk_size(LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE), clog_blk_alloc_, this),
......@@ -82,6 +84,7 @@ void ObTenantMutilAllocator::try_purge()
inner_table_replay_task_alloc_.purge_extra_cached_block(0);
user_table_replay_task_alloc_.purge_extra_cached_block(0);
log_io_flush_log_task_alloc_.purge_extra_cached_block(0);
log_sliding_cb_task_alloc_.purge_extra_cached_block(0);
log_io_truncate_log_task_alloc_.purge_extra_cached_block(0);
log_io_flush_meta_task_alloc_.purge_extra_cached_block(0);
log_io_truncate_prefix_blocks_task_alloc_.purge_extra_cached_block(0);
......@@ -180,6 +183,28 @@ void ObTenantMutilAllocator::free_log_io_flush_log_task(LogIOFlushLogTask *ptr)
}
}
LogSlidingCbTask *ObTenantMutilAllocator::alloc_log_sliding_cb_task(
const int64_t palf_id, const int64_t palf_epoch)
{
LogSlidingCbTask *ret_ptr = NULL;
void *ptr = log_sliding_cb_task_alloc_.alloc();
if (NULL != ptr) {
ret_ptr = new(ptr)LogSlidingCbTask(palf_id, palf_epoch);
ATOMIC_INC(&flying_sliding_cb_task_);
}
return ret_ptr;
}
void ObTenantMutilAllocator::free_log_sliding_cb_task(LogSlidingCbTask *ptr)
{
if (OB_LIKELY(NULL != ptr)) {
ptr->~LogSlidingCbTask();
log_sliding_cb_task_alloc_.free(ptr);
ATOMIC_DEC(&flying_sliding_cb_task_);
}
}
LogIOTruncateLogTask *ObTenantMutilAllocator::alloc_log_io_truncate_log_task(
const int64_t palf_id, const int64_t palf_epoch)
{
......
......@@ -25,6 +25,7 @@ namespace oceanbase
namespace palf
{
class LogIOFlushLogTask;
class LogSlidingCbTask;
class LogIOTruncateLogTask;
class LogIOFlushMetaTask;
class LogIOTruncatePrefixBlocksTask;
......@@ -43,7 +44,7 @@ class ObTraceProfile;
class ObILogAllocator : public ObIAllocator
{
public:
ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0) {}
ObILogAllocator() : flying_log_task_(0), flying_meta_task_(0), flying_sliding_cb_task_(0) {}
virtual ~ObILogAllocator() {}
public:
......@@ -55,6 +56,8 @@ public:
virtual const ObBlockAllocMgr &get_clog_blk_alloc_mgr() const = 0;
virtual palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
virtual void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr) = 0;
virtual palf::LogSlidingCbTask *alloc_log_sliding_cb_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
virtual void free_log_sliding_cb_task(palf::LogSlidingCbTask *ptr) = 0;
virtual palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
virtual void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr) = 0;
virtual palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch) = 0;
......@@ -74,6 +77,7 @@ public:
protected:
int64_t flying_log_task_;
int64_t flying_meta_task_;
int64_t flying_sliding_cb_task_;
};
// Interface for ReplayEngine module
......@@ -137,6 +141,8 @@ public:
// V4.0
palf::LogIOFlushLogTask *alloc_log_io_flush_log_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_io_flush_log_task(palf::LogIOFlushLogTask *ptr);
palf::LogSlidingCbTask *alloc_log_sliding_cb_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_sliding_cb_task(palf::LogSlidingCbTask *ptr);
palf::LogIOTruncateLogTask *alloc_log_io_truncate_log_task(const int64_t palf_id, const int64_t palf_epoch);
void free_log_io_truncate_log_task(palf::LogIOTruncateLogTask *ptr);
palf::LogIOFlushMetaTask *alloc_log_io_flush_meta_task(const int64_t palf_id, const int64_t palf_epoch);
......@@ -157,6 +163,7 @@ private:
int64_t total_limit_;
int64_t pending_replay_mutator_size_;
const int LOG_IO_FLUSH_LOG_TASK_SIZE;
const int LOG_SLIDING_CB_TASK_SIZE;
const int LOG_IO_TRUNCATE_LOG_TASK_SIZE;
const int LOG_IO_FLUSH_META_TASK_SIZE;
const int LOG_IO_TRUNCATE_PREFIX_BLOCKS_TASK_SIZE;
......@@ -171,6 +178,7 @@ private:
ObVSliceAlloc inner_table_replay_task_alloc_;
ObVSliceAlloc user_table_replay_task_alloc_;
ObSliceAlloc log_io_flush_log_task_alloc_;
ObSliceAlloc log_sliding_cb_task_alloc_;
ObSliceAlloc log_io_truncate_log_task_alloc_;
ObSliceAlloc log_io_flush_meta_task_alloc_;
ObSliceAlloc log_io_truncate_prefix_blocks_task_alloc_;
......
......@@ -148,19 +148,19 @@ TEST_F(TestLogSlidingWindow, test_init)
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, NULL,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, base_info, true));
&mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, base_info, true, 1));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, base_info, true));
&mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, base_info, true, 1));
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
// init twice
EXPECT_EQ(OB_INIT_TWICE, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
}
TEST_F(TestLogSlidingWindow, test_private_func_batch_01)
......@@ -175,7 +175,7 @@ TEST_F(TestLogSlidingWindow, test_private_func_batch_01)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
log_id = 10 + PALF_SLIDING_WINDOW_SIZE;
EXPECT_EQ(false, log_sw_.can_receive_larger_log_(log_id));
EXPECT_EQ(false, log_sw_.leader_can_submit_larger_log_(log_id));
......@@ -194,7 +194,7 @@ TEST_F(TestLogSlidingWindow, test_to_follower_pending)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
char *buf = data_buf_;
int64_t buf_len = 1 * 1024 * 1024;
share::SCN ref_scn;
......@@ -221,7 +221,7 @@ TEST_F(TestLogSlidingWindow, test_fetch_log)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
prev_lsn.val_ = 1;
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log(fetch_log_type, prev_lsn, fetch_start_lsn, fetch_start_log_id));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log_for_reconfirm(dest, fetch_end_lsn, is_fetched));
......@@ -244,7 +244,7 @@ TEST_F(TestLogSlidingWindow, test_report_log_task_trace)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
EXPECT_EQ(OB_SUCCESS, log_sw_.report_log_task_trace(1));
char *buf = data_buf_;
int64_t buf_len = 2 * 1024 * 1024;
......@@ -265,7 +265,7 @@ TEST_F(TestLogSlidingWindow, test_set_location_cache_cb)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.set_location_cache_cb(NULL));
EXPECT_EQ(OB_SUCCESS, log_sw_.set_location_cache_cb(&cb));
EXPECT_EQ(OB_NOT_SUPPORTED, log_sw_.set_location_cache_cb(&cb));
......@@ -278,7 +278,7 @@ TEST_F(TestLogSlidingWindow, test_reset_location_cache_cb)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
EXPECT_EQ(OB_SUCCESS, log_sw_.reset_location_cache_cb());
}
......@@ -295,7 +295,7 @@ TEST_F(TestLogSlidingWindow, test_submit_log)
share::SCN scn;
EXPECT_EQ(OB_NOT_INIT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn));
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(NULL, buf_len, ref_scn, lsn, scn));
buf_len = 0;
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn));
......@@ -327,7 +327,7 @@ TEST_F(TestLogSlidingWindow, test_submit_group_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
mock_state_mgr_.mock_proposal_id_ = 100;
LSN lsn(10);
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_group_log(lsn, NULL, 1024));
......@@ -386,7 +386,7 @@ TEST_F(TestLogSlidingWindow, test_receive_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
char *buf = data_buf_;
int64_t buf_len = 2 * 1024 * 1024;
......@@ -549,7 +549,7 @@ TEST_F(TestLogSlidingWindow, test_after_flush_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
int64_t curr_proposal_id = 10;
// set default config meta
......@@ -612,7 +612,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
......@@ -720,7 +720,7 @@ TEST_F(TestLogSlidingWindow, test_ack_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
log_sw_.self_ = self_;
......@@ -778,7 +778,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_for_rebuild)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
......@@ -889,7 +889,7 @@ TEST_F(TestLogSlidingWindow, test_append_disk_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
// generate new group entry
......@@ -1011,7 +1011,7 @@ TEST_F(TestLogSlidingWindow, test_group_entry_truncate)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true, 1));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
// generate new group entry
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册