提交 ead14a9b 编写于 作者: Y yy0 提交者: LINGuanRen

optimize trans part ctx recycle

上级 6f0bdfec
......@@ -965,13 +965,17 @@ int ObPGSSTableMgr::GetCleanOutLogIdFunctor::operator()(ObITable& table, bool& i
{
int ret = OB_SUCCESS;
is_full = false;
if (table.get_ref() > 0 && table.is_multi_version_minor_sstable() && table.get_end_log_ts() < clean_out_log_ts_) {
clean_out_log_ts_ = table.get_end_log_ts();
if (table.get_ref() > 0 && table.is_multi_version_minor_sstable()) {
if (table.is_complement_minor_sstable()) {
min_complement_log_ts_ = std::min(min_complement_log_ts_, table.get_end_log_ts());
} else {
clean_out_log_ts_.push_back(table.get_end_log_ts());
}
}
return ret;
}
int ObPGSSTableMgr::get_clean_out_log_ts(int64_t& clean_out_log_ts)
int ObPGSSTableMgr::get_clean_out_log_ts(ObIArray<int64_t> &clean_out_log_ts)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
......@@ -982,7 +986,21 @@ int ObPGSSTableMgr::get_clean_out_log_ts(int64_t& clean_out_log_ts)
if (OB_FAIL(table_map_.foreach (functor))) {
LOG_WARN("fail to foreach table map", K(ret));
} else {
clean_out_log_ts = functor.get_clean_out_log_ts();
functor.sort();
const ObIArray<int64_t> &log_ts_array = functor.get_clean_out_log_ts();
int64_t last_log_ts = -1;
// duplicate log ts may exist, need to remove duplicate
for (int64_t i = 0; OB_SUCC(ret) && i < log_ts_array.count(); i++) {
if (log_ts_array.at(i) < functor.get_min_complement_log_ts()) {
if (last_log_ts != log_ts_array.at(i)) {
if (OB_FAIL(clean_out_log_ts.push_back(log_ts_array.at(i)))) {
LOG_WARN("failed to push back log ts", K(ret));
} else {
last_log_ts = log_ts_array.at(i);
}
}
}
}
}
}
return ret;
......
......@@ -19,6 +19,7 @@
#include "blocksstable/ob_block_sstable_struct.h"
#include "storage/ob_i_table.h"
#include "storage/ob_sstable.h"
#include "lib/container/ob_se_array.h"
namespace oceanbase {
namespace storage {
......@@ -56,7 +57,7 @@ public:
{
return file_handle_.assign(file_handle);
}
int get_clean_out_log_ts(int64_t& clean_out_log_ts);
int get_clean_out_log_ts(ObIArray<int64_t> &clean_out_log_ts);
private:
static const int64_t DEFAULT_HASH_BUCKET_COUNT = 100;
......@@ -130,17 +131,29 @@ private:
};
class GetCleanOutLogIdFunctor : public TableMap::ForeachFunctor {
public:
GetCleanOutLogIdFunctor() : clean_out_log_ts_(INT64_MAX)
{}
GetCleanOutLogIdFunctor() : clean_out_log_ts_()
{
min_complement_log_ts_ = INT64_MAX;
clean_out_log_ts_.push_back(0);
}
virtual ~GetCleanOutLogIdFunctor() = default;
int64_t get_clean_out_log_ts() const
const common::ObIArray<int64_t> &get_clean_out_log_ts() const
{
return clean_out_log_ts_;
}
virtual int operator()(ObITable& table, bool& is_full) override;
void sort()
{
std::sort(clean_out_log_ts_.begin(), clean_out_log_ts_.end());
}
int64_t get_min_complement_log_ts()
{
return min_complement_log_ts_;
}
virtual int operator()(ObITable &table, bool &is_full) override;
private:
int64_t clean_out_log_ts_;
int64_t min_complement_log_ts_;
common::ObSEArray<int64_t, 10> clean_out_log_ts_;
};
void destroy();
int write_add_sstable_log(ObSSTable& sstable);
......
......@@ -6917,22 +6917,28 @@ int ObPGStorage::remove_mem_ctx_for_trans_ctx_(memtable::ObMemtable* mt)
return ret;
}
int ObPGStorage::get_max_cleanout_log_ts(int64_t& max_cleanout_log_ts)
int ObPGStorage::get_max_cleanout_log_ts(ObIArray<int64_t> &sstable_cleanout_log_ts)
{
int ret = OB_SUCCESS;
int64_t sstable_clean_out_log_ts = 0;
max_cleanout_log_ts = 0;
ObTablesHandle handle;
sstable_cleanout_log_ts.reset();
ObSEArray<int64_t, 10> log_ts_array;
int64_t last_replay_log_ts = 0;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("pg storage is not inited", K(ret));
} else if (OB_FAIL(sstable_mgr_.get_clean_out_log_ts(sstable_clean_out_log_ts))) {
} else if (OB_FAIL(sstable_mgr_.get_clean_out_log_ts(log_ts_array))) {
LOG_WARN("failed to get clean out log id", K(ret), K(pkey_));
} else {
{
TCRLockGuard lock_guard(lock_);
max_cleanout_log_ts =
std::min(sstable_clean_out_log_ts, meta_->storage_info_.get_data_info().get_last_replay_log_ts());
last_replay_log_ts = meta_->storage_info_.get_data_info().get_last_replay_log_ts();
}
for (int64_t i = 0; OB_SUCC(ret) && i < log_ts_array.count(); i++) {
if (log_ts_array.at(i) < last_replay_log_ts) {
if (OB_FAIL(sstable_cleanout_log_ts.push_back(log_ts_array.at(i)))) {
LOG_WARN("failed to push back log ts", K(ret), K(pkey_));
}
}
}
}
return ret;
......@@ -6981,7 +6987,7 @@ int ObPGStorage::get_min_schema_version(int64_t& min_schema_version)
int ObPGStorage::clear_unused_trans_status()
{
int ret = OB_SUCCESS;
int64_t max_cleanout_log_ts = 0;
ObSEArray<int64_t, 10> max_cleanout_log_ts;
if (OB_FAIL(get_max_cleanout_log_ts(max_cleanout_log_ts))) {
LOG_WARN("failed to get max cleanout log id", K(ret), K_(pkey));
} else if (OB_FAIL(txs_->clear_unused_trans_status(pkey_, max_cleanout_log_ts))) {
......
......@@ -351,7 +351,7 @@ public:
transaction::ObTransService& trans_service, const int64_t frozen_version);
int restore_mem_trans_table(ObSSTable& trans_sstable);
int restore_mem_trans_table();
int get_max_cleanout_log_ts(int64_t& max_cleanout_log_ts);
int get_max_cleanout_log_ts(ObIArray<int64_t> &max_cleanout_log_ts);
int clear_unused_trans_status();
int physical_flashback(const int64_t flashback_scn);
int set_meta_block_list(const common::ObIArray<blocksstable::MacroBlockId>& meta_block_list);
......
......@@ -1724,7 +1724,7 @@ int ObPartitionTransCtxMgr::get_max_trans_version_before_given_log_ts(
return ret;
}
int ObPartitionTransCtxMgr::clear_unused_trans_status(const int64_t max_cleanout_log_ts)
int ObPartitionTransCtxMgr::clear_unused_trans_status(const ObIArray<int64_t> &max_cleanout_log_ts)
{
int ret = OB_SUCCESS;
......@@ -5099,7 +5099,8 @@ int ObPartTransCtxMgr::get_max_trans_version_before_given_log_ts(
return ret;
}
int ObPartTransCtxMgr::clear_unused_trans_status(const ObPartitionKey& pkey, const int64_t max_cleanout_log_ts)
int ObPartTransCtxMgr::clear_unused_trans_status(
const ObPartitionKey &pkey, const ObIArray<int64_t> &max_cleanout_log_ts)
{
int ret = OB_SUCCESS;
ObPartitionTransCtxMgr* ctx_mgr = nullptr;
......
......@@ -323,8 +323,8 @@ public:
const transaction::ObTransID& data_trans_id, const int32_t sql_sequence,
storage::ObStoreRowLockState& lock_state);
int get_max_trans_version_before_given_log_ts(
const int64_t log_ts, int64_t& max_trans_version, bool& is_all_rollback_trans);
int clear_unused_trans_status(const int64_t max_cleanout_log_ts);
const int64_t log_ts, int64_t &max_trans_version, bool &is_all_rollback_trans);
int clear_unused_trans_status(const ObIArray<int64_t> &max_cleanout_log_ts);
int has_terminated_trx_in_given_log_ts_range(
const int64_t start_log_ts, const int64_t end_log_ts, bool& has_terminated_trx);
bool is_clog_aggregation_enabled();
......@@ -975,8 +975,8 @@ public:
int get_applied_log_ts(const common::ObPartitionKey& pkey, int64_t& applied_log_ts);
int get_max_trans_version_before_given_log_ts(
const ObPartitionKey& pkey, const int64_t log_ts, int64_t& max_trans_version, bool& is_all_rollback_trans);
int clear_unused_trans_status(const ObPartitionKey& pkey, const int64_t max_cleanout_log_id);
const ObPartitionKey &pkey, const int64_t log_ts, int64_t &max_trans_version, bool &is_all_rollback_trans);
int clear_unused_trans_status(const ObPartitionKey &pkey, const ObIArray<int64_t> &max_cleanout_log_id);
int has_terminated_trx_in_given_log_ts_range(
const ObPartitionKey& pkey, const int64_t start_log_ts, const int64_t end_log_ts, bool& has_terminated_trx);
int set_last_restore_log_info(
......
......@@ -3123,13 +3123,15 @@ public:
: status_(ObTransTableStatusType::RUNNING),
trans_version_(common::OB_INVALID_VERSION),
undo_status_(),
start_log_ts_(0),
end_log_ts_(INT64_MAX)
{}
ObTransTableStatusType status_;
int64_t trans_version_;
ObTransUndoStatus undo_status_;
int64_t start_log_ts_;
int64_t end_log_ts_;
TO_STRING_KV(K_(status), K_(trans_version), K_(undo_status), K_(end_log_ts));
TO_STRING_KV(K_(status), K_(trans_version), K_(undo_status), K_(start_log_ts), K_(end_log_ts));
};
class ObTransTableStatusInfo {
......
......@@ -1752,7 +1752,8 @@ private:
class ObCleanTransTableFunctor {
public:
explicit ObCleanTransTableFunctor(const int64_t max_cleanout_log_ts) : max_cleanout_log_ts_(max_cleanout_log_ts)
explicit ObCleanTransTableFunctor(const ObIArray<int64_t> &max_cleanout_log_ts)
: max_cleanout_log_ts_(max_cleanout_log_ts)
{}
bool operator()(const ObTransID& trans_id, ObTransCtx* ctx_base)
{
......@@ -1767,9 +1768,10 @@ public:
TRANS_LOG(WARN, "failed to get trans table status info", K(ret));
} else if ((ObTransTableStatusType::COMMIT == trans_info.status_ ||
ObTransTableStatusType::ABORT == trans_info.status_) &&
ctx->is_exiting()) {
if (trans_info.end_log_ts_ <= max_cleanout_log_ts_) {
if (ctx->is_dirty_trans()) {
ctx->is_exiting() && ctx->is_dirty_trans()) {
for (int64_t i = 0; i < max_cleanout_log_ts_.count() - 1; i++) {
if (trans_info.start_log_ts_ > max_cleanout_log_ts_.at(i) &&
trans_info.end_log_ts_ <= max_cleanout_log_ts_.at(i + 1)) {
ctx->remove_trans_table();
}
}
......@@ -1779,7 +1781,7 @@ public:
}
private:
int64_t max_cleanout_log_ts_;
const ObIArray<int64_t> &max_cleanout_log_ts_;
};
class ObCheckHasTerminatedTrxBeforeLogFunction {
......
......@@ -10690,6 +10690,8 @@ int ObPartTransCtx::get_trans_state_and_version_without_lock(ObTransStatusInfo&
if (trans_info.status_ == ObTransTableStatusType::COMMIT || trans_info.status_ == ObTransTableStatusType::ABORT) {
trans_info.end_log_ts_ = ATOMIC_LOAD(&end_log_ts_);
}
trans_info.start_log_ts_ = ATOMIC_LOAD(&min_log_ts_);
return ret;
}
......
......@@ -9042,7 +9042,8 @@ int ObTransService::get_max_trans_version_before_given_log_ts(
return ret;
}
int ObTransService::clear_unused_trans_status(const ObPartitionKey& pg_key, const int64_t max_cleanout_log_ts)
int ObTransService::clear_unused_trans_status(
const ObPartitionKey &pg_key, const ObIArray<int64_t> &max_cleanout_log_ts)
{
int ret = OB_SUCCESS;
......
......@@ -554,8 +554,8 @@ public:
return &trans_status_mgr_;
}
int get_max_trans_version_before_given_log_ts(
const ObPartitionKey& pkey, const int64_t log_ts, int64_t& max_trans_version, bool& is_all_rollback_trans);
int clear_unused_trans_status(const ObPartitionKey& pg_key, const int64_t max_cleanout_log_ts);
const ObPartitionKey &pkey, const int64_t log_ts, int64_t &max_trans_version, bool &is_all_rollback_trans);
int clear_unused_trans_status(const ObPartitionKey &pg_key, const ObIArray<int64_t> &max_cleanout_log_ts);
virtual int has_terminated_trx_in_given_log_ts_range(
const ObPartitionKey& pkey, const int64_t start_log_ts, const int64_t end_log_ts, bool& has_terminated_trx);
ObPartTransSameLeaderBatchRpcMgr* get_part_trans_same_leader_batch_rpc_mgr()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册