提交 529e541d 编写于 作者: O obdev 提交者: ob-robot

fix mysqltest

上级 362f7075
......@@ -740,22 +740,22 @@ int ObKVGlobalCache::reload_wash_interval()
bool is_exist = false;
if (OB_FAIL(TG_TASK_EXIST(lib::TGDefIDs::KVCacheWash, wash_task_, is_exist))) {
COMMON_LOG(ERROR, "failed to check wash task exist", K(ret));
} else if (is_exist) {
TG_CANCEL(lib::TGDefIDs::KVCacheWash, wash_task_);
} else if (is_exist && OB_FAIL(TG_CANCEL_R(lib::TGDefIDs::KVCacheWash, wash_task_))) {
COMMON_LOG(WARN, "failed to cancel wash task", K(ret));
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::KVCacheWash, wash_task_, wash_interval, true))) {
COMMON_LOG(ERROR, "failed to schedule wash task", K(ret));
}
is_exist = false;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(TG_TASK_EXIST(lib::TGDefIDs::KVCacheRep, replace_task_, is_exist))) {
COMMON_LOG(ERROR, "failed to check replace task exist", K(ret));
} else if (is_exist) {
TG_CANCEL(lib::TGDefIDs::KVCacheRep, replace_task_);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::KVCacheWash, wash_task_, wash_interval, true))) {
COMMON_LOG(ERROR, "failed to schedule wash task", K(ret));
} else if (is_exist && OB_FAIL(TG_CANCEL_R(lib::TGDefIDs::KVCacheRep, replace_task_))) {
COMMON_LOG(WARN, "failed to cancel replace task", K(ret));
} else if (OB_FAIL(TG_SCHEDULE(lib::TGDefIDs::KVCacheRep, replace_task_, wash_interval, true))) {
COMMON_LOG(ERROR, "failed to schedule replace task", K(ret));
} else {
}
if (OB_SUCC(ret)) {
COMMON_LOG(INFO, "success to reload_wash_interval", K(wash_interval));
}
} else if (!inited_) {
......
......@@ -746,6 +746,9 @@ DEF_BOOL(_enable_parallel_minor_merge, OB_TENANT_PARAMETER, "True",
"specifies whether enable parallel minor merge. "
"Value: True:turned on; False: turned off",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_adaptive_compaction, OB_TENANT_PARAMETER, "True",
"specifies whether allow adaptive compaction schedule and information collection",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(compaction_low_thread_score, OB_TENANT_PARAMETER, "0", "[0,100]",
"the current work thread score of low priority compaction. Range: [0,100] in integer. Especially, 0 means default value",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
......
......@@ -583,7 +583,7 @@ int ObIDag::get_next_ready_task(ObITask *&task)
if (ObIDag::DAG_STATUS_NODE_RUNNING == dag_status_) {
ObITask *cur_task = task_list_.get_first();
const ObITask *head = task_list_.get_header();
while (!found && head != cur_task) {
while (!found && head != cur_task && nullptr != cur_task) {
if (0 == cur_task->get_indegree()
&& (ObITask::TASK_STATUS_WAITING == cur_task->get_status()
|| ObITask::TASK_STATUS_RETRY == cur_task->get_status())) {
......
......@@ -170,25 +170,34 @@ OB_INLINE int ObMultipleMerge::check_need_refresh_table(bool &need_refresh)
OB_INLINE int ObMultipleMerge::update_and_report_tablet_stat()
{
int ret = OB_SUCCESS;
EVENT_ADD(ObStatEventIds::STORAGE_READ_ROW_COUNT, scan_cnt_);
access_ctx_->table_store_stat_.access_row_cnt_ += row_stat_.filt_del_count_;
if (NULL != access_ctx_->table_scan_stat_) {
access_ctx_->table_scan_stat_->access_row_cnt_ += row_stat_.filt_del_count_;
access_ctx_->table_scan_stat_->rowkey_prefix_ = access_ctx_->table_store_stat_.rowkey_prefix_;
access_ctx_->table_scan_stat_->bf_filter_cnt_ += access_ctx_->table_store_stat_.bf_filter_cnt_;
access_ctx_->table_scan_stat_->bf_access_cnt_ += access_ctx_->table_store_stat_.bf_access_cnt_;
access_ctx_->table_scan_stat_->empty_read_cnt_ += access_ctx_->table_store_stat_.get_empty_read_cnt();
access_ctx_->table_scan_stat_->fuse_row_cache_hit_cnt_ += access_ctx_->table_store_stat_.fuse_row_cache_hit_cnt_;
access_ctx_->table_scan_stat_->fuse_row_cache_miss_cnt_ += access_ctx_->table_store_stat_.fuse_row_cache_miss_cnt_;
access_ctx_->table_scan_stat_->block_cache_hit_cnt_ += access_ctx_->table_store_stat_.block_cache_hit_cnt_;
access_ctx_->table_scan_stat_->block_cache_miss_cnt_ += access_ctx_->table_store_stat_.block_cache_miss_cnt_;
access_ctx_->table_scan_stat_->row_cache_hit_cnt_ += access_ctx_->table_store_stat_.row_cache_hit_cnt_;
access_ctx_->table_scan_stat_->row_cache_miss_cnt_ += access_ctx_->table_store_stat_.row_cache_miss_cnt_;
bool enable_adaptive_compaction = true;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
enable_adaptive_compaction = tenant_config->_enable_adaptive_compaction;
}
}
if (lib::is_diagnose_info_enabled()) {
collect_merge_stat(access_ctx_->table_store_stat_);
if (enable_adaptive_compaction) {
EVENT_ADD(ObStatEventIds::STORAGE_READ_ROW_COUNT, scan_cnt_);
access_ctx_->table_store_stat_.access_row_cnt_ += row_stat_.filt_del_count_;
if (NULL != access_ctx_->table_scan_stat_) {
access_ctx_->table_scan_stat_->access_row_cnt_ += row_stat_.filt_del_count_;
access_ctx_->table_scan_stat_->rowkey_prefix_ = access_ctx_->table_store_stat_.rowkey_prefix_;
access_ctx_->table_scan_stat_->bf_filter_cnt_ += access_ctx_->table_store_stat_.bf_filter_cnt_;
access_ctx_->table_scan_stat_->bf_access_cnt_ += access_ctx_->table_store_stat_.bf_access_cnt_;
access_ctx_->table_scan_stat_->empty_read_cnt_ += access_ctx_->table_store_stat_.get_empty_read_cnt();
access_ctx_->table_scan_stat_->fuse_row_cache_hit_cnt_ += access_ctx_->table_store_stat_.fuse_row_cache_hit_cnt_;
access_ctx_->table_scan_stat_->fuse_row_cache_miss_cnt_ += access_ctx_->table_store_stat_.fuse_row_cache_miss_cnt_;
access_ctx_->table_scan_stat_->block_cache_hit_cnt_ += access_ctx_->table_store_stat_.block_cache_hit_cnt_;
access_ctx_->table_scan_stat_->block_cache_miss_cnt_ += access_ctx_->table_store_stat_.block_cache_miss_cnt_;
access_ctx_->table_scan_stat_->row_cache_hit_cnt_ += access_ctx_->table_store_stat_.row_cache_hit_cnt_;
access_ctx_->table_scan_stat_->row_cache_miss_cnt_ += access_ctx_->table_store_stat_.row_cache_miss_cnt_;
}
if (lib::is_diagnose_info_enabled()) {
collect_merge_stat(access_ctx_->table_store_stat_);
}
report_tablet_stat();
}
report_tablet_stat();
return ret;
}
......
......@@ -142,7 +142,6 @@ int ObSSTableSecMetaIterator::open(
if (OB_FAIL(ret) || is_prefetch_end_) {
} else if (OB_UNLIKELY(start_key_beyond_range)) {
is_inited_ = true;
set_iter_end();
is_inited_ = true;
}
......
......@@ -710,7 +710,7 @@ int ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
bool need_merge = false;
LOG_DEBUG("schedule_tablet_medium_merge", K(schedule_scn), K(ls_id), K(tablet_id));
if (schedule_scn > 0) {
if (OB_FAIL(check_need_merge_and_schedule(ls_id, tablet, schedule_scn, need_merge))) {
if (OB_FAIL(check_need_merge_and_schedule(ls, tablet, schedule_scn, need_merge))) {
LOG_WARN("failed to check medium merge", K(ret), K(ls_id), K(tablet_id), K(schedule_scn));
}
}
......@@ -773,7 +773,7 @@ int ObMediumCompactionScheduleFunc::freeze_memtable_to_get_medium_info()
}
int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
const ObLSID &ls_id,
ObLS &ls,
ObTablet &tablet,
const int64_t schedule_scn,
bool &need_merge)
......@@ -784,6 +784,7 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
bool can_merge = false;
if (OB_FAIL(ObPartitionMergePolicy::check_need_medium_merge(
ls,
tablet,
schedule_scn,
need_merge,
......@@ -794,7 +795,7 @@ int ObMediumCompactionScheduleFunc::check_need_merge_and_schedule(
if (OB_FAIL(tablet.get_medium_compaction_info_list().get_specified_scn_info(schedule_scn, medium_info))) {
LOG_WARN("failed to get specified scn info", K(ret), K(schedule_scn));
} else if (OB_TMP_FAIL(ObTenantTabletScheduler::schedule_merge_dag(
ls_id,
ls.get_ls_id(),
tablet.get_tablet_meta().tablet_id_,
MEDIUM_MERGE,
schedule_scn,
......
......@@ -91,7 +91,7 @@ protected:
ObMediumCompactionInfo &medium_info,
ObGetMergeTablesResult &result);
static int check_need_merge_and_schedule(
const share::ObLSID &ls_id,
ObLS &ls,
ObTablet &tablet,
const int64_t schedule_scn,
bool &need_merge);
......
......@@ -27,6 +27,7 @@
#include "storage/compaction/ob_tenant_compaction_progress.h"
#include "observer/omt/ob_tenant_config_mgr.h"
#include "share/scn.h"
#include "storage/compaction/ob_tenant_tablet_scheduler.h"
using namespace oceanbase;
using namespace common;
......@@ -859,6 +860,7 @@ int ObPartitionMergePolicy::refine_minor_merge_result(
// call this func means have serialized medium compaction clog = medium_snapshot
int ObPartitionMergePolicy::check_need_medium_merge(
ObLS &ls,
storage::ObTablet &tablet,
const int64_t medium_snapshot,
bool &need_merge,
......@@ -877,7 +879,8 @@ int ObPartitionMergePolicy::check_need_medium_merge(
need_merge = last_major->get_snapshot_version() < medium_snapshot;
if (need_merge
&& is_tablet_data_status_complete
&& tablet.get_tablet_meta().max_serialized_medium_scn_ >= medium_snapshot) {
&& tablet.get_tablet_meta().max_serialized_medium_scn_ >= medium_snapshot
&& ObTenantTabletScheduler::check_weak_read_ts_ready(medium_snapshot, ls)) {
can_merge = true;
}
}
......
......@@ -71,6 +71,7 @@ public:
const storage::ObTablet &tablet,
storage::ObGetMergeTablesResult &result);
static int check_need_medium_merge(
ObLS &ls,
storage::ObTablet &tablet,
const int64_t medium_snapshot,
bool &need_merge,
......
......@@ -1281,11 +1281,17 @@ int ObTabletMergeFinishTask::try_schedule_compaction_after_mini(
int tmp_ret = OB_SUCCESS;
const ObTabletID &tablet_id = ctx.param_.tablet_id_;
ObLSID ls_id = ctx.param_.ls_id_;
bool enable_adaptive_compaction = true;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
enable_adaptive_compaction = tenant_config->_enable_adaptive_compaction;
}
}
// report tablet stat
if (0 == ctx.get_merge_info().get_sstable_merge_info().macro_block_count_) {
// empty mini compaction, no need to reprot stat
} else if (OB_TMP_FAIL(try_report_tablet_stat_after_mini(ctx))) {
} else if (enable_adaptive_compaction && OB_TMP_FAIL(try_report_tablet_stat_after_mini(ctx))) {
LOG_WARN("failed to report table stat after mini compaction", K(tmp_ret), K(ls_id), K(tablet_id));
}
if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
......
......@@ -274,8 +274,13 @@ int ObTenantTabletScheduler::reload_tenant_config()
} else if (is_stop_) {
// do nothing
} else if (schedule_interval_ != merge_schedule_interval) {
if (OB_FAIL(restart_schedule_timer_task(merge_schedule_interval))) {
if (OB_FAIL(restart_schedule_timer_task(merge_schedule_interval, medium_loop_tg_id_, medium_loop_task_))) {
LOG_WARN("failed to reload new merge schedule interval", K(merge_schedule_interval));
} else if (OB_FAIL(restart_schedule_timer_task(merge_schedule_interval, merge_loop_tg_id_, merge_loop_task_))) {
LOG_WARN("failed to reload new merge schedule interval", K(merge_schedule_interval));
} else {
schedule_interval_ = merge_schedule_interval;
LOG_INFO("succeeded to reload new merge schedule interval", K(merge_schedule_interval));
}
}
return ret;
......@@ -522,7 +527,7 @@ int ObTenantTabletScheduler::schedule_merge(const int64_t broadcast_version)
"last_merged_version",
merged_version_);
if (OB_FAIL(restart_schedule_timer_task(CHECK_WEAK_READ_TS_SCHEDULE_INTERVAL))) {
if (OB_FAIL(restart_schedule_timer_task(CHECK_WEAK_READ_TS_SCHEDULE_INTERVAL, medium_loop_tg_id_, medium_loop_task_))) {
LOG_WARN("failed to restart schedule timer task", K(ret));
}
}
......@@ -870,6 +875,13 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
bool is_leader = false;
bool could_major_merge = false;
const int64_t major_frozen_scn = get_frozen_version();
bool enable_adaptive_compaction = true;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
enable_adaptive_compaction = tenant_config->_enable_adaptive_compaction;
}
}
if (MTL(ObTenantTabletScheduler *)->could_major_merge_start()) {
could_major_merge = true;
......@@ -933,12 +945,14 @@ int ObTenantTabletScheduler::schedule_ls_medium_merge(
} else if (ObTimeUtility::fast_current_time() <
tablet->get_medium_compaction_info_list().get_wait_check_medium_scn() + WAIT_MEDIUM_CHECK_THRESHOLD) {
// need wait 10 mins before schedule meta major
} else if (OB_TMP_FAIL(schedule_tablet_meta_major_merge(ls_handle, tablet_handle))) {
} else if (enable_adaptive_compaction && OB_TMP_FAIL(schedule_tablet_meta_major_merge(ls_handle, tablet_handle))) {
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
LOG_WARN("failed to schedule tablet merge", K(tmp_ret), K(ls_id), K(tablet_id));
}
}
} else if (could_major_merge && OB_TMP_FAIL(func.schedule_next_medium_for_leader(
} else if (could_major_merge
&& (!tablet_merge_finish || enable_adaptive_compaction)
&& OB_TMP_FAIL(func.schedule_next_medium_for_leader(
tablet_merge_finish ? 0 : merge_version))) { // schedule another round
LOG_WARN("failed to schedule next medium", K(tmp_ret), K(ls_id), K(tablet_id));
} else {
......@@ -1050,8 +1064,8 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
}
} else {
schedule_stats_.check_weak_read_ts_cnt_++;
if (OB_FAIL(restart_schedule_timer_task(CHECK_WEAK_READ_TS_SCHEDULE_INTERVAL))) {
LOG_WARN("failed to restart schedule timer task", K(ret));
if (OB_FAIL(restart_schedule_timer_task(CHECK_WEAK_READ_TS_SCHEDULE_INTERVAL, medium_loop_tg_id_, medium_loop_task_))) {
LOG_WARN("failed to restart schedule timer task", K(ret), K(medium_loop_tg_id_));
}
}
......@@ -1097,21 +1111,19 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
return ret;
}
int ObTenantTabletScheduler::restart_schedule_timer_task(const int64_t schedule_interval)
int ObTenantTabletScheduler::restart_schedule_timer_task(
const int64_t schedule_interval,
const int64_t tg_id,
common::ObTimerTask &timer_task)
{
int ret = OB_SUCCESS;
bool is_exist = false;
if (OB_FAIL(TG_TASK_EXIST(medium_loop_tg_id_, medium_loop_task_, is_exist))) {
LOG_ERROR("failed to check merge schedule task exist", K(ret));
} else if (is_exist) {
TG_CANCEL(medium_loop_tg_id_, medium_loop_task_);
}
if (OB_FAIL(ret)) {
} else if (is_exist && OB_FAIL(TG_CANCEL_R(medium_loop_tg_id_, medium_loop_task_))) {
LOG_WARN("failed to cancel task", K(ret));
} else if (OB_FAIL(TG_SCHEDULE(medium_loop_tg_id_, medium_loop_task_, schedule_interval, true/*repeat*/))) {
LOG_WARN("Fail to schedule minor merge scan task", K(ret));
} else {
schedule_interval_ = schedule_interval;
LOG_INFO("succeeded to reload new merge schedule interval", K(schedule_interval));
}
return ret;
}
......
......@@ -166,7 +166,10 @@ private:
int schedule_ls_minor_merge(
ObLS &ls);
int try_remove_old_table(ObLS &ls);
int restart_schedule_timer_task(const int64_t interval);
int restart_schedule_timer_task(
const int64_t interval,
const int64_t tg_id,
common::ObTimerTask &timer_task);
int update_report_scn_as_ls_leader(
ObLS &ls);
......
......@@ -152,6 +152,8 @@ int ObStorageSchemaRecorder::inner_replay_clog(
} else if (OB_FAIL(tmp_tablet_handle.get_obj()->save_multi_source_data_unit(&replay_storage_schema, scn,
true/*for_replay*/, memtable::MemtableRefOp::NONE))) {
LOG_WARN("failed to save storage schema", K(ret), K_(tablet_id), K(replay_storage_schema));
} else {
LOG_INFO("success to replay schema clog", K(ret), K(replay_storage_schema.get_schema_version()), K(replay_storage_schema.compat_mode_));
}
replay_storage_schema.reset();
tmp_tablet_handle.reset();
......
......@@ -1487,8 +1487,17 @@ int ObTablet::do_rowkey_exists(
// ROWKEY IN_ROW_CACHE / NOT EXIST
} else if (FALSE_IT(store_ctx.tablet_stat_.exist_row_read_table_cnt_ = check_table_cnt)) {
} else if (FALSE_IT(store_ctx.tablet_stat_.exist_row_total_table_cnt_ = table_iter.count())) {
} else if (OB_TMP_FAIL(MTL(ObTenantTabletStatMgr *)->report_stat(store_ctx.tablet_stat_))) {
LOG_WARN("failed to report tablet stat", K(tmp_ret), K(stat));
} else {
bool enable_adaptive_compaction = true;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
enable_adaptive_compaction = tenant_config->_enable_adaptive_compaction;
}
}
if (enable_adaptive_compaction && OB_TMP_FAIL(MTL(ObTenantTabletStatMgr *)->report_stat(store_ctx.tablet_stat_))) {
LOG_WARN("failed to report tablet stat", K(tmp_ret), K(stat));
}
}
}
......@@ -1539,8 +1548,17 @@ int ObTablet::do_rowkeys_exist(ObTableStoreIterator &tables_iter, ObRowsInfo &ro
int tmp_ret = OB_SUCCESS;
if (0 == access_ctx.table_store_stat_.exist_row_.empty_read_cnt_) {
// ROWKEY IN_ROW_CACHE / NOT EXIST
} else if (OB_TMP_FAIL(MTL(ObTenantTabletStatMgr *)->report_stat(tablet_stat))) {
LOG_WARN("failed to report tablet stat", K(tmp_ret), K(tablet_stat));
} else {
bool enable_adaptive_compaction = true;
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
enable_adaptive_compaction = tenant_config->_enable_adaptive_compaction;
}
}
if (enable_adaptive_compaction && OB_TMP_FAIL(MTL(ObTenantTabletStatMgr *)->report_stat(tablet_stat))) {
LOG_WARN("failed to report tablet stat", K(tmp_ret), K(tablet_stat));
}
}
}
return ret;
......@@ -2451,7 +2469,7 @@ int ObTablet::get_kept_multi_version_start(
uint64_t compat_version = 0;
if (OB_TMP_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), compat_version))) {
LOG_WARN("fail to get data version", K(tmp_ret));
} else if (compat_version >= DATA_VERSION_4_1_0_0) {
} else if (compat_version >= DATA_VERSION_4_1_0_0 && ls.get_min_reserved_snapshot() > 0) {
ls_min_reserved_snapshot = ls.get_min_reserved_snapshot();
}
if (OB_SUCC(ret)) {
......
......@@ -129,6 +129,9 @@ int ObTabletMemtableMgr::init_storage_recorder(
TRANS_LOG(WARN, "failed to init schema recorder", K(ret), K(max_saved_schema_version), KP(log_handler));
} else if (OB_FAIL(medium_info_recorder_.init(ls_id, tablet_id, max_saved_medium_scn, log_handler))) {
TRANS_LOG(WARN, "failed to init medium info recorder", K(ret), K(max_saved_medium_scn), KP(log_handler));
} else {
TRANS_LOG(INFO, "success to init storage recorder", K(ret), K(ls_id), K(tablet_id), K(max_saved_schema_version),
K(max_saved_medium_scn), K(compat_mode));
}
return ret;
}
......
......@@ -27,3 +27,4 @@ alter tenant mysql set variables ob_enable_truncate_flashback = 'on';
alter system set ob_compaction_schedule_interval = '10s' tenant all;
alter system set merger_check_interval = '10s' tenant all;
alter system set enable_sql_extension=true tenant all;
alter system set _enable_adaptive_compaction = false tenant all;
......@@ -228,6 +228,7 @@ _cache_wash_interval
_chunk_row_store_mem_limit
_ctx_memory_limit
_data_storage_io_timeout
_enable_adaptive_compaction
_enable_block_file_punch_hole
_enable_compaction_diagnose
_enable_convert_real_to_decimal
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册