Commit 5839aec0 authored by R rl0, committed by wangzelin.wzl

Patch some commits to open source branch

Parent 69fa727d
......@@ -189,9 +189,7 @@ const uint64_t DEFAULT_CLOG_APPEND_TIMEOUT_US = 365ull * 24 * 3600 * 1000 * 1000
const uint64_t DEFAULT_WRITER_MAX_BUFFER_ITEM_CNT = 4 * 1024;
// the buffer size of membership log
const int64_t MS_LOG_BUFFER_SIZE = 2048;
// max wait time for flush log to stable storage
const uint64_t DEFAULT_CLOG_FLUSH_TIMEOUT_US = 15 * 1000 * 1000;
} // namespace clog
} // namespace oceanbase
} // namespace clog
} // namespace oceanbase
#endif // OCEANBASE_CLOG_OB_CLOG_CONFIG_H_
......@@ -150,7 +150,7 @@ int ObCLogWriter::set_is_disk_error()
ret = OB_NOT_INIT;
} else {
ATOMIC_STORE(&is_disk_error_, true);
CLOG_LOG(ERROR, "clog disk is error!!!");
CLOG_LOG(WARN, "clog disk may be hang or something error has happen!");
}
return ret;
}
......@@ -162,7 +162,7 @@ int ObCLogWriter::reset_is_disk_error()
ret = OB_NOT_INIT;
} else {
ATOMIC_STORE(&is_disk_error_, false);
CLOG_LOG(INFO, "reset clog disk status to normal");
CLOG_LOG(TRACE, "reset clog disk status to normal");
}
return ret;
}
......@@ -204,12 +204,12 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64
} else {
const bool is_idempotent = false;
const uint64_t write_len = block_meta_len + item->get_data_len();
const int64_t warning_value = GCONF.data_storage_warning_tolerance_time;
ObCLogDiskErrorCB *cb = NULL;
lib::ObMutexGuard guard(file_mutex_);
BG_NEW_CALLBACK(cb, ObCLogDiskErrorCB, this);
BG_MONITOR_GUARD(DEFAULT_CLOG_FLUSH_TIMEOUT_US, is_idempotent, cb);
BG_MONITOR_GUARD(warning_value, is_idempotent, cb);
// The timestamp value in the block header must be generated in time order, so
// call inner_switch_file first here.
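The hunk above replaces the fixed DEFAULT_CLOG_FLUSH_TIMEOUT_US with the configurable data_storage_warning_tolerance_time as the monitor-guard threshold, and registers an ObCLogDiskErrorCB to be fired when a flush overruns it. A rough, hypothetical sketch of this guard-with-callback pattern follows (illustrative names only, not the actual BG_MONITOR_GUARD / BG_NEW_CALLBACK macros, which likely fire the callback from a background monitor rather than at scope exit):

#include <chrono>
#include <cstdint>
#include <cstdio>
#include <functional>

// Scoped watchdog sketch: if the guarded scope runs longer than tolerance_us,
// invoke the supplied callback when the guard is destroyed.
class ScopedMonitorGuard {
public:
  ScopedMonitorGuard(int64_t tolerance_us, std::function<void()> on_timeout)
      : tolerance_us_(tolerance_us),
        on_timeout_(std::move(on_timeout)),
        start_(std::chrono::steady_clock::now()) {}
  ~ScopedMonitorGuard() {
    const int64_t elapsed_us = std::chrono::duration_cast<std::chrono::microseconds>(
        std::chrono::steady_clock::now() - start_).count();
    if (elapsed_us > tolerance_us_ && on_timeout_) {
      on_timeout_();  // e.g. mark the clog disk as possibly hung
    }
  }
private:
  int64_t tolerance_us_;
  std::function<void()> on_timeout_;
  std::chrono::steady_clock::time_point start_;
};

void write_one_item_sketch(int64_t warning_tolerance_us) {
  ScopedMonitorGuard guard(warning_tolerance_us,
                           [] { std::puts("clog disk may be hung"); });
  // ... flush the log item to disk here ...
}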
......@@ -378,7 +378,8 @@ ObCLogDiskErrorCB::ObCLogDiskErrorCB(ObCLogWriter* host) : host_(host)
{}
ObCLogDiskErrorCB::~ObCLogDiskErrorCB()
{}
{
}
int ObCLogDiskErrorCB::callback()
{
......@@ -404,8 +405,11 @@ void ObCLogDiskErrorCB::destroy()
}
}
int locate_clog_tail(
const int64_t timeout, ObILogFileStore* file_store, ObLogDirectReader* reader, file_id_t& file_id, offset_t& offset)
int locate_clog_tail(const int64_t timeout,
ObILogFileStore *file_store,
ObLogDirectReader *reader,
file_id_t &file_id,
offset_t &offset)
{
ObLogFileTailLocatorImpl<ObLogEntry, ObIRawLogIterator> impl;
return impl.locate_tail(timeout, file_store, reader, file_id, offset);
......
......@@ -481,9 +481,13 @@ int ObFileIdList::locate(const ObPartitionKey& pkey, const int64_t target_value,
}
// max_log_id,max_log_timestamp,start_offset may be invalid
int ObFileIdList::append(const ObPartitionKey& pkey, const file_id_t file_id, const offset_t start_offset,
const uint64_t min_log_id, const uint64_t max_log_id, const int64_t min_log_timestamp,
const int64_t max_log_timestamp)
int ObFileIdList::append(const ObPartitionKey &pkey,
const file_id_t file_id,
const offset_t start_offset,
const uint64_t min_log_id,
const uint64_t max_log_id,
const int64_t min_log_timestamp,
const int64_t max_log_timestamp)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
......@@ -682,7 +686,9 @@ int ObFileIdList::purge(const common::ObPartitionKey& pkey, ObIFileIdCachePurgeS
}
// The caller guarantees that the function will not be executed concurrently
int ObFileIdList::purge_(const bool is_front_end, IPurgeChecker& checker, bool& empty)
int ObFileIdList::purge_(const bool is_front_end,
IPurgeChecker &checker,
bool &empty)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
......@@ -1070,8 +1076,11 @@ void ObFileIdCache::destroy()
// 4. OB_PARTITION_NOT_EXIST partition not exist, prev_item and next_item are both invalid
// 5. OB_NEED_RETRY need retry, prev_item and next_item are both invalid
// 6. Others
int ObFileIdCache::locate(const ObPartitionKey& pkey, const int64_t target_value, const bool locate_by_log_id,
Log2File& prev_item, Log2File& next_item)
int ObFileIdCache::locate(const ObPartitionKey &pkey,
const int64_t target_value,
const bool locate_by_log_id,
Log2File &prev_item,
Log2File &next_item)
{
int ret = OB_SUCCESS;
ObFileIdList* list = NULL;
......@@ -1288,7 +1297,8 @@ int ObFileIdCache::AppendInfoFunctor::init(const file_id_t file_id, ObFileIdCach
}
// For compatibility, allow max_log_id and max_log_timestamp to be invalid
bool ObFileIdCache::AppendInfoFunctor::operator()(const ObPartitionKey& pkey, const IndexInfoBlockEntry& entry)
bool ObFileIdCache::AppendInfoFunctor::operator()(const ObPartitionKey &pkey,
const IndexInfoBlockEntry &entry)
{
if (OB_UNLIKELY(!pkey.is_valid()) || OB_UNLIKELY(!entry.is_valid())) {
err_ = OB_ERR_UNEXPECTED;
......@@ -1456,9 +1466,13 @@ int ObFileIdCache::append_new_list_(const ObPartitionKey& pkey, const file_id_t
}
// For compatibility, allow max_log_id and max_log_timestamp to be invalid
int ObFileIdCache::do_append_(const ObPartitionKey& pkey, const file_id_t file_id, const offset_t start_offset,
const uint64_t min_log_id, const uint64_t max_log_id, const int64_t min_log_timestamp,
const int64_t max_log_timestamp)
int ObFileIdCache::do_append_(const ObPartitionKey &pkey,
const file_id_t file_id,
const offset_t start_offset,
const uint64_t min_log_id,
const uint64_t max_log_id,
const int64_t min_log_timestamp,
const int64_t max_log_timestamp)
{
int ret = OB_SUCCESS;
ObFileIdList* list = NULL;
......
......@@ -52,26 +52,21 @@ class ObIlogAccessor;
// p3 -> [(50,f1), (5000,f3)]
//
// When querying (p2, 170), first find the ordered list corresponding to p2, and then use binary search.
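A minimal sketch of the per-partition lookup described above, assuming a hypothetical FileRange entry and a std::map keyed by min_log_id so that the binary search is a plain upper_bound; this only illustrates the idea and is not the actual Log2File / ObFileIdCache code:

#include <cstdint>
#include <map>
#include <optional>

// Hypothetical mapping entry: the ilog file that covers [min_log_id, max_log_id].
struct FileRange {
  uint64_t min_log_id;
  uint64_t max_log_id;
  uint32_t file_id;
};

// Ordered list for one partition, keyed by min_log_id, so lookup is a
// binary search (std::map::upper_bound) over the per-partition entries.
using PartitionIndex = std::map<uint64_t, FileRange>;

std::optional<FileRange> locate(const PartitionIndex& index, uint64_t log_id) {
  auto it = index.upper_bound(log_id);   // first entry with min_log_id > log_id
  if (it == index.begin()) {
    return std::nullopt;                 // log_id precedes every known range
  }
  --it;                                  // candidate whose min_log_id <= log_id
  if (log_id <= it->second.max_log_id) {
    return it->second;                   // log_id falls inside this file's range
  }
  return std::nullopt;                   // gap between ranges
}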
class Log2File {
class Log2File
{
public:
Log2File()
: file_id_(common::OB_INVALID_FILE_ID),
// For versions 2.1 and later, start_offset corresponds to the offset
// of the first ilog of this partition in the ilog file
// For versions before 2.1, start_offset corresponds to the start_offset_index
// of the first ilog of this partition in the ilog cache
start_offset_(OB_INVALID_OFFSET),
min_log_id_(common::OB_INVALID_ID),
max_log_id_(common::OB_INVALID_ID),
min_log_timestamp_(common::OB_INVALID_TIMESTAMP),
max_log_timestamp_(common::OB_INVALID_TIMESTAMP)
{}
~Log2File()
{
reset();
}
Log2File() : file_id_(common::OB_INVALID_FILE_ID),
// For versions 2.1 and later, start_offset corresponds to the offset
// of the first ilog of this partition in the ilog file
// For versions before 2.1, start_offset corresponds to the start_offset_index
// of the first ilog of this partition in the ilog cache
start_offset_(OB_INVALID_OFFSET),
min_log_id_(common::OB_INVALID_ID),
max_log_id_(common::OB_INVALID_ID),
min_log_timestamp_(common::OB_INVALID_TIMESTAMP),
max_log_timestamp_(common::OB_INVALID_TIMESTAMP) {}
~Log2File() { reset(); }
public:
file_id_t get_file_id() const
{
......@@ -228,7 +223,7 @@ public:
}
// Determine whether target_item is the next consecutive item
bool is_preceding_to(const Log2File& target_item) const
bool is_preceding_to(const Log2File &target_item) const
{
bool bool_ret = false;
if (common::OB_INVALID_ID != get_max_log_id() && common::OB_INVALID_ID != target_item.min_log_id_) {
......@@ -316,7 +311,8 @@ private:
ObFileIdCache& file_id_cache_;
};
class ObFileIdList {
class ObFileIdList
{
public:
class BackFillFunctor {
public:
......@@ -336,7 +332,8 @@ public:
offset_t start_offset_;
};
class IPurgeChecker {
class IPurgeChecker
{
public:
virtual bool should_purge(const Log2File& log_2_file) const = 0;
virtual bool is_valid() const = 0;
......@@ -344,7 +341,8 @@ public:
};
// purge min
// should_purge returns true if min_log_id > top_item.file_id_
class PurgeChecker : public IPurgeChecker {
class PurgeChecker : public IPurgeChecker
{
public:
explicit PurgeChecker(const common::ObPartitionKey& pkey, ObIFileIdCachePurgeStrategy& purge_strategy)
: partition_key_(pkey), purge_strategy_(purge_strategy)
......@@ -364,7 +362,8 @@ public:
// should_purge returns true if top_item.file_id_ == broken_file_id_
// Because loading an InfoBlock involves multiple partitions, if only part of them is loaded,
// then the whole load must be cleaned up
class ClearBrokenFunctor : public IPurgeChecker {
class ClearBrokenFunctor : public IPurgeChecker
{
public:
explicit ClearBrokenFunctor(const file_id_t file_id) : broken_file_id_(file_id)
{}
......@@ -414,8 +413,11 @@ public:
static const int64_t NEED_USE_SEG_ARRAY_THRESHOLD = 50;
private:
int purge_(const bool is_front_end, IPurgeChecker& checker, bool& empty);
int purge_preceding_items_(const common::ObPartitionKey& pkey, const Log2File& last_item);
int purge_(const bool is_front_end,
IPurgeChecker &checker,
bool &empty);
int purge_preceding_items_(const ObPartitionKey &pkey,
const Log2File &last_item);
// The caller guarantees that the function will not be executed concurrently
int prepare_container_();
int move_item_to_seg_array_(common::ObISegArray<Log2File>* tmp_container_ptr) const;
......@@ -424,7 +426,7 @@ private:
bool is_inited_;
bool use_seg_array_;
uint64_t min_continuous_log_id_;
ObFileIdCache* file_id_cache_;
ObFileIdCache *file_id_cache_;
ObLogBasePos base_pos_;
common::ObISegArray<Log2File>* container_ptr_;
......@@ -446,27 +448,29 @@ public:
int init(const int64_t server_seq, const common::ObAddr& addr, ObIlogAccessor* ilog_accessor);
void destroy();
int locate(const common::ObPartitionKey& pkey, const int64_t target_value, const bool locate_by_log_id,
Log2File& prev_item, Log2File& next_item);
int append(const file_id_t file_id, IndexInfoBlockMap& index_info_block_map);
int backfill(const common::ObPartitionKey& pkey, const uint64_t min_log_id, const file_id_t file_id,
const offset_t start_offset);
int purge(ObIFileIdCachePurgeStrategy& purge_strategy);
int ensure_log_continuous(const common::ObPartitionKey& pkey, const uint64_t log_id);
int add_partition_needed(const common::ObPartitionKey& pkey, const uint64_t last_replay_log_id);
file_id_t get_curr_max_file_id() const
{
return ATOMIC_LOAD(&curr_max_file_id_);
}
int64_t get_next_can_purge_log2file_timestamp() const
{
return ATOMIC_LOAD(&next_can_purge_log2file_timestamp_);
}
int get_clog_base_pos(const common::ObPartitionKey& pkey, file_id_t& file_id, offset_t& offset) const;
// Attention: this interface doesn't consider the format of versions before 2.1
int get_cursor_from_file(
const common::ObPartitionKey& pkey, const uint64_t log_id, const Log2File& item, ObLogCursorExt& log_cursor);
int locate(const common::ObPartitionKey &pkey,
const int64_t target_value,
const bool locate_by_log_id,
Log2File &prev_item,
Log2File &next_item);
int append(const file_id_t file_id,
IndexInfoBlockMap &index_info_block_map);
int backfill(const common::ObPartitionKey &pkey,
const uint64_t min_log_id,
const file_id_t file_id,
const offset_t start_offset);
int purge(ObIFileIdCachePurgeStrategy &purge_strategy);
int ensure_log_continuous(const common::ObPartitionKey &pkey,
const uint64_t log_id);
int add_partition_needed(const common::ObPartitionKey &pkey,
const uint64_t last_replay_log_id);
file_id_t get_curr_max_file_id() const {return ATOMIC_LOAD(&curr_max_file_id_);}
int64_t get_next_can_purge_log2file_timestamp() const {return ATOMIC_LOAD(&next_can_purge_log2file_timestamp_);}
int get_clog_base_pos(const ObPartitionKey &pkey, file_id_t &file_id,
offset_t &offset) const;
// Attention: this interface doesn't consider the format of versions before 2.1
int get_cursor_from_file(const ObPartitionKey &pkey, const uint64_t log_id,
const Log2File &item, ObLogCursorExt &log_cursor);
private:
class AppendInfoFunctor {
public:
......@@ -484,7 +488,8 @@ private:
ObFileIdCache* cache_;
};
// Ensure that the loading process is atomic
class ObUndoAppendFunctor {
class ObUndoAppendFunctor
{
public:
explicit ObUndoAppendFunctor(const file_id_t broken_file_id) : broken_file_id_(broken_file_id)
{}
......@@ -499,7 +504,8 @@ private:
file_id_t broken_file_id_;
common::ObPartitionArray dead_pkeys_;
};
class ObPurgeFunctor {
class ObPurgeFunctor
{
public:
explicit ObPurgeFunctor(ObIFileIdCachePurgeStrategy& purge_strategy)
: purge_strategy_(purge_strategy), next_can_purge_log2file_timestamp_(common::OB_INVALID_TIMESTAMP)
......@@ -595,7 +601,7 @@ private:
common::ObSmallAllocator seg_item_allocator_; // allocator for Log2File items(seg)
common::ObSmallAllocator log2file_list_allocator_; // allocator for Log2FileList
common::ObSmallAllocator list_item_allocator_; // allocator for Log2File items(list)
common::ObLinearHashMap<common::ObPartitionKey, ObFileIdList*> map_;
common::ObLinearHashMap<common::ObPartitionKey, ObFileIdList *> map_;
common::ObLinearHashMap<common::ObPartitionKey, uint64_t> filter_map_;
private:
......
......@@ -84,7 +84,6 @@ int ObIlogMemstore::check_need_freeze(
int64_t clog_size_trigger_cfg = CLOG_SIZE_TRIGGER;
#ifdef ERRSIM
freeze_trigger_us = ObServerConfig::get_instance().ilog_flush_trigger_time;
cursor_size_trigger_cfg = 128;
#endif
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
......@@ -172,31 +171,26 @@ int ObIlogMemstore::timer_check_need_freeze(bool& need_freeze) const
return ret;
}
int ObIlogMemstore::check_need_switch_file(bool& need_switch_file) const
int ObIlogMemstore::get_cursor_size(int64_t &cursor_size) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CSR_LOG(ERROR, "ObIlogMemstore is not inited", K(ret));
CLOG_LOG(ERROR, "get_cursor_size failed", K(ret));
} else {
int64_t cursor_size_trigger_cfg = CURSOR_SIZE_TRIGGER;
#ifdef ERRSIM
cursor_size_trigger_cfg = 128;
#endif
need_switch_file =
(ATOMIC_LOAD(&cursor_size_) >= cursor_size_trigger_cfg || ATOMIC_LOAD(&clog_size_) >= CLOG_SIZE_TRIGGER);
cursor_size = ATOMIC_LOAD(&cursor_size_);
}
return ret;
}
int ObIlogMemstore::get_cursor_size(int64_t& cursor_size) const
int ObIlogMemstore::get_clog_size(int64_t &clog_size) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(ERROR, "get_cursor_size failed", K(ret));
CLOG_LOG(ERROR, "get_clog_size failed", K(ret));
} else {
cursor_size = cursor_size_;
clog_size = ATOMIC_LOAD(&clog_size_);
}
return ret;
}
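Since check_need_switch_file is removed, callers now read the raw sizes through get_cursor_size/get_clog_size and apply the thresholds themselves, as the ObIlogStore::get_merge_range_ hunk further below does. A simplified, hypothetical caller-side sketch (assuming access to the CURSOR_SIZE_TRIGGER and CLOG_SIZE_TRIGGER constants; not the actual code):

// Hypothetical caller-side check mirroring the merge path: accumulate the
// cursor and clog sizes of the frozen memstores and switch to a new ilog
// file once either accumulated size reaches its trigger threshold.
int should_switch_file(const ObIlogMemstore &memstore,
                       int64_t &total_cursor_size,
                       int64_t &total_clog_size,
                       bool &need_switch_file)
{
  int ret = OB_SUCCESS;
  int64_t cursor_size = 0;
  int64_t clog_size = 0;
  if (OB_FAIL(memstore.get_cursor_size(cursor_size))) {
    CLOG_LOG(ERROR, "get_cursor_size failed", K(ret));
  } else if (OB_FAIL(memstore.get_clog_size(clog_size))) {
    CLOG_LOG(ERROR, "get_clog_size failed", K(ret));
  } else {
    total_cursor_size += cursor_size;
    total_clog_size += clog_size;
    need_switch_file = total_cursor_size >= ObIlogMemstore::CURSOR_SIZE_TRIGGER
        || total_clog_size >= ObIlogMemstore::CLOG_SIZE_TRIGGER;
  }
  return ret;
}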
......
......@@ -32,7 +32,8 @@ enum ObIlogFreezeTriggerType {
OB_INVALID_TRIGGER_TYPE = 6,
};
class ObIlogMemstore {
class ObIlogMemstore
{
friend ObIlogFileBuilder;
public:
......@@ -77,23 +78,26 @@ public:
// Return value:
// 1) OB_SUCCESS, query success
// 2) OB_PARTITION_NOT_EXIST, partition not exist
int get_min_log_id_and_ts(
const common::ObPartitionKey& partition_key, uint64_t& ret_min_log_id, int64_t& ret_min_log_ts) const;
int get_min_log_id_and_ts(const common::ObPartitionKey &partition_key,
uint64_t &ret_min_log_id,
int64_t &ret_min_log_ts) const;
// Return value:
// 1) OB_SUCCESS, query success
// 2) OB_PARTITION_NOT_EXIST, partition not exist
int get_log_id_range(
const common::ObPartitionKey& partition_key, uint64_t& ret_min_log_id, uint64_t& ret_max_log_id) const;
// The ilog_memstore on which the following function is called must be frozen
int check_need_switch_file(bool& need_switch_file) const;
int get_cursor_size(int64_t& cursor_size) const;
int insert_partition_meta_info(const common::ObPartitionKey& pkey, const IndexInfoBlockEntry& entry);
int insert_partition_memberlist_info(const common::ObPartitionKey& pkey, const MemberListInfo& member_list);
int insert_partition_log_cursor_ext_info(const ObPartitionLogInfo& log_info, const ObLogCursorExt& log_cursor);
template <class Function>
int operate_partition_meta_info(Function& fn)
int get_log_id_range(const common::ObPartitionKey &partition_key,
uint64_t &ret_min_log_id,
uint64_t &ret_max_log_id) const;
int get_cursor_size(int64_t &cursor_size) const;
int get_clog_size(int64_t &clog_size) const;
int insert_partition_meta_info(const common::ObPartitionKey &pkey,
const IndexInfoBlockEntry &entry);
int insert_partition_memberlist_info(const common::ObPartitionKey &pkey,
const MemberListInfo &member_list);
int insert_partition_log_cursor_ext_info(const ObPartitionLogInfo &log_info,
const ObLogCursorExt &log_cursor);
template<class Function>
int operate_partition_meta_info(Function &fn)
{
int ret = common::OB_SUCCESS;
if (OB_FAIL(partition_meta_info_.for_each(fn))) {
......
......@@ -567,7 +567,6 @@ int ObIlogAccessor::check_partition_ilog_can_be_purged(const common::ObPartition
can_purge = false;
uint64_t last_replay_log_id = OB_INVALID_ID;
int64_t unused_ts = OB_INVALID_TIMESTAMP;
storage::ObIPartitionGroupGuard guard;
if (false == inited_) {
ret = OB_NOT_INIT;
CSR_LOG(ERROR, "ObIlogAccessor is not init", K(ret));
......
......@@ -151,12 +151,16 @@ public:
// Return value
// 1) OB_SUCCESS
// 2) OB_ENTRY_NOT_EXIST
int get_file_id_range(file_id_t& min_file_id, file_id_t& max_file_id) const;
int locate_by_timestamp(const common::ObPartitionKey& partition_key, const int64_t start_ts, uint64_t& target_log_id,
int64_t& target_log_timestamp);
int locate_ilog_file_by_log_id(
const common::ObPartitionKey& pkey, const uint64_t start_log_id, uint64_t& end_log_id, file_id_t& ilog_id);
int get_file_id_range(file_id_t &min_file_id, file_id_t &max_file_id) const;
int locate_by_timestamp(const common::ObPartitionKey &partition_key,
const int64_t start_ts,
uint64_t &target_log_id,
int64_t &target_log_timestamp);
int locate_ilog_file_by_log_id(const common::ObPartitionKey &pkey,
const uint64_t start_log_id,
uint64_t &end_log_id,
file_id_t &ilog_id);
int wash_ilog_cache();
int purge_stale_file();
int purge_stale_ilog_index();
......
......@@ -1175,37 +1175,52 @@ void ObIlogStore::timer_check_need_freeze_()
}
}
int ObIlogStore::get_merge_range_(int64_t& end_idx, bool& is_ilog_not_continous_trigger)
int ObIlogStore::get_merge_range_(int64_t& end_idx, bool& need_switch_file)
{
int ret = OB_SUCCESS;
is_ilog_not_continous_trigger = false;
need_switch_file = false;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(ERROR, "ilog_store not inited", K(ret));
} else {
RLockGuard guard(lock_);
int64_t size = frozen_memstore_array_.count();
int64_t cursor_size = 0;
int64_t total_cursor_size = 0;
int64_t total_clog_size = 0;
int64_t tmp_cursor_size = 0;
int64_t tmp_clog_size = 0;
end_idx = 0;
// limit: each ilog file should be less than ObIlogMemstore::CURSOR_SIZE_TRIGGER
while (end_idx < size && cursor_size < ObIlogMemstore::CURSOR_SIZE_TRIGGER) {
int64_t tmp_cursor_size = 0;
ObIlogMemstore* memstore = frozen_memstore_array_[end_idx].memstore_;
while (end_idx < size && false == need_switch_file && OB_SUCC(ret)) {
ObIlogMemstore *memstore = frozen_memstore_array_[end_idx].memstore_;
ObIlogFreezeTriggerType trigger_type = frozen_memstore_array_[end_idx].trigger_type_;
// In normal case, when end_idx is 0, trigger_type mustn't be OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE
if (frozen_memstore_array_[end_idx].trigger_type_ == OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE) {
is_ilog_not_continous_trigger = true;
break;
need_switch_file = true;
} else if (OB_UNLIKELY(NULL == memstore)) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "unexpect error becauese memstore is nullptr", K(ret));
} else if (OB_FAIL(memstore->get_cursor_size(tmp_cursor_size))) {
CLOG_LOG(ERROR, "get_cusrsor_size failed", K(ret));
} else if (OB_FAIL(memstore->get_clog_size(tmp_clog_size))) {
CLOG_LOG(ERROR, "get_clog_size failed", K(ret));
// Try to ensure the total size of each file does not exceed 32MB,
// because the ilog memstore may exceed 32MB in concurrent cases.
} else if (true == (total_clog_size >= ObIlogMemstore::CLOG_SIZE_TRIGGER
|| total_cursor_size >= ObIlogMemstore::CURSOR_SIZE_TRIGGER)) {
need_switch_file = true;
break;
} else {
total_cursor_size += tmp_cursor_size;
total_clog_size += tmp_clog_size;
}
cursor_size += tmp_cursor_size;
end_idx++;
}
end_idx -= 1;
if (end_idx < 0) {
ret = OB_ERR_UNEXPECTED;
CSR_LOG(ERROR, "unexpect error whan caculate end_idx", K(ret), K(end_idx));
}
}
return ret;
}
......@@ -1213,14 +1228,14 @@ int ObIlogStore::get_merge_range_(int64_t& end_idx, bool& is_ilog_not_continous_
int ObIlogStore::merge_frozen_memstore_(int64_t& end_idx, FrozenMemstore& memstore_after_merge)
{
int ret = OB_SUCCESS;
bool is_ilog_not_continous_trigger = false;
bool need_switch_file = false;
end_idx = 0;
int64_t start_ts = ObTimeUtility::current_time();
if (memstore_after_merge.is_valid()) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(ERROR, "invalid argument", K(memstore_after_merge), K(end_idx));
} else {
if (OB_FAIL(get_merge_range_(end_idx, is_ilog_not_continous_trigger))) {
if (OB_FAIL(get_merge_range_(end_idx, need_switch_file))) {
CLOG_LOG(ERROR, "get_merge_range_ failed", K(ret));
} else {
// used to reduce the critical section size
......@@ -1238,8 +1253,9 @@ int ObIlogStore::merge_frozen_memstore_(int64_t& end_idx, FrozenMemstore& memsto
}
} while (0);
if (OB_SUCC(ret) && OB_FAIL(do_merge_frozen_memstore_(
tmp_frozen_memstore_array, is_ilog_not_continous_trigger, memstore_after_merge))) {
if (OB_SUCC(ret) && OB_FAIL(do_merge_frozen_memstore_(tmp_frozen_memstore_array,
need_switch_file,
memstore_after_merge))) {
if (ret == OB_EAGAIN) {
CLOG_LOG(WARN, "log not continous in do_merge_frozen_memstore_", K(tmp_frozen_memstore_array));
} else {
......@@ -1369,7 +1385,8 @@ bool ObIlogStore::need_merge_frozen_memstore_array_by_trigger_type_(const ObIlog
}
int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen_memstore_array,
bool is_ilog_not_continous_trigger, FrozenMemstore& memstore_after_merge)
bool need_switch_file,
FrozenMemstore& memstore_after_merge)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
......@@ -1385,24 +1402,29 @@ int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen
if (OB_FAIL(merge_container.init())) {
CLOG_LOG(ERROR, "init merge_container failed", K(ret));
}
for (int64_t i = 0; i < tmp_size && OB_SUCC(ret); i++) {
FrozenMemstore frozen_memstore = tmp_frozen_memstore_array[i];
if (!frozen_memstore.is_valid()) {
if (false == frozen_memstore.is_valid()) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "unexpect error, invalid frozen_memstore", K(ret), K(frozen_memstore));
} else if (OB_FAIL(merge_container.merge_ilog_memstore_to_container(frozen_memstore.memstore_))) {
if (ret == OB_EAGAIN) {
} else if (OB_FAIL(merge_container.merge_ilog_memstore_to_container(frozen_memstore.memstore_))
&& OB_EAGAIN != ret) {
CLOG_LOG(ERROR, "merge ilog memstore failed", K(ret), K(frozen_memstore), K(tmp_frozen_memstore_array));
} else if (OB_EAGAIN == ret) {
if (i == 0) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "unexpected error, there is no possiblity for merging frozen memstore\
failed because ilog not continous when i is 0", K(ret));
} else if (ret == OB_EAGAIN) {
WLockGuard guard(lock_);
frozen_memstore_array_[i].trigger_type_ = OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE;
CLOG_LOG(WARN,
"log not continous in merge_frozen_memstore, need modify its trigger_type",
K(ret),
K(frozen_memstore),
K(tmp_frozen_memstore_array));
frozen_memstore_array_[i-1].trigger_type_ = OB_ILOG_NOT_CONTINOUS_TRIGGER_TYPE;
CLOG_LOG(WARN, "log not continous in merge_frozen_memstore, need modify its trigger_type", K(ret),
K(frozen_memstore), K(tmp_frozen_memstore_array));
} else {
CLOG_LOG(ERROR, "ilog_memstore_merge failed", K(ret), K(frozen_memstore), K(tmp_frozen_memstore_array));
}
}
} else {}
}
if (OB_SUCC(ret) && OB_FAIL(merge_container.transfer_to_ilog_memstore(memstore))) {
......@@ -1414,22 +1436,13 @@ int ObIlogStore::do_merge_frozen_memstore_(const FrozenMemstoreArray& tmp_frozen
}
if (OB_SUCC(ret)) {
bool need_switch_file_by_not_continous = is_ilog_not_continous_trigger;
bool need_switch_file_by_ilog_or_clog = false;
if (OB_FAIL(memstore->check_need_switch_file(need_switch_file_by_ilog_or_clog))) {
CLOG_LOG(ERROR, "check_need_switch_file failed", K(ret));
} else {
int64_t seq = tmp_frozen_memstore_array[tmp_size - 1].seq_;
ObIlogFreezeTriggerType trigger_type = OB_INVALID_TRIGGER_TYPE;
if (need_switch_file_by_not_continous || need_switch_file_by_ilog_or_clog) {
trigger_type = OB_MERGE_NEED_SWITCH_FILE_TRIGGER_TYPE;
} else {
trigger_type = OB_TIMER_TRIGGER_TYPE;
}
int64_t seq = tmp_frozen_memstore_array[tmp_size-1].seq_;
ObIlogFreezeTriggerType trigger_type = (true == need_switch_file ? OB_MERGE_NEED_SWITCH_FILE_TRIGGER_TYPE :
OB_TIMER_TRIGGER_TYPE);
if (OB_FAIL(memstore_after_merge.set_frozen_memstore(trigger_type, memstore, seq))) {
CLOG_LOG(ERROR, "set_frozen_memstore failed", K(memstore_after_merge), K(trigger_type), K(memstore), K(seq));
}
if(OB_FAIL(memstore_after_merge.set_frozen_memstore(trigger_type, memstore, seq))) {
CLOG_LOG(ERROR, "set_frozen_memstore failed", K(memstore_after_merge), K(trigger_type),
K(memstore), K(seq));
}
}
......
......@@ -99,6 +99,7 @@ public:
// return value
// 1) OB_SUCCESS, query success
// 2) OB_PARTITION_NOT_EXIST, partition not exist
//
// query the min_log_id and min_log_ts in the partition
int get_memstore_min_log_id_and_ts(
const common::ObPartitionKey& partition_key, uint64_t& ret_min_log_id, int64_t& ret_min_log_ts) const;
......@@ -112,7 +113,7 @@ public:
private:
const static int64_t DEFAULT_MEMSTORE_COUNT = 16;
const static int64_t TIMER_TASK_INTERVAL = 2 * 1000 * 1000;
const static int64_t TIMER_TASK_INTERVAL = 1000;
// PINNED_MEMORY_SIZE should be set to ObIlogMemstore::CURSOR_SIZE_TRIGGER (32MB);
// however, in concurrent scenarios, the size of ObIlogMemstore
// may exceed ObIlogMemstore::CURSOR_SIZE_TRIGGER.
......@@ -144,9 +145,8 @@ private:
// this function returns the range of ObIlogMemstore which can be merged
// end_idx is the last index of ObIlogMemstore which can be merged
// is_ilog_not_continous_trigger means whether the next ObIlogMemstore after
// end_idx was triggered by OB_ILOG_NOT_CONTINOUS.
int get_merge_range_(int64_t& end_idx, bool& is_ilog_not_continous_trigger);
// need_switch_file indicates whether we need to switch the ilog file
int get_merge_range_(int64_t& end_idx, bool& need_switch_file);
// merge all ObIlogMemstore in [0, end_idx] to merge_after_memstore
// end_idx is the last index of frozen_memstore_array_
......@@ -175,8 +175,9 @@ private:
// after doing merge
bool need_merge_frozen_memstore_array_by_trigger_type_(const ObIlogFreezeTriggerType& trigger_type) const;
int do_merge_frozen_memstore_(const FrozenMemstoreArray& frozen_memstore_array, bool is_ilog_not_continous_trigger,
FrozenMemstore& memstore_after_merge);
int do_merge_frozen_memstore_(const FrozenMemstoreArray& frozen_memstore_array,
bool need_switch_file,
FrozenMemstore& memstore_after_merge);
void alloc_memstore_(ObIlogMemstore*& memstore);
......
......@@ -1237,10 +1237,11 @@ int ObIInfoBlockHandler::CheckPartitionNeedFreezeFunctor::do_check_full_partitio
// 2. INVALID, means that archive may have started or stopped; can't reclaim the log file, need
// to wait for the next round
// 3. STOPING and STOPED, means that archive has stopped
} else if (ObLogArchiveStatus::STATUS::BEGINNING == info.status_.status_ ||
ObLogArchiveStatus::STATUS::DOING == info.status_.status_) {
if (OB_FAIL(
pls->get_last_archived_log_id(info.status_.incarnation_, info.status_.round_, last_archived_log_id))) {
} else if (ObLogArchiveStatus::STATUS::BEGINNING == info.status_.status_
|| ObLogArchiveStatus::STATUS::DOING == info.status_.status_) {
if (OB_FAIL(pls->get_last_archived_log_id(info.status_.incarnation_,
info.status_.round_,
last_archived_log_id))) {
CLOG_LOG(WARN, "failed to get_log_archive_backup_info", K(partition_key), K(info), KR(ret));
} else if (OB_INVALID_ID == last_archived_log_id || last_archived_log_id < max_log_id) {
can_skip_ = false;
......
......@@ -155,10 +155,7 @@ public:
}
return log_ts;
}
void set_submit_timestamp(const int64_t ts)
{
submit_timestamp_ = ts;
}
void set_submit_timestamp(const int64_t ts) { submit_timestamp_ = ts; }
bool is_batch_committed() const
{
bool bool_ret = false;
......@@ -185,7 +182,7 @@ public:
}
// Serialize submit_timestamp at specified offset
int serialize_submit_timestamp(char* buf, const int64_t buf_len, int64_t& pos);
int serialize_submit_timestamp(char *buf, const int64_t buf_len, int64_t &pos);
static bool check_magic_number(const int16_t magic_number)
{
......
......@@ -589,8 +589,14 @@ int ObLogMembershipTaskMgr::submit_confirmed_info_(const uint64_t log_id, const
uint64_t cur_renew_log_id = cur_renew_ms_task_.get_log_id();
ObProposalID cur_ms_proposal_id = mm_->get_ms_proposal_id();
if (log_id != cur_renew_log_id || ms_proposal_id != cur_ms_proposal_id ||
ms_proposal_id != log_task.get_proposal_id()) {
int64_t confirmed_info_data_checksum = confirmed_info.get_data_checksum();
int64_t confirmed_info_accum_checksum = confirmed_info.get_accum_checksum();
int64_t confirmed_info_epoch_id = confirmed_info.get_epoch_id();
int64_t confirmed_info_submit_timestamp = confirmed_info.get_submit_timestamp();
if (log_id != cur_renew_log_id
|| ms_proposal_id != cur_ms_proposal_id
|| ms_proposal_id != log_task.get_proposal_id()) {
ret = OB_STATE_NOT_MATCH;
CLOG_LOG(WARN,
"log_id or ms_proposal_id not match with cur_renew_ms_task",
......@@ -602,7 +608,7 @@ int ObLogMembershipTaskMgr::submit_confirmed_info_(const uint64_t log_id, const
K(log_task));
} else {
if (log_task.is_on_success_cb_called()) {
if (!log_task.is_checksum_verified(confirmed_info.get_data_checksum())) {
if (!log_task.is_checksum_verified(confirmed_info_data_checksum)) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR,
"is_checksum_verified failed",
......@@ -619,14 +625,12 @@ int ObLogMembershipTaskMgr::submit_confirmed_info_(const uint64_t log_id, const
if (log_task.is_confirmed_info_exist()) {
} else {
if (log_task.is_submit_log_exist()) {
if ((log_task.get_data_checksum() != confirmed_info.get_data_checksum()) ||
(log_task.get_epoch_id() != confirmed_info.get_epoch_id())) {
CLOG_LOG(INFO,
"log_task and confirmed_info not match, reset",
K_(partition_key),
K(log_id),
K(log_task),
K(confirmed_info));
if ((log_task.get_data_checksum() != confirmed_info_data_checksum)
|| (log_task.get_epoch_id() != confirmed_info_epoch_id)
|| (OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp
&& log_task.get_submit_timestamp() != confirmed_info_submit_timestamp)) {
CLOG_LOG(INFO, "log_task and confirmed_info not match, reset", K_(partition_key),
K(log_id), K(log_task), K(confirmed_info));
log_task.reset_log();
log_task.reset_state(false);
log_task.reset_log_cursor();
......@@ -954,8 +958,9 @@ int ObLogMembershipTaskMgr::standby_leader_submit_confirmed_info_(
log_task.lock();
const int64_t data_checksum = log_task.get_data_checksum();
const int64_t epoch_id = log_task.get_epoch_id();
const int64_t submit_timestamp = log_task.get_submit_timestamp();
log_task.unlock();
if (OB_FAIL(confirmed_info.init(data_checksum, epoch_id, accum_checksum))) {
if (OB_FAIL(confirmed_info.init(data_checksum, epoch_id, accum_checksum, submit_timestamp))) {
CLOG_LOG(ERROR, "confirmed_info init failed", K_(partition_key), K(ret));
} else if (OB_FAIL(submit_confirmed_info_(log_id, ms_proposal_id, confirmed_info, true))) {
CLOG_LOG(WARN, "submit_confirmed_info_ failed", K_(partition_key), K(ret), K(log_id), K(confirmed_info));
......
......@@ -16,11 +16,14 @@
#include "ob_log_entry.h"
#include "ob_log_file_pool.h"
namespace oceanbase {
namespace clog {
namespace oceanbase
{
namespace clog
{
// Used to set the parameters for reading files; the purpose is to allow adding
// parameters in the future without changing the interface
struct ObReadParam {
struct ObReadParam
{
file_id_t file_id_;
offset_t offset_;
common::ObPartitionKey partition_key_;
......
......@@ -798,8 +798,8 @@ int ObLogReconfirm::confirm_log_()
if (OB_FAIL(try_update_nop_or_truncate_timestamp(*header))) {
CLOG_LOG(WARN, "try_update_nop_or_truncate_timestamp fail", K(ret), K_(partition_key));
} else if (OB_FAIL(sw_->submit_log(log_ptr->get_header(), log_ptr->get_buf(), NULL))) {
CLOG_LOG(
ERROR, "submit log failed", K_(partition_key), K(ret), K_(next_id), K_(start_id), K_(max_flushed_id));
CLOG_LOG(WARN, "submit log failed", K_(partition_key), K(ret), K_(next_id),
K_(start_id), K_(max_flushed_id));
break;
} else {
CLOG_LOG(TRACE, "submit log success", K_(partition_key), K_(next_id), K_(start_id), K_(max_flushed_id));
......@@ -814,15 +814,37 @@ int ObLogReconfirm::confirm_log_()
next_id_++;
}
}
} // end while
if (OB_SUCC(ret) && next_id_ <= max_flushed_id_ && next_id_ >= log_info_array_.get_end_id()) {
} // end while
// In case of rebuild in leader reconfirm:
// 1. when the majority has already recycled the specified log, the follower
// will trigger rebuild for the leader; the leader will advance the base
// storage info and truncate the sliding window, therefore the log
// at next_id will not reach majority, but the start id of the sliding
// window is already greater than next_id.
// 2. when the majority has the specified log, but other followers have
// already recycled it, these followers will trigger rebuild for the
// leader, and the leader will advance next_id until submit_log returns
// OB_ERROR_OUT_OF_RANGE.
// We need to ensure that next_id can be advanced correctly in the above
// two scenarios.
if (OB_SUCC(ret) || OB_ERROR_OUT_OF_RANGE == ret) {
const uint64_t new_start_id = sw_->get_start_id();
if (new_start_id > next_id_) {
next_id_ = new_start_id;
CLOG_LOG(INFO, "there may execute a rebuild operation in\
leader reconfirm", K(ret), K(new_start_id), K(next_id_));
}
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)
&& next_id_ <= max_flushed_id_
&& next_id_ >= log_info_array_.get_end_id()) {
// process next log_range
if (OB_EAGAIN == (ret = init_log_info_range_(next_id_))) {
// ret is EAGAIN when some log has slide out, need update next_id_ and retry
const uint64_t new_start_id = sw_->get_start_id();
if (new_start_id > next_id_) {
next_id_ = new_start_id;
}
// ret is EAGAIN when some log has slide out, need update next_id_
// and retry
} else if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "init_log_info_range_ failed", K_(partition_key), K(ret));
} else {
......
......@@ -398,8 +398,9 @@ int ObLogSlidingWindow::send_confirmed_info_to_standby_children_(const uint64_t
const int64_t data_checksum = log_task->get_data_checksum();
const int64_t accum_checksum = log_task->get_accum_checksum();
const int64_t epoch_id = log_task->get_epoch_id();
const int64_t submit_timestamp = log_task->get_submit_timestamp();
const bool batch_committed = log_task->is_batch_committed();
if (OB_FAIL(confirmed_info.init(data_checksum, epoch_id, accum_checksum))) {
if (OB_FAIL(confirmed_info.init(data_checksum, epoch_id, accum_checksum, submit_timestamp))) {
CLOG_LOG(ERROR, "confirmed_info init failed", K_(partition_key), K(ret));
} else if (OB_FAIL(submit_confirmed_info_to_standby_children_(log_id, confirmed_info, batch_committed))) {
CLOG_LOG(WARN,
......@@ -1246,8 +1247,10 @@ int ObLogSlidingWindow::need_update_log_task_(
} else if (OB_ISNULL(state_mgr_) || OB_ISNULL(buff)) {
ret = OB_INVALID_ARGUMENT;
} else if (task.is_log_confirmed()) {
if (is_confirm_match_(
log_id, header.get_data_checksum(), header.get_epoch_id(), task.get_data_checksum(), task.get_epoch_id())) {
if (is_confirm_match_(log_id,
header.get_data_checksum(), header.get_epoch_id(),
header.get_submit_timestamp(), task.get_data_checksum(),
task.get_epoch_id(), task.get_submit_timestamp())) {
CLOG_LOG(DEBUG, "receive submit log after confirm log, match", K(header), K_(partition_key), K(task));
} else {
ret = OB_INVALID_LOG;
......@@ -1803,16 +1806,11 @@ int ObLogSlidingWindow::submit_confirmed_info_(
*/
if (log_task->is_submit_log_exist()) {
if (!is_confirm_match_(log_id,
log_task->get_data_checksum(),
log_task->get_epoch_id(),
confirmed_info.get_data_checksum(),
confirmed_info.get_epoch_id())) {
CLOG_LOG(INFO,
"log_task and confirmed_info not match, reset",
K_(partition_key),
K(log_id),
K(*log_task),
K(confirmed_info));
log_task->get_data_checksum(), log_task->get_epoch_id(),
log_task->get_submit_timestamp(), confirmed_info.get_data_checksum(),
confirmed_info.get_epoch_id(), confirmed_info.get_submit_timestamp())) {
CLOG_LOG(INFO, "log_task and confirmed_info not match, reset", K_(partition_key),
K(log_id), K(*log_task), K(confirmed_info));
log_task->reset_log();
log_task->reset_log_cursor();
}
......@@ -2739,19 +2737,22 @@ int ObLogSlidingWindow::get_log(const uint64_t log_id, const uint32_t log_attr,
return ret;
}
bool ObLogSlidingWindow::is_confirm_match_(const uint64_t log_id, const int64_t log_data_checksum,
const int64_t log_epoch_id, const int64_t confirmed_info_data_checksum, const int64_t confirmed_info_epoch_id)
bool ObLogSlidingWindow::is_confirm_match_(const uint64_t log_id,
const int64_t log_data_checksum,
const int64_t log_epoch_id,
const int64_t log_submit_timestamp,
const int64_t confirmed_info_data_checksum,
const int64_t confirmed_info_epoch_id,
const int64_t confirmed_info_submit_timestamp)
{
bool bret = false;
if ((log_data_checksum != confirmed_info_data_checksum) || (log_epoch_id != confirmed_info_epoch_id)) {
CLOG_LOG(WARN,
"confirm log not match",
K_(partition_key),
K(log_id),
K(log_data_checksum),
K(log_epoch_id),
K(confirmed_info_data_checksum),
K(confirmed_info_epoch_id));
if (log_data_checksum != confirmed_info_data_checksum
|| log_epoch_id != confirmed_info_epoch_id
|| (OB_INVALID_TIMESTAMP != confirmed_info_submit_timestamp
&& log_submit_timestamp != confirmed_info_submit_timestamp)) {
CLOG_LOG(WARN, "confirm log not match", K_(partition_key), K(log_id), K(log_data_checksum),
K(log_epoch_id), K(log_submit_timestamp), K(confirmed_info_data_checksum),
K(confirmed_info_epoch_id), K(confirmed_info_submit_timestamp));
} else {
bret = true;
}
......@@ -2862,9 +2863,10 @@ int ObLogSlidingWindow::leader_submit_confirmed_info_(
log_task->lock();
const int64_t data_checksum = log_task->get_data_checksum();
const int64_t epoch_id = log_task->get_epoch_id();
const int64_t submit_timestamp = log_task->get_submit_timestamp();
const bool batch_committed = log_task->is_batch_committed();
log_task->unlock();
if (OB_FAIL(confirmed_info.init(data_checksum, epoch_id, accum_checksum))) {
if (OB_FAIL(confirmed_info.init(data_checksum, epoch_id, accum_checksum, submit_timestamp))) {
CLOG_LOG(ERROR, "confirmed_info init failed", K_(partition_key), K(ret));
} else if (OB_FAIL(submit_confirmed_info_(log_id, confirmed_info, true, batch_committed))) {
CLOG_LOG(WARN,
......@@ -2902,9 +2904,10 @@ int ObLogSlidingWindow::standby_leader_transfer_confirmed_info_(const uint64_t l
const int64_t data_checksum = log_task->get_data_checksum();
const int64_t epoch_id = log_task->get_epoch_id();
const int64_t accum_checksum = log_task->get_accum_checksum();
const int64_t submit_timestamp = log_task->get_submit_timestamp();
const bool batch_committed = log_task->is_batch_committed();
log_task->unlock();
if (OB_FAIL(confirmed_info.init(data_checksum, epoch_id, accum_checksum))) {
if (OB_FAIL(confirmed_info.init(data_checksum, epoch_id, accum_checksum, submit_timestamp))) {
CLOG_LOG(ERROR, "confirmed_info init failed", K_(partition_key), K(ret));
} else if (OB_FAIL(submit_confirmed_info_to_net_(log_id, confirmed_info, batch_committed))) {
CLOG_LOG(WARN,
......@@ -3803,9 +3806,19 @@ int ObLogSlidingWindow::do_fetch_log(const uint64_t start_id, const uint64_t end
int ret = OB_SUCCESS;
bool need_check_rebuild = false;
is_fetched = false;
// the following code is used by the test case clog/3050_rebuild_when_leader_reconfirm.test
// don't delete it
// users need to add this configuration to share/parameter/ob_parameter_seed.ipp
// DEF_BOOL(_enable_fetch_log, OB_CLUSTER_PARAMETER, "true",
// "enable fetch log", ObParameterAttr(Section::OBSERVER, Source::DEFAULT
// , EditLevel::DYNAMIC_EFFECTIVE));
// const bool can_fetch_log = GCONF._enable_fetch_log;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (start_id <= 0 || end_id <= 0 || start_id >= end_id || OB_ISNULL(state_mgr_) || OB_ISNULL(log_engine_)) {
// } else if (!can_fetch_log) {
// CLOG_LOG(INFO, "can't fetch log", K(ret), K(partition_key_), K(start_id), K(end_id));
} else if (start_id <= 0 || end_id <= 0 || start_id >= end_id || OB_ISNULL(state_mgr_)
|| OB_ISNULL(log_engine_)) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid arguments", K(ret), K(partition_key_), K(start_id), K(end_id));
} else if (!check_need_fetch_log_(start_id, need_check_rebuild)) {
......@@ -5179,7 +5192,10 @@ int ObLogSlidingWindow::set_confirmed_info_without_lock_(
{
int ret = OB_SUCCESS;
ObConfirmedInfo confirmed_info;
if (OB_FAIL(confirmed_info.init(header.get_data_checksum(), header.get_epoch_id(), accum_checksum))) {
if (OB_FAIL(confirmed_info.init(header.get_data_checksum(),
header.get_epoch_id(),
accum_checksum,
header.get_submit_timestamp()))) {
CLOG_LOG(ERROR, "confirmed_info init failed", K_(partition_key), K(header), KR(ret));
} else {
log_task.set_confirmed_info(confirmed_info);
......
......@@ -543,11 +543,19 @@ private:
const int64_t submit_timestamp, ObISubmitLogCb* cb);
int try_freeze_aggre_buffer_(const uint64_t log_id);
int submit_freeze_aggre_buffer_task_(const uint64_t log_id);
int submit_aggre_log_(ObAggreBuffer* buffer, const uint64_t log_id, const int64_t submit_timestamp);
int try_update_submit_timestamp(const int64_t base_ts) override;
bool is_confirm_match_(const uint64_t log_id, const int64_t log_data_checksum, const int64_t log_epoch_id,
const int64_t confirmed_info_data_checksum, const int64_t confirmed_info_epoch_id);
int receive_log_(const ObLogEntry& log_entry, const common::ObAddr& server, const int64_t cluster_id);
int submit_aggre_log_(ObAggreBuffer *buffer,
const uint64_t log_id,
const int64_t submit_timestamp);
int try_update_submit_timestamp(const int64_t base_ts);
bool is_confirm_match_(const uint64_t log_id,
const int64_t log_data_checksum,
const int64_t log_epoch_id,
const int64_t log_submit_timestamp,
const int64_t confirmed_info_data_checksum,
const int64_t confirmed_info_epoch_id,
const int64_t confirmed_info_submit_timestamp);
int receive_log_(const ObLogEntry &log_entry, const common::ObAddr &server,
const int64_t cluster_id);
void update_max_log_id_(const uint64_t log_id);
int submit_to_sliding_window_(const ObLogEntryHeader& header, const char* buff, ObISubmitLogCb* cb,
const bool need_replay, const bool send_slave, const common::ObAddr& server, const int64_t cluster_id,
......
......@@ -655,15 +655,22 @@ void ObLogTask::set_confirmed_info(const ObConfirmedInfo& confirmed_info)
{
state_map_.set_map(CONFIRMED_INFO_EXIST);
const int64_t arg_data_checksum = confirmed_info.get_data_checksum();
const int64_t arg_accum_checksum = confirmed_info.get_accum_checksum();
const int64_t arg_epoch_id = confirmed_info.get_epoch_id();
const int64_t arg_submit_timestamp = confirmed_info.get_submit_timestamp();
if (is_submit_log_exist()) {
// check data_checksum_ and epoch_id_ when log exists
if (data_checksum_ != arg_data_checksum || epoch_id_ != confirmed_info.get_epoch_id()) {
CLOG_LOG(ERROR, "set_confirmed_info meta info not match", K(data_checksum_), K(epoch_id_), K(confirmed_info));
if (data_checksum_ != arg_data_checksum
|| epoch_id_ != arg_epoch_id
|| (OB_INVALID_TIMESTAMP != arg_submit_timestamp
&& submit_timestamp_ != arg_submit_timestamp)) {
CLOG_LOG(ERROR, "set_confirmed_info meta info not match", K(data_checksum_),
K(epoch_id_), K(confirmed_info));
}
}
epoch_id_ = confirmed_info.get_epoch_id();
epoch_id_ = arg_epoch_id;
data_checksum_ = arg_data_checksum;
accum_checksum_ = confirmed_info.get_accum_checksum();
accum_checksum_ = arg_accum_checksum;
}
void ObLogTask::set_log_confirmed()
......
......@@ -16,18 +16,25 @@
namespace oceanbase {
using namespace common;
namespace clog {
int ObConfirmedInfo::init(const int64_t data_checksum, const int64_t epoch_id, const int64_t accum_checksum)
namespace clog
{
int ObConfirmedInfo::init(const int64_t data_checksum,
const int64_t epoch_id,
const int64_t accum_checksum,
const int64_t submit_timestamp)
{
int ret = OB_SUCCESS;
data_checksum_ = data_checksum;
epoch_id_ = epoch_id;
accum_checksum_ = accum_checksum;
submit_timestamp_ = submit_timestamp;
return ret;
}
// used for RPC
OB_SERIALIZE_MEMBER(ObConfirmedInfo, data_checksum_, epoch_id_, accum_checksum_);
OB_SERIALIZE_MEMBER(ObConfirmedInfo, data_checksum_,
epoch_id_, accum_checksum_,
submit_timestamp_);
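Because submit_timestamp_ is appended as the last serialized member, a peer running an older version will typically leave it at its default OB_INVALID_TIMESTAMP after deserialization. The comparison sites in this patch therefore only check the submit timestamp when it is actually set; a simplified sketch of that guard (hypothetical helper name, not part of the patch):

// Simplified sketch of the compatibility guard used throughout this patch:
// only compare submit timestamps when the confirmed info actually carries one;
// OB_INVALID_TIMESTAMP means it came from an older peer.
bool submit_timestamp_matches(const int64_t log_submit_timestamp,
                              const int64_t confirmed_info_submit_timestamp)
{
  return OB_INVALID_TIMESTAMP == confirmed_info_submit_timestamp
      || log_submit_timestamp == confirmed_info_submit_timestamp;
}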
ObMembershipLog::ObMembershipLog()
: version_(MS_LOG_VERSION),
......
......@@ -24,36 +24,23 @@ class ObConfirmedInfo {
OB_UNIS_VERSION(1);
public:
ObConfirmedInfo() : data_checksum_(0), epoch_id_(common::OB_INVALID_TIMESTAMP), accum_checksum_(0)
{}
~ObConfirmedInfo()
{}
ObConfirmedInfo() : data_checksum_(0), epoch_id_(common::OB_INVALID_TIMESTAMP),
accum_checksum_(0), submit_timestamp_(common::OB_INVALID_TIMESTAMP) {}
~ObConfirmedInfo() {}
public:
int init(const int64_t data_checksum, const int64_t epoch_id, const int64_t accum_checksum);
int64_t get_data_checksum() const
{
return data_checksum_;
}
int64_t get_epoch_id() const
{
return epoch_id_;
}
int64_t get_accum_checksum() const
{
return accum_checksum_;
}
void reset()
{
data_checksum_ = 0;
epoch_id_ = common::OB_INVALID_TIMESTAMP;
accum_checksum_ = 0;
}
void deep_copy(const ObConfirmedInfo& confirmed_info)
int init(const int64_t data_checksum, const int64_t epoch_id,
const int64_t accum_checksum, const int64_t submit_timestamp);
int64_t get_data_checksum() const { return data_checksum_; }
int64_t get_epoch_id() const { return epoch_id_; }
int64_t get_accum_checksum() const { return accum_checksum_; }
int64_t get_submit_timestamp() const { return submit_timestamp_; }
void reset() { data_checksum_ = 0; epoch_id_ = common::OB_INVALID_TIMESTAMP; accum_checksum_ = 0; submit_timestamp_ = common::OB_INVALID_TIMESTAMP; }
void deep_copy(const ObConfirmedInfo &confirmed_info)
{
data_checksum_ = confirmed_info.data_checksum_;
epoch_id_ = confirmed_info.epoch_id_;
accum_checksum_ = confirmed_info.accum_checksum_;
submit_timestamp_ = confirmed_info.submit_timestamp_;
}
friend bool operator==(const ObConfirmedInfo& lhs, const ObConfirmedInfo& rhs);
TO_STRING_KV(K_(data_checksum), K_(epoch_id), K_(accum_checksum));
......@@ -62,7 +49,7 @@ private:
int64_t data_checksum_;
int64_t epoch_id_;
int64_t accum_checksum_;
int64_t submit_timestamp_;
private:
DISALLOW_COPY_AND_ASSIGN(ObConfirmedInfo);
};
......@@ -70,7 +57,7 @@ private:
inline bool operator==(const ObConfirmedInfo& lhs, const ObConfirmedInfo& rhs)
{
return (lhs.data_checksum_ == rhs.data_checksum_) && (lhs.epoch_id_ == rhs.epoch_id_) &&
(lhs.accum_checksum_ == rhs.accum_checksum_);
(lhs.accum_checksum_ == rhs.accum_checksum_) && (lhs.submit_timestamp_ == rhs.submit_timestamp_);
}
class ObMembershipLog {
......
......@@ -2922,15 +2922,8 @@ int ObPartitionLogService::fetch_log_from_ilog_storage_(const uint64_t log_id, c
CLOG_LOG(INFO, "this replica need rebuild", K(ret), K(server), K(partition_key_), K(log_id), K(read_from_clog));
uint64_t last_replay_log_id = OB_INVALID_ID;
if (OB_FAIL(get_storage_last_replay_log_id_(last_replay_log_id))) {
CLOG_LOG(ERROR,
"get_storage_last_replay_log_id_ failed",
K(ret),
K(partition_key_),
K(log_id),
K(fetch_type),
K(proposal_id),
K(server),
K(need_send_confirm_info));
CLOG_LOG(WARN, "get_storage_last_replay_log_id_ failed", K(ret), K(partition_key_), K(log_id), K(fetch_type),
K(proposal_id), K(server), K(need_send_confirm_info));
} else if (log_id > last_replay_log_id) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR,
......@@ -5897,9 +5890,10 @@ int ObPartitionLogService::send_confirm_info_(const common::ObAddr& server, cons
} else {
const uint64_t log_id = log_entry.get_header().get_log_id();
ObConfirmedInfo confirmed_info;
if (OB_SUCCESS !=
(ret = confirmed_info.init(
log_entry.get_header().get_data_checksum(), log_entry.get_header().get_epoch_id(), accum_checksum))) {
if (OB_SUCCESS != (ret = confirmed_info.init(log_entry.get_header().get_data_checksum(),
log_entry.get_header().get_epoch_id(),
accum_checksum,
log_entry.get_header().get_submit_timestamp()))) {
CLOG_LOG(WARN, "confirmed_info init failed", K_(partition_key), K(ret));
} else if (OB_SUCCESS != (ret = log_engine_->submit_confirmed_info(
list, partition_key_, log_id, confirmed_info, batch_committed))) {
......@@ -8277,7 +8271,8 @@ int ObPartitionLogService::check_and_try_leader_revoke(const ObElection::RevokeT
CLOG_LOG(ERROR, "check_majority_replica_clog_disk_full_ failed", K(ret));
} else {
need_revoke = !majority_is_clog_disk_full;
CLOG_LOG(INFO, "partition may need revoke, ", "need revoke is ", need_revoke, "and revoke type is ", revoke_type);
CLOG_LOG(INFO, "partition may need revoke, ", "need revoke is ", need_revoke,
"and revoke type is ", revoke_type);
}
}
......@@ -8465,5 +8460,5 @@ int ObPartitionLogService::get_role_and_leader_epoch_unlock_(
return ret;
}
} // namespace clog
} // namespace oceanbase
} // namespace clog
} // namespace oceanbase
......@@ -102,8 +102,9 @@ inline int parse_log_item_type(const char* buf, const int64_t len, ObCLogItemTyp
// or the magic of this block is an ilog entry or clog entry), we need to read all subsequent contents of this
// file and check whether there is a valid block whose timestamp recorded in the block header is greater than
// or equal to last_block_ts; if not, the end of the file has been reached.
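A minimal sketch of the end-of-file probe described in the comment above (hypothetical BlockHeader stand-in, not the actual check_last_block_ implementation): scan the remainder of the file and only keep iterating if some valid block carries a timestamp at or above last_block_ts.

#include <cstdint>
#include <vector>

// Hypothetical stand-in for a block header; only the fields the probe needs.
struct BlockHeader {
  bool valid;
  int64_t timestamp;
};

// Scan the remainder of the file: if no valid block carries a timestamp
// >= last_block_ts, everything after the current offset is stale (for example
// left over from a reused file) and the iterator treats it as end of file.
bool remainder_has_newer_block(const std::vector<BlockHeader>& remaining_blocks,
                               int64_t last_block_ts)
{
  for (const BlockHeader& header : remaining_blocks) {
    if (header.valid && header.timestamp >= last_block_ts) {
      return true;  // genuine newer data exists, keep iterating
    }
  }
  return false;     // only stale or invalid data left
}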
template <class Type, class Interface>
class ObRawEntryIterator : public Interface {
template<class Type, class Interface>
class ObRawEntryIterator: public Interface
{
public:
ObRawEntryIterator();
virtual ~ObRawEntryIterator();
......@@ -149,7 +150,7 @@ private:
//
// Since our log disk space is large enough, log files will not be reused within two seconds,
// so this constant is safe in the scenario of reusing files.
static const int64_t CHECK_LAST_BLOCK_TS_INTERVAL = 2000 * 1000; // 2s
static const int64_t CHECK_LAST_BLOCK_TS_INTERVAL = 2000 * 1000; // 2s
private:
bool is_inited_;
ObILogDirectReader* reader_;
......@@ -715,9 +716,10 @@ int ObRawEntryIterator<Type, Interface>::next_entry(Type& entry, ObReadParam& pa
// last_block_ts must be valid, because of this:
// 1. Write file header is atomic, therefore, the last_block_ts is valid
// 2. else, file header is ObNewLogFileBuf
template <class Type, class Interface>
bool ObRawEntryIterator<Type, Interface>::check_last_block_(
const file_id_t file_id, const offset_t start_offset, const int64_t last_block_ts) const
template<class Type, class Interface>
bool ObRawEntryIterator<Type, Interface>::check_last_block_(const file_id_t file_id,
const offset_t start_offset,
const int64_t last_block_ts) const
{
int ret = common::OB_SUCCESS;
bool bool_ret = false;
......
......@@ -164,7 +164,8 @@ public:
return ret;
}
void destroy()
{}
{
}
private:
common::ObString function_name_;
......
......@@ -753,12 +753,13 @@ DEF_INT(clog_max_unconfirmed_log_count, OB_TENANT_PARAMETER, "1500", "[100, 5000
"Range: [100, 50000]",
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(_ob_clog_timeout_to_force_switch_leader, OB_CLUSTER_PARAMETER, "0s", "[0s, 60m]",
"When log sync is blocking, leader need wait this interval before revoke."
"The default value is 0s, use 0s to close this function. Range: [0s, 60m]",
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_ob_clog_disk_buffer_cnt, OB_CLUSTER_PARAMETER, "64", "[1, 2000]", "clog disk buffer cnt. Range: [1, 2000]",
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(_ob_clog_timeout_to_force_switch_leader, OB_CLUSTER_PARAMETER, "10s", "[0s, 60m]",
"When log sync is blocking, leader need wait this interval before revoke."
"The default value is 0s, use 0s to close this function. Range: [0s, 60m]",
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_ob_clog_disk_buffer_cnt, OB_CLUSTER_PARAMETER, "64", "[1, 2000]",
"clog disk buffer cnt. Range: [1, 2000]",
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(_ob_trans_rpc_timeout, OB_CLUSTER_PARAMETER, "3s", "[0s, 3600s]",
"transaction rpc timeout(s). Range: [0s, 3600s]",
ObParameterAttr(Section::TRANS, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
......
......@@ -5912,12 +5912,18 @@ int ObMigratePrepareTask::try_hold_local_partition()
LOG_WARN("failed to get leader", K(ret), "arg", ctx_->replica_op_arg_);
} else if (OB_FAIL(partition->get_role(role))) {
LOG_WARN("failed to get partition role", K(ret), "arg", ctx_->replica_op_arg_);
} else if (leader.is_valid() && leader == MYADDR && is_strong_leader(role) &&
(ADD_REPLICA_OP == ctx_->replica_op_arg_.type_ || MIGRATE_REPLICA_OP == ctx_->replica_op_arg_.type_ ||
FAST_MIGRATE_REPLICA_OP == ctx_->replica_op_arg_.type_ ||
REBUILD_REPLICA_OP == ctx_->replica_op_arg_.type_ ||
CHANGE_REPLICA_OP == ctx_->replica_op_arg_.type_ ||
LINK_SHARE_MAJOR_OP == ctx_->replica_op_arg_.type_)) {
} else if (leader.is_valid()
&& leader == MYADDR
// support rebuild in leader reconfirm
&& is_strong_leader(role)
//TODO(wait yanmu)
//&& is_strong_leader(role)
&& (ADD_REPLICA_OP == ctx_->replica_op_arg_.type_
|| MIGRATE_REPLICA_OP == ctx_->replica_op_arg_.type_
|| FAST_MIGRATE_REPLICA_OP == ctx_->replica_op_arg_.type_
|| REBUILD_REPLICA_OP == ctx_->replica_op_arg_.type_
|| CHANGE_REPLICA_OP == ctx_->replica_op_arg_.type_
|| LINK_SHARE_MAJOR_OP == ctx_->replica_op_arg_.type_)) {
if (REBUILD_REPLICA_OP == ctx_->replica_op_arg_.type_) {
if (OB_FAIL(MIGRATOR.get_partition_service()->turn_off_rebuild_flag(ctx_->replica_op_arg_))) {
LOG_WARN("Failed to report_rebuild_replica off", K(ret), "arg", ctx_->replica_op_arg_);
......@@ -8104,7 +8110,9 @@ int ObMigratePostPrepareTask::deal_with_rebuild_partition()
LOG_WARN("failed to get leader", K(ret), "arg", ctx_->replica_op_arg_);
} else if (OB_FAIL(partition->get_role(role))) {
LOG_WARN("failed to get real role", K(ret), "arg", ctx_->replica_op_arg_);
} else if (leader.is_valid() && leader == MYADDR && is_strong_leader(role)) {
} else if (leader.is_valid() && leader == MYADDR
// support rebuild in leader reconfirm
&& is_strong_leader(role)) {
if (OB_FAIL(MIGRATOR.get_partition_service()->turn_off_rebuild_flag(ctx_->replica_op_arg_))) {
LOG_WARN("Failed to report_rebuild_replica off", K(ret), "arg", ctx_->replica_op_arg_);
} else {
......
// Copyright 2014 Alibaba Inc. All Rights Reserved.
// Author:
// zhangshuai.zs@alibaba-inc.com
//
// This file is for...
//
#ifndef OCEANBASE_UNITTEST_MOCK_LOG_UTILS_H_
#define OCEANBASE_UNITTEST_MOCK_LOG_UTILS_H_
#include "gtest/gtest.h"
#include "lib/allocator/ob_malloc.h"
#include <locale.h>
#include "clog/ob_log_state_mgr.h"
#include "clog/ob_i_net_log_buffer.h"
#include "clog/ob_i_disk_log_buffer.h"
#include "clog/ob_i_net_log_buffer_mgr.h"
#include "clog/ob_log_engine.h"
#include "storage/ob_i_partition_mgr.h"
#include "../storage/mockcontainer/mock_ob_partition.h"
#include "../storage/mockcontainer/mock_ob_election_mgr.h"
#include "clog/ob_log_callback_engine.h"
#include "../storage/mockcontainer/mock_ob_partition_mgr.h"
#include "election/ob_election_rpc.h"
namespace oceanbase
{
namespace clog
{
using namespace oceanbase::election;
class MockSubmitLogCb : public ObISubmitLogCb
{
public:
MockSubmitLogCb() {}
~MockSubmitLogCb() {}
//virtual int on_success(const common::ObPartitionKey &parition_key, const uint64_t log_id, const int64_t version) = 0;
virtual int on_success(const common::ObPartitionKey &partition_key, const uint64_t log_id,
const int64_t trans_version)
{
UNUSED(partition_key);
UNUSED(log_id);
UNUSED(trans_version);
return OB_SUCCESS;
}
};
/*
class MockLogEngine: public ObILogEngine
{
public:
MockLogEngine() {}
virtual ~MockLogEngine() {}
virtual ObIRawLogIterator *alloc_raw_log_iterator(const uint64_t file_id, const int64_t timeout)
{
UNUSED(file_id);
UNUSED(timeout);
MockRawLogIterator *ptr = static_cast<MockRawLogIterator *>(common::ob_malloc(sizeof(
MockRawLogIterator), common::ObModIds::OB_UPS_LOG));
ptr->init();
return ptr;
}
virtual void revert_raw_log_iterator(ObIRawLogIterator *iter)
{
common::ob_free(iter);
}
virtual int read_log_by_location(const clog::ObReadParam &param, ObLogEntry &entry)
{
UNUSED(param);
UNUSED(entry);
return common::OB_SUCCESS;
}
virtual int read_log_by_id(const clog::ObReadParam &param, ObLogEntry &entry)
{
UNUSED(param);
UNUSED(entry);
return common::OB_SUCCESS;
}
virtual int submit_flush_task(FlushTask *task)
{
ObLogCursor log_cursor;
task->after_consume(common::OB_SUCCESS, &log_cursor);
return common::OB_SUCCESS;
};
virtual int post_log(const common::ObAddr &server,
const common::ObPartitionKey &key,
const char *buf,
int64_t len,
common::ObProposalID propose_id)
{
UNUSED(server);
UNUSED(key);
UNUSED(buf);
UNUSED(len);
UNUSED(propose_id);
return common::OB_SUCCESS;
}
virtual int submit_net_task(const common::ObMemberList &mem_list,
const common::ObPartitionKey &key,
ObILogNetTask *task)
{
UNUSED(mem_list);
UNUSED(key);
UNUSED(task);
return common::OB_SUCCESS;
}
virtual int submit_log_ack(const common::ObAddr &server,
const common::ObPartitionKey &key,
const uint64_t log_id,
const common::ObProposalID propose_id,
ObLogType type)
{
UNUSED(server);
UNUSED(key);
UNUSED(log_id);
UNUSED(propose_id);
UNUSED(type);
return common::OB_SUCCESS;
}
virtual int fetch_log_from_all_follower(const common::ObMemberList &mem_list,
const common::ObPartitionKey &key,
const uint64_t start_id,
const common::ObProposalID propose_id)
{
UNUSED(mem_list);
UNUSED(key);
UNUSED(start_id);
UNUSED(propose_id);
return common::OB_SUCCESS;
}
virtual int fetch_log_from_leader(const common::ObAddr &server,
const common::ObPartitionKey &key,
const uint64_t start_id,
const uint64_t end_id,
const common::ObProposalID propose_id)
{
UNUSED(server);
UNUSED(key);
UNUSED(start_id);
UNUSED(end_id);
UNUSED(propose_id);
return common::OB_SUCCESS;
}
virtual int submit_prepare_rqst(const common::ObMemberList &mem_list,
const common::ObPartitionKey &key,
const common::ObProposalID propose_id)
{
UNUSED(mem_list);
UNUSED(key);
UNUSED(propose_id);
return common::OB_SUCCESS;
}
virtual int broadcast_mc_log_to_members(const common::ObPartitionKey &partition_key,
const common::ObMemberList &sendout_member_list,
const char *buff,
const int64_t buff_len)
{
UNUSED(partition_key);
UNUSED(sendout_member_list);
UNUSED(buff);
UNUSED(buff_len);
int ret = common::OB_SUCCESS;
return ret;
}
virtual int fetch_latest_mc_log(const common::ObPartitionKey &partition_key,
const common::ObAddr &leader)
{
UNUSED(partition_key);
UNUSED(leader);
int ret = common::OB_SUCCESS;
return ret;
}
virtual int ack_mc_log_to_leader(const common::ObPartitionKey &partition_key,
const common::ObAddr &leader,
const common::ObAddr &server,
const int64_t mc_timestamp,
const common::ObProposalID proposal_id)
{
UNUSED(partition_key);
UNUSED(leader);
UNUSED(server);
UNUSED(mc_timestamp);
UNUSED(proposal_id);
int ret = common::OB_SUCCESS;
return ret;
}
virtual int prepare_response(const common::ObAddr &server,
const common::ObPartitionKey &partition_key,
const uint64_t max_log_id,
const common::ObProposalID propose_id)
{
UNUSED(server);
UNUSED(partition_key);
UNUSED(max_log_id);
UNUSED(propose_id);
return common::OB_SUCCESS;
}
virtual int get_current_cursor(uint64_t &file_id, int64_t &offset)
{
UNUSED(file_id);
UNUSED(offset);
return common::OB_SUCCESS;
}
virtual int get_search_min_fid(const oceanbase::clog::ObReadParam &a, uint64_t &b)
{
UNUSED(a);
UNUSED(b);
return common::OB_SUCCESS;
}
virtual int submit_fetch_log_resp(const oceanbase::common::ObMemberList &a,
const oceanbase::common::ObPartitionKey &b, oceanbase::clog::ObILogNetTask *c)
{
UNUSED(a);
UNUSED(b);
UNUSED(c);
return common::OB_SUCCESS;
}
virtual int get_cur_min_fid(uint64_t &file_id)
{
UNUSED(file_id);
return common::OB_SUCCESS;
}
virtual ObILogPartitionMetaReader *alloc_partition_meta_reader()
{
return NULL;
}
virtual void revert_partition_meta_reader(ObILogPartitionMetaReader *reader)
{
UNUSED(reader);
return;
}
};
*/
class MockElectionMgr : public election::MockObIElectionMgr
{
public:
MockElectionMgr() : change_leader_(false), leader_() {}
virtual int start_all()
{
return 0;
}
int stop_all()
{
return 0;
}
virtual int handle_election_msg(const ObElectionMsgBuffer &msgbuf,
obrpc::ObElectionRpcResult &result)
{
UNUSED(msgbuf);
UNUSED(result);
return 0;
}
int init(const common::ObAddr &self, obrpc::ObElectionRpcProxy *client_manager)
{
UNUSED(self);
UNUSED(client_manager);
return 0;
}
int add_partition(const common::ObPartitionKey &partition, int64_t replica_num,
election::ObIElectionCallback *election_cb)
{
UNUSED(partition);
UNUSED(replica_num);
UNUSED(election_cb);
return 0;
}
int remove_partition(const common::ObPartitionKey &partition)
{
UNUSED(partition);
return 0;
}
int change_leader_async(const common::ObPartitionKey &partition, const common::ObAddr &leader)
{
UNUSED(partition);
UNUSED(leader);
change_leader_ = true;
return 0;
}
int start(const common::ObPartitionKey &partition)
{
UNUSED(partition);
return 0;
}
int stop(const common::ObPartitionKey &partition)
{
UNUSED(partition);
return 0;
}
virtual int set_candidate(const common::ObPartitionKey &partition,
const common::ObMemberList &prev_mlist,
const common::ObMemberList &curr_mlist)
{
UNUSED(partition);
UNUSED(prev_mlist);
UNUSED(curr_mlist);
return 0;
}
int get_prev_candidate(const common::ObPartitionKey &partition, common::ObMemberList &mlist) const
{
UNUSED(partition);
UNUSED(mlist);
return 0;
}
int get_curr_candidate(const common::ObPartitionKey &partition, common::ObMemberList &mlist) const
{
UNUSED(partition);
UNUSED(mlist);
return 0;
}
int get_leader(const common::ObPartitionKey &partition, common::ObAddr &leader) const
{
UNUSED(partition);
leader = leader_;
return 0;
}
public:
bool change_leader_;
ObAddr leader_;
};
class MockLogCallbackEngine: public ObILogCallbackEngine
{
public:
virtual int init(common::S2MQueueThread *worker_thread_pool,
common::S2MQueueThread *sp_thread_pool)
{
UNUSED(worker_thread_pool);
UNUSED(sp_thread_pool);
return common::OB_SUCCESS;
}
virtual void destroy()
{
return;
}
virtual int submit_flush_cb_task(const common::ObPartitionKey &partition_key,
const ObLogFlushCbArg &flush_cb_arg)
{
UNUSED(partition_key);
UNUSED(flush_cb_arg);
return common::OB_SUCCESS;
}
virtual int submit_member_change_success_cb_task(const common::ObPartitionKey &partition_key,
const int64_t mc_timestamp,
const common::ObMemberList &prev_member_list,
const common::ObMemberList &curr_member_list)
{
UNUSED(partition_key);
UNUSED(mc_timestamp);
UNUSED(prev_member_list);
UNUSED(curr_member_list);
return common::OB_SUCCESS;
}
virtual int submit_leader_takeover_cb_task(const common::ObPartitionKey &partition_key)
{
UNUSED(partition_key);
return common::OB_SUCCESS;
}
virtual int submit_leader_revoke_cb_task(const common::ObPartitionKey &partition_key)
{
UNUSED(partition_key);
return common::OB_SUCCESS;
}
};
class MockObPSCb : public ObIPSCb
{
public:
virtual int64_t get_min_using_file_id() const {return 0;}
virtual int on_leader_revoke(const common::ObPartitionKey &partition_key)
{UNUSED(partition_key); return 0;}
virtual int on_leader_takeover(const common::ObPartitionKey &partition_key)
{UNUSED(partition_key); return 0;}
virtual int on_leader_active(const common::ObPartitionKey &partition_key)
{UNUSED(partition_key); return 0;}
virtual int on_member_change_success(
const common::ObPartitionKey &partition_key,
const int64_t mc_timestamp,
const common::ObMemberList &prev_member_list,
const common::ObMemberList &curr_member_list)
{
UNUSED(partition_key);
UNUSED(mc_timestamp);
UNUSED(prev_member_list);
UNUSED(curr_member_list);
return 0;
}
virtual int handle_log_missing(const common::ObPartitionKey &pkey,
const common::ObAddr &server)
{
UNUSED(pkey);
UNUSED(server);
return common::OB_SUCCESS;
}
virtual int get_server_locality_array(
common::ObIArray<share::ObServerLocality> &server_locality_array,
bool &has_readonly_zone) const
{
UNUSED(server_locality_array);
UNUSED(has_readonly_zone);
return common::OB_SUCCESS;
}
};
} // namespace clog
} // namespace oceanbase
#endif // OCEANBASE_UNITTEST_MOCK_LOG_UTILS_H_
// Copyright 2014 Alibaba Inc. All Rights Reserved.
// Author:
// zhenzhong.jzz@alibaba-inc.com
// Owner:
// zhenzhong.jzz@alibaba-inc.com
//
//This file is for the unit test of log_callback_engine and the related
//threadpool and handler.
#include <gtest/gtest.h>
#include "clog/ob_log_callback_engine.h"
#include "clog/ob_log_callback_task.h"
#include "clog/ob_log_callback_thread_pool.h"
#include "clog/ob_log_callback_handler.h"
#include "clog/ob_log_define.h"
//#include "storage/ob_replay_status.h"
#include "storage/ob_partition_component_factory.h"
#include "storage/ob_i_partition_group.h"
#include "common/storage/ob_freeze_define.h"
#include "../storage/mockcontainer/mock_ob_partition.h"
#include "../storage/mockcontainer/mock_ob_partition_service.h"
#include "common/ob_queue_thread.h"
#include "common/ob_partition_key.h"
#include "share/ob_errno.h"
#include "lib/net/ob_addr.h"
#include "lib/allocator/ob_concurrent_fifo_allocator.h"
#include "lib/utility/ob_tracepoint.h"
#include "gtest/gtest.h"
#include "share/ob_i_ps_cb.h"
#include "clog_mock_container/mock_ps_cb.h"
namespace oceanbase
{
using namespace common;
using namespace clog;
using namespace storage;
namespace storage
{
class ObPartitionComponentFactory;
}
namespace unittest
{
class MockPartition: public MockObIPartition
{
public:
MockPartition() {partition_key_.init(1, 1, 1);}
~MockPartition() {}
const common::ObPartitionKey &get_partition_key() const
{
return partition_key_;
}
virtual int get_safe_publish_version(int64_t& publish_version)
{
UNUSED(publish_version);
return 0;
}
private:
ObPartitionKey partition_key_;
ObReplayStatus replay_status_;
//Uncommenting the member below makes coverity report an error; if it is
//needed, perform the related initialization as well.
// ObPartitionMCState partition_mc_state_;
};
class MockPartitionMgr : public MockObIPartitionService
{
public:
MockPartitionMgr(): is_inited_(true) {}
int get_partition(const ObPartitionKey &partition_key,
ObIPartitionGroup *&partition)
{
int ret = OB_SUCCESS;
if (partition_key == mock_partition_.get_partition_key()) {
partition = &mock_partition_;
} else {
ret = OB_ENTRY_NOT_EXIST;
}
return ret;
}
private:
bool is_inited_;
MockPartition mock_partition_;
};
//class MockObPSCb : public share::ObIPSCb
//{
//public:
// int on_leader_revoke(const ObPartitionKey &partition_key)
// {
// UNUSED(partition_key);
// return OB_SUCCESS;
// }
// int on_leader_takeover(const ObPartitionKey &partition_key)
// {
// UNUSED(partition_key);
// return OB_SUCCESS;
//
// }
// int on_leader_active(const ObPartitionKey &partition_key)
// {
// UNUSED(partition_key);
// return OB_SUCCESS;
//
// }
// int on_member_change_success(
// const ObPartitionKey &partition_key,
// const int64_t mc_timestamp,
// const ObMemberList &prev_member_list,
// const ObMemberList &curr_member_list)
// {
// UNUSED(partition_key);
// UNUSED(mc_timestamp);
// UNUSED(prev_member_list);
// UNUSED(curr_member_list);
// return OB_SUCCESS;
// }
// int64_t get_min_using_file_id() const { return 0; }
//};
class ObTestLogCallback : public ::testing::Test
{
public :
void SetUp()
{
int ret = OB_SUCCESS;
const uint64_t TABLE_ID = 198;
const int32_t PARTITION_IDX = 125;
const int32_t PARTITION_CNT = 168;
const int64_t TOTAL_LIMIT = 1 << 30;
const int64_t HOLD_LIMIT = 1 << 29;
const int64_t PAGE_SIZE = 1 << 16;
const char *ip = "127.0.0.1";
const int32_t PORT = 80;
if (OB_SUCCESS != (ret = partition_key_.init(TABLE_ID, PARTITION_IDX, PARTITION_CNT))) {
CLOG_LOG(ERROR, "partition_key_.init failed");
}
if (OB_SUCCESS != (ret = allocator_.init(TOTAL_LIMIT, HOLD_LIMIT, PAGE_SIZE))) {
CLOG_LOG(ERROR, "allocator_.init failed");
}
self_addr_.set_ip_addr(ip, PORT);
}
void TearDown()
{
allocator_.destroy();
}
MockPartitionMgr partition_service_;
common::ObAddr self_addr_;
common::ObConcurrentFIFOAllocator allocator_;
common::ObPartitionKey partition_key_;
MockObPSCb partition_service_cb_;
clog::ObLogCallbackThreadPool log_callback_thread_pool_;
clog::ObLogCallbackThreadPool worker_thread_pool_;
clog::ObLogCallbackThreadPool sp_thread_pool_;
};// class ObTestLogCallback
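//A sketch of the lifecycle the tests below exercise, inferred from their
//assertions: init() with NULL arguments returns OB_INVALID_ARGUMENT, a second
//init() returns OB_INIT_TWICE, and submitting callback tasks before init()
//returns OB_NOT_INIT; destroy() is called at the end of every test.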
TEST_F(ObTestLogCallback, init_test)
{
const int64_t THREAD_NUM = 15;
const int64_t TASK_LIMIT_NUM = 12;
clog::ObLogCallbackEngine log_callback_engine;
clog::ObLogCallbackHandler log_callback_handler;
//test the init method of ObLogCallbackHandler
EXPECT_EQ(OB_INVALID_ARGUMENT, log_callback_handler.init(NULL, NULL));
EXPECT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
EXPECT_EQ(OB_INIT_TWICE, log_callback_handler.init(&partition_service_, &partition_service_cb_));
//test the init method of ObLogCallbackThreadPool
EXPECT_EQ(OB_INVALID_ARGUMENT, log_callback_thread_pool_.init(NULL, THREAD_NUM, TASK_LIMIT_NUM,
self_addr_));
TP_SET_ERROR("ob_log_callback_thread_pool_.cpp", "init", "test_a", OB_ERROR);
// EXPECT_EQ(OB_ERROR, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
// TASK_LIMIT_NUM,
// self_addr_));
TP_SET("ob_log_callback_thread_pool_.cpp", "init", "test_a", NULL);
EXPECT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
TASK_LIMIT_NUM, self_addr_));
EXPECT_EQ(OB_INIT_TWICE, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
TASK_LIMIT_NUM, self_addr_));
//test the init method of ObLogCallbackEngine
worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
EXPECT_EQ(OB_INVALID_ARGUMENT, log_callback_engine.init(NULL, NULL));
EXPECT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
EXPECT_EQ(OB_INIT_TWICE, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
sleep(10);
log_callback_engine.destroy();
log_callback_thread_pool_.destroy();
worker_thread_pool_.destroy();
sp_thread_pool_.destroy();
}
//TEST_F(ObTestLogCallback, submit_flush_cb_task_test)
//{
// const uint64_t LOG_ID = 190;
// const int64_t MC_TIMESTAMP = 865;
// const common::ObProposalID PROPOSAL_ID;
// const int64_t THREAD_NUM = 15;
// const int64_t TASK_LIMIT_NUM = 12;
// bool NEED_ACK = false;
//
// clog::ObLogCallbackEngine log_callback_engine;
// clog::ObLogCallbackThreadPool log_callback_thread_pool_;
// clog::ObLogCallbackHandler log_callback_handler;
// clog::ObLogCallbackThreadPool worker_thread_pool_;
// clog::ObLogCallbackThreadPool sp_thread_pool_;
//
// ObLogType type = OB_LOG_MEMBER_CHANGE;
// clog::ObLogCursor log_cursor;
// log_cursor.file_id_ = 0;
// log_cursor.offset_ = 0;
// log_cursor.size_ = 0;
// clog::ObLogFlushCbArg flush_cb_arg(type, LOG_ID, MC_TIMESTAMP, PROPOSAL_ID, NEED_ACK, self_addr_,
// log_cursor, 0);
//
// EXPECT_EQ(OB_NOT_INIT, log_callback_engine.submit_flush_cb_task(partition_key_, flush_cb_arg));
// ASSERT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
// ASSERT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
// TASK_LIMIT_NUM, self_addr_));
// worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
// sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
// ASSERT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
// EXPECT_EQ(OB_SUCCESS, log_callback_engine.submit_flush_cb_task(partition_key_, flush_cb_arg));
//
// sleep(10);
// log_callback_engine.destroy();
// log_callback_thread_pool_.destroy();
// worker_thread_pool_.destroy();
// sp_thread_pool_.destroy();
//}
TEST_F(ObTestLogCallback, submit_member_change_success_cb_task_test)
{
const int64_t THREAD_NUM = 15;
const int64_t TASK_LIMIT_NUM = 12;
clog::ObLogCallbackEngine log_callback_engine;
clog::ObLogCallbackHandler log_callback_handler;
const int64_t MC_TIMESTAMP = 64;
const common::ObMemberList PREV_MEMBER_LIST;
const common::ObMemberList CURR_MEMBER_LIST;
EXPECT_EQ(OB_NOT_INIT, log_callback_engine.submit_member_change_success_cb_task(partition_key_,
MC_TIMESTAMP, PREV_MEMBER_LIST, CURR_MEMBER_LIST));
ASSERT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
ASSERT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
TASK_LIMIT_NUM, self_addr_));
worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
ASSERT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
EXPECT_EQ(OB_SUCCESS, log_callback_engine.submit_member_change_success_cb_task(partition_key_,
MC_TIMESTAMP, PREV_MEMBER_LIST, CURR_MEMBER_LIST));
sleep(10);
log_callback_engine.destroy();
log_callback_thread_pool_.destroy();
worker_thread_pool_.destroy();
sp_thread_pool_.destroy();
}
//TEST_F(ObTestLogCallback, submit_leader_takeover_cb_task_test)
//{
// const int64_t THREAD_NUM = 15;
// const int64_t TASK_LIMIT_NUM = 12;
// clog::ObLogCallbackEngine log_callback_engine;
// clog::ObLogCallbackThreadPool log_callback_thread_pool_;
// clog::ObLogCallbackHandler log_callback_handler;
// clog::ObLogCallbackThreadPool worker_thread_pool_;
// clog::ObLogCallbackThreadPool sp_thread_pool_;
//
// EXPECT_EQ(OB_NOT_INIT, log_callback_engine.submit_leader_takeover_cb_task(partition_key_));
// ASSERT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
// ASSERT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
// TASK_LIMIT_NUM, self_addr_));
// worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
// sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
// ASSERT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
// EXPECT_EQ(OB_SUCCESS, log_callback_engine.submit_leader_takeover_cb_task(partition_key_));
//
// sleep(10);
// log_callback_engine.destroy();
// log_callback_thread_pool_.destroy();
// worker_thread_pool_.destroy();
// sp_thread_pool_.destroy();
//}
//TEST_F(ObTestLogCallback, submit_leader_revoke_cb_task_test)
//{
// const int64_t THREAD_NUM = 15;
// const int64_t TASK_LIMIT_NUM = 12;
// clog::ObLogCallbackEngine log_callback_engine;
// clog::ObLogCallbackThreadPool log_callback_thread_pool_;
// clog::ObLogCallbackHandler log_callback_handler;
// clog::ObLogCallbackThreadPool worker_thread_pool_;
// clog::ObLogCallbackThreadPool sp_thread_pool_;
//
// EXPECT_EQ(OB_NOT_INIT, log_callback_engine.submit_leader_revoke_cb_task(partition_key_));
// ASSERT_EQ(OB_SUCCESS, log_callback_handler.init(&partition_service_, &partition_service_cb_));
// ASSERT_EQ(OB_SUCCESS, log_callback_thread_pool_.init(&log_callback_handler, THREAD_NUM,
// TASK_LIMIT_NUM, self_addr_));
// worker_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
// sp_thread_pool_.init(&log_callback_handler, THREAD_NUM, TASK_LIMIT_NUM, self_addr_);
// ASSERT_EQ(OB_SUCCESS, log_callback_engine.init(&worker_thread_pool_, &sp_thread_pool_));
// EXPECT_EQ(OB_SUCCESS, log_callback_engine.submit_leader_revoke_cb_task(partition_key_));
//
// sleep(10);
// log_callback_engine.destroy();
// log_callback_thread_pool_.destroy();
// worker_thread_pool_.destroy();
// sp_thread_pool_.destroy();
//}
}//namespace unittest
}//namespace oceanbase
int main(int argc, char **argv)
{
OB_LOGGER.set_file_name("test_log_callback_engine.log", true);
OB_LOGGER.set_log_level("INFO");
CLOG_LOG(INFO, "begin unittest: test_ob_log_callback_engine");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
// Copyright 2014 Alibaba Inc. All Rights Reserved.
// Author:
// zhenzhong.jzz@alibaba-inc.com
// Owner:
// zhenzhong.jzz@alibaba-inc.com
//
// This file is for unit test of ObLogReconfirm
#include "clog/ob_log_reconfirm.h"
#include "gtest/gtest.h"
#include "lib/allocator/ob_malloc.h"
#include "clog/ob_log_state_mgr.h"
#include "clog/ob_i_net_log_buffer.h"
#include "clog/ob_i_disk_log_buffer.h"
#include "clog/ob_i_net_log_buffer_mgr.h"
#include "clog/ob_i_log_engine.h"
#include "clog/ob_log_define.h"
#include "test_accessor.h"
#include "lib/utility/ob_tracepoint.h"
#include "clog_mock_container/mock_log_membership_mgr.h"
#include "clog_mock_container/mock_log_engine.h"
#include "clog_mock_container/mock_log_replay_engine.h"
#include "clog_mock_container/mock_log_allocator.h"
#include "clog_mock_container/mock_log_state_mgr.h"
#include "clog_mock_container/mock_submit_log_cb.h"
#include "clog_mock_container/mock_log_sliding_window.h"
using namespace oceanbase::common;
namespace oceanbase
{
using namespace common;
namespace clog
{
class MockLogSWForReconfirm: public ObILogSWForReconfirm
{
public:
MockLogSWForReconfirm() : start_id_(0), epoch_id_(0), max_id_(0), next_id_(0),
submit_ret_(OB_SUCCESS),
sw_ret_(OB_SUCCESS), array_(NULL), call_start_working_cnt_(0), call_submit_cnt_(0),
get_log_ok_(true), get_log_task_ret_(OB_SUCCESS), use_cnt_(0) {}
uint64_t get_start_id() const
{
return start_id_;
}
int64_t get_epoch_id() const
{
return epoch_id_;
}
int try_update_max_log_id(const uint64_t log_id)
{
max_id_ = log_id;
return OB_SUCCESS;
}
int submit_log(const ObLogEntryHeader &header, const char *buff,
ObISubmitLogCb *cb)
{
UNUSED(header);
UNUSED(buff);
UNUSED(cb);
ATOMIC_INC(&call_submit_cnt_);
return submit_ret_;
}
void revert_log_task(const int64_t *ref)
{
UNUSED(ref);
}
//For an even log_id this returns an existing log_task; for an odd log_id it
//reports that the log does not exist locally
int get_log_task(const uint64_t log_id, ObLogTask *&log_task, const int64_t *&ref)
{
int ret = OB_SUCCESS;
UNUSED(ref);
if (get_log_ok_) {
if (array_ != NULL && log_id % 2 == 0) {
log_task = array_ + log_id;
} else {
ret = OB_ERR_NULL_VALUE;
}
} else {
ret = OB_ERR_UNEXPECTED;
}
if (get_log_task_ret_ != OB_SUCCESS && use_cnt_ < 3) {
use_cnt_++;
return get_log_task_ret_;
} else {
return ret;
}
}
uint64_t get_max_log_id() const
{
return max_id_;
}
void submit_replay_task(const bool is_appending) { UNUSED(is_appending); }
public:
uint64_t start_id_;
int64_t epoch_id_;
uint64_t max_id_;
uint64_t next_id_;
int submit_ret_;
int sw_ret_;
ObLogTask *array_;
int64_t call_start_working_cnt_;
int64_t call_submit_cnt_;
bool get_log_ok_;
int get_log_task_ret_;
uint32_t use_cnt_;
};
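//Note on the mock above: submit_ret_ and get_log_task_ret_ are error-injection
//knobs; when get_log_task_ret_ is set to an error code, the first three calls
//to get_log_task() return that error (counted via use_cnt_) before the normal
//even/odd behaviour applies.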
class MockLogStateMgr : public MockObLogStateMgr
{
public:
MockLogStateMgr() {}
~MockLogStateMgr() {}
public:
void set_proposal_id(common::ObProposalID proposal_id)
{
proposal_id_ = proposal_id;
}
common::ObProposalID get_proposal_id() const
{
return proposal_id_;
}
public:
common::ObProposalID proposal_id_;
};
class ReLogAllocator : public MockObLogAllocator
{
public:
ReLogAllocator()
{
alloc_.init(512 * 1024 * 1024, 512 * 1024 * 1024, 64 * 1024);
}
void *re_alloc(const int64_t sz)
{
return malloc(sz);
}
void re_free(void *ptr)
{
free(ptr);
}
common::ObIAllocator *get_re_allocator()
{
return &alloc_;
}
private:
common::ObConcurrentFIFOAllocator alloc_;
};
class MockLogMembershipMgr : public MockObLogMembershipMgr
{
public:
MockLogMembershipMgr() {}
virtual ~MockLogMembershipMgr() {}
public:
int add_member(const common::ObMember &member)
{
return curr_member_list_.add_member(member);
}
const common::ObMemberList &get_curr_member_list() const
{
return curr_member_list_;
}
private:
common::ObMemberList curr_member_list_;
};
}//namespace clog
namespace unittest
{
class ReconfirmStateAccessorForTest
{
public:
ObLogReconfirm::State get_fetch_max_lsn()
{
return ObLogReconfirm::FETCH_MAX_LSN;
}
ObLogReconfirm::State get_reconfirming()
{
return ObLogReconfirm::RECONFIRMING;
}
ObLogReconfirm::State get_start_working()
{
return ObLogReconfirm::START_WORKING;
}
int64_t get_majority_tag_bit()
{
return ObLogReconfirm::MAJORITY_TAG_BIT;
}
};
class TestReconfirm : public ::testing::Test
{
public:
virtual void SetUp()
{
const uint64_t TABLE_ID = 120;
const int32_t PARTITION_IDX = 1400;
const int32_t PARTITION_CNT = 3;
partition_key_.init(TABLE_ID, PARTITION_IDX, PARTITION_CNT);
self_.parse_from_cstring("127.0.0.1:8111");
helper_.set_assert_on(reconfirm_, false);
common::ObMember self_member(self_, ObTimeUtility::current_time());
mm_.add_member(self_member);
}
virtual void TearDown()
{
helper_.set_assert_on(reconfirm_, true);
}
protected:
ReconfirmStateAccessorForTest state_accessor_;
ObLogReconfirmAccessor helper_;
clog::MockLogSWForReconfirm sw_;
clog::MockLogStateMgr state_mgr_;
clog::MockObLogEngine log_engine_;
ReLogAllocator alloc_;
MockObSubmitLogCb submit_cb_;
ObPartitionKey partition_key_;
ObLogReconfirm reconfirm_;
MockLogMembershipMgr mm_;
ObAddr self_;
};
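//A sketch of the state flow these tests drive, inferred from the accessors and
//comments in this file (not from ObLogReconfirm itself):
//INITED -> FLUSHING_PREPARE_LOG (wait for the new proposal_id to be flushed)
//-> FETCH_MAX_LSN -> RECONFIRMING -> START_WORKING; reconfirm() keeps returning
//OB_EAGAIN until the whole process is finished.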
TEST_F(TestReconfirm, reconfirm_test)
{
ASSERT_EQ(OB_SUCCESS, reconfirm_.init(&sw_, &state_mgr_, &mm_, &log_engine_, &alloc_,
partition_key_, self_));
//The state is INITED, and the maximum log_id held out of band (oob) is
//greater than the one in the sliding window
ASSERT_TRUE(mm_.get_curr_member_list().contains(self_));
sw_.max_id_ = 15;
EXPECT_EQ(OB_EAGAIN, reconfirm_.reconfirm());
//Handle FLUSHING_PREPARE_LOG
ObProposalID proposal_id_tmp;
proposal_id_tmp.addr_ = self_;
proposal_id_tmp.ts_ = 1500;
state_mgr_.set_proposal_id(proposal_id_tmp);
proposal_id_tmp.ts_ -= 100;
helper_.get_new_proposal_id(reconfirm_) = proposal_id_tmp;
EXPECT_EQ(OB_EAGAIN, reconfirm_.reconfirm());
//The new proposal_id has been flushed
state_mgr_.set_proposal_id(proposal_id_tmp);
EXPECT_EQ(OB_EAGAIN, reconfirm_.reconfirm());
helper_.set_state(reconfirm_, state_accessor_.get_reconfirming());
}
TEST_F(TestReconfirm, receive_max_log_id_test)
{
const uint64_t LOG_ID = 1400;
ASSERT_EQ(OB_SUCCESS, reconfirm_.init(&sw_, &state_mgr_, &mm_, &log_engine_, &alloc_,
partition_key_, self_));
int64_t max_log_ts = 0;
//Incorrect state
EXPECT_EQ(OB_STATE_NOT_MATCH, reconfirm_.receive_max_log_id(self_, LOG_ID, max_log_ts));
helper_.set_state(reconfirm_, state_accessor_.get_fetch_max_lsn());
EXPECT_EQ(OB_SUCCESS, reconfirm_.receive_max_log_id(self_, LOG_ID, max_log_ts));
helper_.get_max_log_ack_map(reconfirm_).test_and_set(state_accessor_.get_majority_tag_bit());
EXPECT_EQ(OB_SUCCESS, reconfirm_.receive_max_log_id(self_, LOG_ID, max_log_ts));
helper_.get_max_log_ack_map(reconfirm_).reset_map(state_accessor_.get_majority_tag_bit());
EXPECT_EQ(OB_SUCCESS, reconfirm_.receive_max_log_id(self_, LOG_ID, max_log_ts));
}
TEST_F(TestReconfirm, receive_log_test)
{
ObProposalID proposal_id_tmp;
proposal_id_tmp.addr_ = self_;
proposal_id_tmp.ts_ = 1500;
uint64_t log_id = 2000;
int64_t data_len = 0;
int64_t generate_timestamp = 1990;
int64_t epoch_id = 1800;
int64_t submit_timestamp = 1990;
ObLogEntry log_entry;
ObLogEntryHeader header;
common::ObVersion freeze_version;
ASSERT_EQ(OB_SUCCESS, reconfirm_.init(&sw_, &state_mgr_, &mm_, &log_engine_, &alloc_,
partition_key_, self_));
EXPECT_TRUE(reconfirm_.need_start_up());
EXPECT_EQ(OB_STATE_NOT_MATCH, reconfirm_.receive_log(log_entry, self_));
helper_.set_state(reconfirm_, state_accessor_.get_reconfirming());
helper_.get_max_flushed_id(reconfirm_) = 1800;
helper_.prepare_map(reconfirm_);
sw_.start_id_ = 1000;
header.generate_header(OB_LOG_SUBMIT, partition_key_, log_id, NULL, data_len, generate_timestamp,
epoch_id,
proposal_id_tmp, submit_timestamp,
freeze_version);
log_entry.generate_entry(header, NULL);
EXPECT_EQ(OB_INVALID_ARGUMENT, reconfirm_.receive_log(log_entry, ObAddr()));
log_id = 100;
header.generate_header(OB_LOG_SUBMIT, partition_key_, log_id, NULL, data_len, generate_timestamp,
epoch_id,
proposal_id_tmp, submit_timestamp,
freeze_version);
log_entry.generate_entry(header, NULL);
EXPECT_EQ(OB_SUCCESS, reconfirm_.receive_log(log_entry, self_));
log_id = 1500;
header.generate_header(OB_LOG_NOT_EXIST, partition_key_, log_id, NULL, data_len, generate_timestamp,
epoch_id,
proposal_id_tmp, submit_timestamp,
freeze_version);
log_entry.generate_entry(header, NULL);
EXPECT_EQ(OB_SUCCESS, reconfirm_.receive_log(log_entry, self_));
header.generate_header(OB_LOG_SUBMIT, partition_key_, log_id, NULL, data_len, generate_timestamp,
epoch_id,
proposal_id_tmp, submit_timestamp,
freeze_version);
log_entry.generate_entry(header, NULL);
EXPECT_EQ(OB_SUCCESS, reconfirm_.receive_log(log_entry, self_));
EXPECT_EQ(OB_SUCCESS, reconfirm_.receive_log(log_entry, self_));
}
TEST_F(TestReconfirm, private_test)
{
ObProposalID proposal_id_tmp;
proposal_id_tmp.addr_ = self_;
proposal_id_tmp.ts_ = 1500;
ObLogEntry log_entry;
ObLogEntryHeader header;
common::ObVersion freeze_version;
ASSERT_EQ(OB_SUCCESS, reconfirm_.init(&sw_, &state_mgr_, &mm_, &log_engine_, &alloc_,
partition_key_, self_));
helper_.set_state(reconfirm_, state_accessor_.get_reconfirming());
helper_.get_max_flushed_id(reconfirm_) = 1800;
helper_.prepare_map(reconfirm_);
//Test the processing of try_fetch_log_ under normal circumstances when
//the starting ID of the sliding window is less than the maximum fetching ID
sw_.start_id_ = 120;
helper_.get_start_id(reconfirm_) = 120;
helper_.get_max_flushed_id(reconfirm_) = 150;
EXPECT_EQ(OB_SUCCESS, helper_.execute_try_fetch_log(reconfirm_));
//Test the processing of try_filter_invalid_log_ function when the timestamp
//and leader_ts disagree
uint64_t log_id = 1500;
int64_t data_len = 0;
int64_t generate_timestamp = 1990;
int64_t epoch_id = 1800;
int64_t submit_timestamp = 1990;
header.generate_header(OB_LOG_SUBMIT, partition_key_, log_id, NULL, data_len, generate_timestamp,
epoch_id,
proposal_id_tmp,
submit_timestamp,
freeze_version);
helper_.get_leader_ts(reconfirm_) = 1600;
helper_.get_start_id(reconfirm_) = 140;
helper_.get_next_id(reconfirm_) = 150;
int idx = 10;
ObLogEntry *log_array = helper_.get_log_array(reconfirm_);
(log_array + idx) -> generate_entry(header, NULL);
EXPECT_EQ(OB_ERR_UNEXPECTED, helper_.execute_try_filter_invalid_log(reconfirm_));
//Handle start-working log
log_id = 1500;
header.generate_header(OB_LOG_START_MEMBERSHIP, partition_key_, log_id, NULL, data_len,
generate_timestamp,
epoch_id, proposal_id_tmp,
submit_timestamp, freeze_version);
(log_array + idx) -> generate_entry(header, NULL);
EXPECT_EQ(OB_SUCCESS, helper_.execute_try_filter_invalid_log(reconfirm_));
//Replace a ghost log, which may be a non-empty log
epoch_id = 1000;
helper_.get_start_id(reconfirm_) = 140;
header.generate_header(OB_LOG_SUBMIT, partition_key_, log_id, NULL, data_len, generate_timestamp,
epoch_id,
proposal_id_tmp,
submit_timestamp, freeze_version);
(log_array + idx) -> generate_entry(header, NULL);
EXPECT_EQ(OB_SUCCESS, helper_.execute_try_filter_invalid_log(reconfirm_));
// sw_.start_id_ = 100;
// helper_.get_start_id(reconfirm_) = 100;
// helper_.get_next_id(reconfirm_) = 150;
// TP_SET_ERROR("ob_log_reconfirm.cpp", "retry_confirm_log_", "test_c", OB_ERROR);
// EXPECT_EQ(OB_ERROR, helper_.execute_retry_confirm_log(reconfirm_));
// TP_SET("ob_log_reconfirm.cpp", "retry_confirm_log_", "test_c", NULL);
//
//
// helper_.get_last_retry_reconfirm_ts(reconfirm_) = 0;
// sw_.start_id_ = 100;
// helper_.get_last_check_start_id(reconfirm_) = 120;
// EXPECT_TRUE(helper_.execute_need_retry_reconfirm(reconfirm_));
// sw_.start_id_ = 120;
// EXPECT_TRUE(helper_.execute_need_retry_reconfirm(reconfirm_));
//init_reconfirm: the maximum log_id stored in oob_log_handler_ is greater than
//the one in the sliding window, i.e. there is a log that cannot enter the sliding window yet
sw_.max_id_ = 1300;
EXPECT_EQ(OB_SUCCESS, helper_.execute_init_reconfirm(reconfirm_));
//Exercise the handling in get_start_id_and_leader_ts_ for the cases where
//fetching the log fails
sw_.get_log_task_ret_ = OB_ERROR_OUT_OF_RANGE;
EXPECT_EQ(OB_SUCCESS, helper_.execute_get_start_id_and_leader_ts(reconfirm_));
}
//majority is 1
TEST(SampleReconfirmTest, single_member)
{
ObLogReconfirmAccessor helper;
MockLogSWForReconfirm sw;
MockLogStateMgr state_mgr;
MockObLogEngine log_engine;
clog::MockObSlidingCallBack sliding_cb;
ReLogAllocator alloc;
MockObSubmitLogCb submit_cb;
ObPartitionKey partition_key;
ObLogReconfirm reconfirm;
MockLogMembershipMgr mm;
ObAddr self;
self.parse_from_cstring("127.0.0.1:8111");
common::ObMemberList curr_member_list;
common::ObMember self_member(self, ObTimeUtility::current_time());
mm.add_member(self_member);
ASSERT_EQ(OB_INVALID_ARGUMENT, reconfirm.init(NULL, &state_mgr, &mm, &log_engine, &alloc,
partition_key, self));
const uint64_t TABLE_ID = 120;
const int32_t PARTITION_IDX = 1400;
const int32_t PARTITION_CNT = 3;
partition_key.init(TABLE_ID, PARTITION_IDX, PARTITION_CNT);
ASSERT_EQ(OB_SUCCESS, reconfirm.init(&sw, &state_mgr, &mm, &log_engine, &alloc,
partition_key, self
));
reconfirm.clear();
////////////////////////////////////////////////////////////////////////////////////////////////////////
//Verify that there are no pending logs
sw.start_id_ = 1;
sw.max_id_ = 0;
ASSERT_EQ(OB_EAGAIN, reconfirm.reconfirm());
//Wait for the new proposal_id to be flushed successfully
// Modify the proposal_id of StateMgr to indicate that the flush to disk and
// the callback have completed successfully
state_mgr.set_proposal_id(helper.get_new_proposal_id(reconfirm));
//Verify that reconfirm enters the START_WORKING stage
ASSERT_EQ(OB_EAGAIN, reconfirm.reconfirm());
// Retransmit after timeout
usleep(CLOG_LEADER_RECONFIRM_SYNC_TIMEOUT + 100);
ASSERT_EQ(OB_EAGAIN, reconfirm.reconfirm());
sw.start_id_ = 2;
ASSERT_EQ(OB_EAGAIN, reconfirm.reconfirm());
reconfirm.clear();
////////////////////////////////////////////////////////////////////////////////////////////////////////
// Verify that there are three logs that require reconfirm, with log_ids 0, 1 and 2:
// log 2 is already in the confirmed state locally, log 1 does not exist locally, and
// log 0 exists locally with the header generated below.
ObProposalID pid;
pid.ts_ = 123456;
pid.addr_ = self;
sw.start_id_ = 0;
sw.max_id_ = 2;
ObLogTask task_array[3];
ObLogEntryHeader header;
ObLogSimpleBitMap bit_map;
common::ObVersion freeze_version;
ObConfirmedInfo confirmed_info;
header.generate_header(OB_LOG_SUBMIT, partition_key, 0, NULL, 0, ::oceanbase::common::ObTimeUtility::current_time(), 0,
pid, 0, freeze_version);
task_array[0].init(&alloc, &sliding_cb, &submit_cb, 1);
task_array[0].set_log(header, NULL, true);
header.generate_header(OB_LOG_SUBMIT, partition_key, 1, NULL, 0, ::oceanbase::common::ObTimeUtility::current_time(), 0,
pid, 0, freeze_version);
task_array[1].init(&alloc, &sliding_cb, &submit_cb, 1);
task_array[1].set_log(header, NULL, true);
header.generate_header(OB_LOG_SUBMIT, partition_key, 2, NULL, 0, ::oceanbase::common::ObTimeUtility::current_time(), 0,
pid, 0, freeze_version);
task_array[2].init(&alloc, &sliding_cb, &submit_cb, 1);
task_array[2].set_log(header, NULL, true);
// Set log 2 to the confirmed state
task_array[2].set_confirmed_info(confirmed_info);
sw.array_ = task_array;
state_mgr.proposal_id_ = pid;
EXPECT_EQ(OB_EAGAIN, reconfirm.reconfirm());
// Treat the new proposal_id as successfully flushed to disk and called back
state_mgr.proposal_id_ = helper.get_new_proposal_id(reconfirm);
//Verify that a timeout without reaching a majority triggers a resend
usleep(CLOG_LEADER_RECONFIRM_SYNC_TIMEOUT + 1);
ASSERT_EQ(OB_EAGAIN, reconfirm.reconfirm());
// Move the left edge of the sliding window: synchronization of the pending
// logs is complete, and reconfirm enters the phase of writing START_WORKING
sw.start_id_ = sw.max_id_ + 1;
ASSERT_EQ(OB_EAGAIN, reconfirm.reconfirm());
//Move the left edge of the sliding window: START_WORKING is complete
//and the RECONFIRM process finishes
sw.start_id_ = sw.max_id_ + 2;
ASSERT_EQ(OB_EAGAIN, reconfirm.reconfirm());
reconfirm.clear();
}
}
}
int main(int argc, char **argv)
{
oceanbase::common::ObLogger::get_logger().set_log_level("INFO");
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
// Copyright 2014 Alibaba Inc. All Rights Reserved.
// Author:
// qiaoli.xql@alibaba-inc.com
// Owner:
// lujun.wlj@alibaba-inc.com
//
// This file tests ObLogPartitionMetaReader.
//
//l1: Write correct trailer information and read correct PartitionMeta information
//l2: Write correct trailer information and read incorrect PartitionMeta information
//l3: Write correct trailer information but fail to read any PartitionMeta information
//l4: Do not write a trailer
//
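//A hypothetical mapping of the scenarios above to the files created by
//write_file() below (inferred from the expectations in test_read_partition_meta):
//file 11 - correct block and trailer, file 12 - corrupted block data,
//file 13 - trailer start_pos pointing into the middle of the block,
//file 14 - no trailer written.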
#include "clog/ob_log_partition_meta_reader.h"
#include "clog/ob_log_file_trailer.h"
#include "clog/ob_log_direct_reader.h"
#include "clog/ob_log_file_pool.h"
#include "clog/ob_log_common.h"
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "lib/tbsys.h"
#include <libaio.h>
using namespace oceanbase::common;
namespace oceanbase
{
using namespace clog;
namespace unittest
{
class MyMinUsingFileIDGetter: public ObIGetMinUsingFID
{
public:
MyMinUsingFileIDGetter() {}
virtual ~MyMinUsingFileIDGetter() {}
int on_leader_revoke(const common::ObPartitionKey &partition_key)
{ UNUSED(partition_key); return OB_SUCCESS; }
int on_leader_takeover(const common::ObPartitionKey &partition_key)
{ UNUSED(partition_key); return OB_SUCCESS; }
int on_member_change_success(
const common::ObPartitionKey &partition_key,
const int64_t mc_timestamp,
const common::ObMemberList &prev_member_list,
const common::ObMemberList &curr_member_list)
{ UNUSED(partition_key); UNUSED(mc_timestamp); UNUSED(prev_member_list); UNUSED(curr_member_list); return OB_SUCCESS; }
int64_t get_min_using_file_id() const
{ return 1; }
};
class TestObLogPartitionMetaReader: public ::testing::Test
{
public:
virtual void SetUp();
virtual void TearDown();
int write_data(const int fd, const char *buf, const int64_t buf_len, const int64_t offset);
int submit_aio(struct iocb *p);
int wait_event(struct iocb *p);
int write_file();
static const int64_t data_size = 1 << 9; //512
static const int64_t align_size = 1 << 10; //1K
static const char *label = "2";
static const char *label2 = "3";
io_context_t ctx_;
ObAlignedBuffer buffer;
ObAlignedBuffer buffer2;
ObLogFileTrailer trailer;
ObLogCache cache;
const char *path;
int64_t blk_len;
char buf[align_size];
};
void TestObLogPartitionMetaReader::SetUp()
{
path = "readers";
EXPECT_LE(0, system("mkdir readers"));
EXPECT_LE(0, system("rm readers/*"));
memset(&ctx_, 0, sizeof(ctx_));
memset(buf, 0, sizeof(buf));
EXPECT_EQ(0, io_setup(1, &ctx_));
EXPECT_EQ(OB_SUCCESS, buffer.init(align_size, align_size, label));
EXPECT_EQ(OB_SUCCESS, buffer2.init(data_size, data_size, label2));
const int64_t bucket_num = 1024;
const int64_t max_cache_size = 1024 * 1024 * 512;
const int64_t block_size = common::OB_MALLOC_BIG_BLOCK_SIZE;
ObKVGlobalCache::get_instance().init(bucket_num, max_cache_size, block_size);
EXPECT_EQ(OB_SUCCESS, cache.init("TestObLogCache", 5));
}
void TestObLogPartitionMetaReader::TearDown()
{
if (NULL != ctx_) {
io_destroy(ctx_);
}
buffer.destroy();
cache.destroy();
ObKVGlobalCache::get_instance().destroy();
}
int TestObLogPartitionMetaReader::write_data(const int fd, const char *buf, const int64_t buf_len,
const int64_t offset)
{
int ret = OB_SUCCESS;
struct iocb io;
struct iocb *p = &io;
memset(&io, 0x0, sizeof(io));
io_prep_pwrite(&io, fd, (void *)buf, buf_len, offset);
EXPECT_EQ(1, io_submit(ctx_, 1, &p));
EXPECT_EQ(OB_SUCCESS, wait_event(p));
return ret;
}
int TestObLogPartitionMetaReader::wait_event(struct iocb *p)
{
int ret = OB_SUCCESS;
struct io_event e;
struct timespec timeout;
OB_ASSERT(p->u.c.offset != -1);
OB_ASSERT(static_cast<int64_t>(p->u.c.nbytes) >= 0);
timeout.tv_sec = 100; //100s
timeout.tv_nsec = 0;
EXPECT_EQ(1, io_getevents(ctx_, 1, 1, &e, &timeout));
EXPECT_EQ(0U, e.res2);
EXPECT_EQ(p->u.c.nbytes, e.res);
EXPECT_EQ(p->data, e.data);
return ret;
}
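// Layout assumed by write_file() below (a reading of this helper, not of the
// on-disk format in general): each block is a serialized ObLogBlockMetaV2
// header followed by data_size bytes of payload, and the file trailer
// (start offset + next file id) is written at CLOG_TRAILER_OFFSET via
// build_serialized_trailer().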
int TestObLogPartitionMetaReader::write_file()
{
int ret = OB_SUCCESS;
int fd = -1;
int64_t pos = 0;
int64_t num = 2;
int64_t file_size = 1 << 22; //4K
int64_t percent = 100;
file_id_t start_file_id = 11; //11-14 are test file
ObLogBlockMetaV2::MetaContent block;
ObLogWriteFilePool write_pool;
MyMinUsingFileIDGetter min_getter;
EXPECT_EQ(OB_SUCCESS, write_pool.init(path, file_size, percent, &min_getter));
EXPECT_EQ(OB_SUCCESS, block.generate_block(buf, data_size, OB_DATA_BLOCK));
CLOG_LOG(INFO, "block", "meta", to_cstring(block), "buf", buf, "data_size", (int64_t)data_size);
EXPECT_EQ(OB_SUCCESS, block.serialize(buffer.get_align_buf(), align_size, pos));
memcpy((buffer.get_align_buf() + pos), buf, data_size);
offset_t start_pos = -1;
file_id_t next_file_id = 0;
blk_len = block.get_total_len();
//--------------test case 1-----------------------
//Write the file whose file id is 11; the content is correct, write two blocks
EXPECT_EQ(OB_SUCCESS, write_pool.get_fd(start_file_id, fd));
for (int64_t i = 0; i < num; i++) {
if (OB_SUCCESS != (ret = write_data(fd, buffer.get_align_buf(), blk_len, i * blk_len))) {
CLOG_LOG(ERROR, "write_data fail", KERRMSG);
}
}
pos = 0;
start_pos = 1024;
next_file_id = start_file_id + 1;
//Write the trailer of the file whose file id is 11, including the start address and the next file id
EXPECT_EQ(OB_SUCCESS, trailer.build_serialized_trailer(buffer2.get_align_buf(), data_size, start_pos, next_file_id, pos));
EXPECT_EQ(OB_SUCCESS, write_data(fd, buffer2.get_align_buf(), data_size, CLOG_TRAILER_OFFSET));
if (fd >= 0) {
close(fd);
}
//--------------test case 2-----------------------
// Write the file whose file id is 12 with corrupted data, so the block integrity check fails
start_file_id = next_file_id;
EXPECT_EQ(OB_SUCCESS, write_pool.get_fd(start_file_id, fd));
buffer.get_align_buf()[512] = 'B';
for (int64_t i = 0; i < num; i++) {
if (OB_SUCCESS != (ret = write_data(fd, buffer.get_align_buf(), blk_len, i * blk_len))) {
CLOG_LOG(ERROR, "write_data fail", KERRMSG);
}
}
pos = 0;
start_pos = 0;
next_file_id = start_file_id + 1;
//Write the trailer of the file whose file id is 12, including the start address and the next file id
EXPECT_EQ(OB_SUCCESS, trailer.build_serialized_trailer(buffer2.get_align_buf(), data_size, start_pos, next_file_id, pos));
EXPECT_EQ(OB_SUCCESS, write_data(fd, buffer2.get_align_buf(), data_size, CLOG_TRAILER_OFFSET));
if (fd >= 0) {
close(fd);
}
//--------------test case 3-----------------------
// Write the file whose file id is 13 with one block of correct data,
// but the meta checksum check fails
start_file_id = next_file_id;
EXPECT_EQ(OB_SUCCESS, write_pool.get_fd(start_file_id, fd));
EXPECT_EQ(OB_SUCCESS, write_data(fd, buffer.get_align_buf(), blk_len, 0));
pos = 0;
start_pos = 512;
next_file_id = start_file_id + 1;
//Write the trailer of the file whose file id is 13, including the start address and the next file id
EXPECT_EQ(OB_SUCCESS, trailer.build_serialized_trailer(buffer2.get_align_buf(), data_size, start_pos, next_file_id, pos));
EXPECT_EQ(OB_SUCCESS, write_data(fd, buffer2.get_align_buf(), data_size, CLOG_TRAILER_OFFSET));
if (fd >= 0) {
close(fd);
}
//--------------test case 4-----------------------
// Write the file whose file id is 14 with one block of correct data (no trailer)
start_file_id = next_file_id;
EXPECT_EQ(OB_SUCCESS, write_pool.get_fd(start_file_id, fd));
EXPECT_EQ(OB_SUCCESS, write_data(fd, buffer.get_align_buf(), blk_len, 0));
if (fd >= 0) {
close(fd);
}
return ret;
}
TEST_F(TestObLogPartitionMetaReader, test_read_partition_meta)
{
ObLogReadFilePool reader_pool;
ObLogDirectReader dreader;
ObLogPartitionMetaReader reader;
ObReadParam param;
ObReadRes res;
file_id_t start_file_id = 11;
param.timeout_ = 5000000; //5s
param.read_len_ = OB_MAX_LOG_BUFFER_SIZE;
param.file_id_ = start_file_id;
EXPECT_EQ(OB_NOT_INIT, reader.read_partition_meta(param, res));
EXPECT_EQ(OB_INVALID_ARGUMENT, reader.init(NULL));
EXPECT_EQ(OB_NOT_INIT, reader.read_partition_meta(param, res));
//Test read
CLOG_LOG(INFO, "begin unittest::test_ob_log_partition_meta_reader");
EXPECT_EQ(OB_SUCCESS, reader_pool.init(path));
EXPECT_EQ(OB_SUCCESS, dreader.init(&reader_pool, &cache));
EXPECT_EQ(OB_SUCCESS, reader.init(&dreader));
EXPECT_EQ(OB_INIT_TWICE, reader.init(&dreader));
EXPECT_EQ(OB_SUCCESS, write_file());
//Test read first block
param.file_id_ = start_file_id;
EXPECT_EQ(OB_SUCCESS, reader.read_partition_meta(param, res));
EXPECT_EQ((int64_t)data_size, res.data_len_);
param.file_id_ = param.file_id_ + 1;
EXPECT_EQ(OB_INVALID_DATA, reader.read_partition_meta(param, res));
param.file_id_ = param.file_id_ + 1;
EXPECT_EQ(OB_INVALID_DATA, reader.read_partition_meta(param, res));
param.file_id_ = param.file_id_ + 1;
EXPECT_EQ(OB_READ_NOTHING, reader.read_partition_meta(param, res));
//Test failure
param.reset();
EXPECT_EQ(OB_INVALID_ARGUMENT, reader.read_partition_meta(param, res));
//Test deserialize failed
param.file_id_ = start_file_id;
param.read_len_ = 1;
param.timeout_ = 5000000; //5s
EXPECT_EQ(OB_DESERIALIZE_ERROR, reader.read_partition_meta(param, res));
//Test invalid argument
param.file_id_ = start_file_id;
param.read_len_ = 1;
param.timeout_ = 0;
res.reset();
EXPECT_EQ(OB_INVALID_ARGUMENT, reader.read_partition_meta(param, res));
}
} // end namespace unittest
} // end namespace oceanbase
int main(int argc, char **argv)
{
OB_LOGGER.set_file_name("test_ob_log_partition_meta_reader.log", true);
OB_LOGGER.set_log_level("INFO");
CLOG_LOG(INFO, "begin unittest::test_ob_log_partition_meta_reader");
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
This diff is collapsed.