diff --git a/src/storage/compaction/ob_partition_merge_progress.cpp b/src/storage/compaction/ob_partition_merge_progress.cpp index 6d5d97613f5fed2771f7db1a2ac1f8035c1ab372..0aa4511bf21c48f4b3b2619d3ba63807da6eb9bd 100644 --- a/src/storage/compaction/ob_partition_merge_progress.cpp +++ b/src/storage/compaction/ob_partition_merge_progress.cpp @@ -134,7 +134,6 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx) LOG_WARN("get invalid arguments", K(ret), K(ctx)); } else { const ObIArray &tables = ctx->tables_handle_.get_tables(); - ObITable *table = nullptr; int64_t old_major_data_size = 0; if (OB_UNLIKELY(0 == tables.count() || NULL == tables.at(0))) { ret = OB_ERR_UNEXPECTED; @@ -162,26 +161,30 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx) } else { estimate_row_cnt_ = MAX(1, part_estimate.physical_row_count_); for (int64_t i = tables.count() - 1; i >= 0; --i) { - if (OB_ISNULL(table = tables.at(i))) { + if (OB_UNLIKELY(nullptr == tables.at(i) || !tables.at(i)->is_memtable())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null table", K(ret), K(i), K(table)); - } else if (table->is_memtable()) { - estimate_occupy_size_ = static_cast(table)->get_occupied_size(); + LOG_WARN("get unexpected null table", K(ret), K(i), K(tables.at(i))); + } else { + estimate_occupy_size_ += static_cast(tables.at(i))->get_occupied_size(); } } } } } else { + int64_t total_macro_block_cnt = 0; + ObSSTable *table = nullptr; for (int64_t i = 0; OB_SUCC(ret) && i < tables.count(); ++i) { - if (OB_ISNULL(table = tables.at(i))) { + if (OB_UNLIKELY(nullptr == tables.at(i) || !tables.at(i)->is_sstable())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("get unexpected null table", K(ret), K(i), K(table)); - } else if (table->is_sstable()) { - const ObSSTableBasicMeta &meta = static_cast(table)->get_meta().get_basic_meta(); + LOG_WARN("get unexpected table", K(ret), K(i), KPC(tables.at(i))); + } else { + table = static_cast(tables.at(i)); + const ObSSTableBasicMeta &meta = table->get_meta().get_basic_meta(); if (meta.get_total_macro_block_count() <= 0) { LOG_DEBUG("table is empty, skip it", K(i), KPC(static_cast(table))); continue; } else { + total_macro_block_cnt += meta.get_total_macro_block_count() - meta.get_total_use_old_macro_block_count(); estimate_row_cnt_ += meta.row_count_; estimate_occupy_size_ += meta.occupy_size_; if (table->is_major_sstable()) { @@ -190,6 +193,10 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx) } } } + if (0 != total_macro_block_cnt) { + estimate_row_cnt_ = (0 == estimate_row_cnt_) ? DEFAULT_ROW_CNT_PER_MACRO_BLOCK * total_macro_block_cnt : estimate_row_cnt_; + estimate_occupy_size_ = (0 == estimate_occupy_size_) ? common::OB_DEFAULT_MACRO_BLOCK_SIZE * total_macro_block_cnt : estimate_occupy_size_; + } } if (OB_SUCC(ret)) { @@ -241,9 +248,6 @@ int ObPartitionMergeProgress::update_merge_progress( if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObPartitionMergeProgress not inited", K(ret)); - } else if (OB_UNLIKELY(0 == estimate_row_cnt_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected zero estimate_total_units_", K(ret)); } else if (OB_UNLIKELY(idx < 0 || idx >= concurrent_cnt_ || scanned_row_cnt < 0 || output_block_cnt < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("get invalid arguments", K(ret), K(idx), K(concurrent_cnt_), K(scanned_row_cnt), K(output_block_cnt)); @@ -264,7 +268,7 @@ int ObPartitionMergeProgress::update_merge_progress( } if (scanned_row_cnt >= estimate_row_cnt_) { - estimate_row_cnt_ += scanned_row_cnt - pre_scanned_row_cnt_; + estimate_row_cnt_ += MAX(scanned_row_cnt / DEFAULT_INCREMENT_ROW_FACTOR, 1); avg_row_length_ = estimate_occupy_size_ * 1.0 / estimate_row_cnt_; } @@ -295,14 +299,14 @@ int ObPartitionMergeProgress::update_merge_info(ObSSTableMergeInfo &merge_info) void ObPartitionMergeProgress::update_estimated_finish_time_() { int64_t current_time = ObTimeUtility::fast_current_time(); - if (0 == pre_scanned_row_cnt_) { + if (0 == pre_scanned_row_cnt_) { // first time to init merge_progress int64_t spend_time = estimate_occupy_size_ / common::OB_DEFAULT_MACRO_BLOCK_SIZE * ObCompactionProgress::MERGE_SPEED + ObCompactionProgress::EXTRA_TIME; estimated_finish_time_ = spend_time + current_time + UPDATE_INTERVAL; } else { - int64_t rest_time = (estimate_row_cnt_ - pre_scanned_row_cnt_) - * (current_time - merge_dag_->get_start_time()) / pre_scanned_row_cnt_; - estimated_finish_time_ = current_time + rest_time + UPDATE_INTERVAL; + int64_t delta_row_cnt = estimate_row_cnt_ - pre_scanned_row_cnt_; + int64_t rest_time = MAX(1, delta_row_cnt) * (current_time - merge_dag_->get_start_time()) / pre_scanned_row_cnt_; + estimated_finish_time_ = MAX(estimated_finish_time_, current_time + rest_time + UPDATE_INTERVAL); } } @@ -378,7 +382,7 @@ int ObPartitionMajorMergeProgress::update_merge_progress( } if (scanned_row_cnt >= estimate_row_cnt_) { - estimate_row_cnt_ += scanned_row_cnt - pre_scanned_row_cnt_; + estimate_row_cnt_ += MAX(scanned_row_cnt / DEFAULT_INCREMENT_ROW_FACTOR, 1); avg_row_length_ = estimate_occupy_size_ * 1.0 / estimate_row_cnt_; } diff --git a/src/storage/compaction/ob_partition_merge_progress.h b/src/storage/compaction/ob_partition_merge_progress.h index 995226661c99f651bc0e831d7881879ca34ecdb6..0624f4f77f7535e5fd048cca5089c086767f63be 100644 --- a/src/storage/compaction/ob_partition_merge_progress.h +++ b/src/storage/compaction/ob_partition_merge_progress.h @@ -50,6 +50,8 @@ public: public: static const int32_t UPDATE_INTERVAL = 2 * 1000 * 1000; // 2 second static const int32_t NORMAL_UPDATE_PARAM = 300; + static const int32_t DEFAULT_ROW_CNT_PER_MACRO_BLOCK = 1000; + static const int32_t DEFAULT_INCREMENT_ROW_FACTOR = 10; protected: int estimate(ObTabletMergeCtx *ctx); void update_estimated_finish_time_();