提交 d6d35f67 编写于 作者: O obdev 提交者: wangzelin.wzl

cherry pick commits to opensource

上级 a9e70b7b
...@@ -387,7 +387,7 @@ int ObTransCallbackList::mark_frozen_data( ...@@ -387,7 +387,7 @@ int ObTransCallbackList::mark_frozen_data(
TRANS_LOG(WARN, "memtable is null", KP(frozen_memtable), KP(active_memtable), K(ret)); TRANS_LOG(WARN, "memtable is null", KP(frozen_memtable), KP(active_memtable), K(ret));
} else if (FALSE_IT(first_memtable = get_first_memtable())) { } else if (FALSE_IT(first_memtable = get_first_memtable())) {
} else if (OB_ISNULL(first_memtable) || first_memtable == active_memtable) { } else if (OB_ISNULL(first_memtable) || first_memtable == active_memtable) {
TRANS_LOG(INFO, TRANS_LOG(DEBUG,
"skip mark frozen data", "skip mark frozen data",
K(callback_mgr_.get_ctx()), K(callback_mgr_.get_ctx()),
KP(first_memtable), KP(first_memtable),
...@@ -415,11 +415,11 @@ int ObTransCallbackList::mark_frozen_data( ...@@ -415,11 +415,11 @@ int ObTransCallbackList::mark_frozen_data(
if (!marked) { if (!marked) {
int64_t i = 0; int64_t i = 0;
for (ObITransCallback* iter = start; NULL != iter && iter != end; iter = iter->get_prev()) { for (ObITransCallback* iter = start; NULL != iter && iter != end; iter = iter->get_prev()) {
TRANS_LOG(INFO, "debug: iter callback", K(*iter)); TRANS_LOG(DEBUG, "debug: iter callback", K(*iter));
} }
} }
TRANS_LOG(INFO, "iterate callbacks", K(callback_mgr_.get_ctx()), K(iter_cnt), K(cb_cnt), K(marked)); TRANS_LOG(DEBUG, "iterate callbacks", K(callback_mgr_.get_ctx()), K(iter_cnt), K(cb_cnt), K(marked));
} }
return ret; return ret;
......
...@@ -355,12 +355,9 @@ int ObFreezeInfoSnapshotMgr::inner_get_neighbour_major_freeze( ...@@ -355,12 +355,9 @@ int ObFreezeInfoSnapshotMgr::inner_get_neighbour_major_freeze(
if (snapshot_version < next_info.freeze_ts) { if (snapshot_version < next_info.freeze_ts) {
found = true; found = true;
if (0 == i) { if (0 == i) {
if (!GCTX.is_standby_cluster()) { ret = OB_ENTRY_NOT_EXIST;
ret = OB_ERR_SYS; STORAGE_LOG(WARN, "cannot get neighbour major freeze before bootstrap",
LOG_ERROR("cannot get neighbour major freeze before bootstrap", K(ret), K(snapshot_version), K(next_info)); K(ret), K(snapshot_version), K(next_info));
} else {
ret = OB_ENTRY_NOT_EXIST;
}
} else { } else {
info.next = next_info; info.next = next_info;
info.prev = info_list.at(i - 1); info.prev = info_list.at(i - 1);
...@@ -1133,6 +1130,7 @@ int ObFreezeInfoSnapshotMgr::SchemaCache::init(ObISQLClient& sql_proxy) ...@@ -1133,6 +1130,7 @@ int ObFreezeInfoSnapshotMgr::SchemaCache::init(ObISQLClient& sql_proxy)
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc head node", K(ret)); STORAGE_LOG(WARN, "fail to alloc head node", K(ret));
} else { } else {
p->reset();
head_ = tail_ = p; head_ = tail_ = p;
sql_proxy_ = &sql_proxy; sql_proxy_ = &sql_proxy;
inited_ = true; inited_ = true;
...@@ -1368,6 +1366,7 @@ int ObFreezeInfoSnapshotMgr::SchemaCache::update_freeze_schema( ...@@ -1368,6 +1366,7 @@ int ObFreezeInfoSnapshotMgr::SchemaCache::update_freeze_schema(
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc schema node", K(ret)); STORAGE_LOG(WARN, "fail to alloc schema node", K(ret));
} else { } else {
p->reset();
p->set(tenant_id, freeze_version, schema_version); p->set(tenant_id, freeze_version, schema_version);
insert(p); insert(p);
cnt_++; cnt_++;
......
...@@ -333,13 +333,7 @@ private: ...@@ -333,13 +333,7 @@ private:
schema_node* prev; schema_node* prev;
schema_node* next; schema_node* next;
schema_node() schema_node() { reset(); }
: tenant_id(common::OB_INVALID_ID),
freeze_version(common::OB_INVALID_VERSION),
schema_version(common::OB_INVALID_VERSION),
prev(NULL),
next(NULL)
{}
void set(const uint64_t tenant, const int64_t freeze, const int64_t schema) void set(const uint64_t tenant, const int64_t freeze, const int64_t schema)
{ {
...@@ -348,6 +342,15 @@ private: ...@@ -348,6 +342,15 @@ private:
schema_version = schema; schema_version = schema;
} }
void reset()
{
tenant_id = common::OB_INVALID_ID;
freeze_version = common::OB_INVALID_VERSION;
schema_version = common::OB_INVALID_VERSION;
prev = NULL;
next = NULL;
}
TO_STRING_KV(K(tenant_id), K(freeze_version), K(schema_version)); TO_STRING_KV(K(tenant_id), K(freeze_version), K(schema_version));
}; };
......
...@@ -3823,7 +3823,10 @@ int ObPartitionGroup::freeze_log_and_data_v2_(const bool emergency, const bool f ...@@ -3823,7 +3823,10 @@ int ObPartitionGroup::freeze_log_and_data_v2_(const bool emergency, const bool f
STORAGE_LOG(WARN, "fail to freeze log", K(ret), K(pkey_)); STORAGE_LOG(WARN, "fail to freeze log", K(ret), K(pkey_));
} }
} else if (OB_FAIL(check_range_changed_(old_handle, is_leader, changed))) { } else if (OB_FAIL(check_range_changed_(old_handle, is_leader, changed))) {
if (OB_EAGAIN != ret) { if (OB_STATE_NOT_MATCH == ret) {
STORAGE_LOG(INFO, "skip freeze due to clog state", K(ret), K(pkey_));
ret = OB_SUCCESS;
} else if (OB_EAGAIN != ret) {
STORAGE_LOG(WARN, "failed to check log_id or version range changed", K(ret), K(old_handle)); STORAGE_LOG(WARN, "failed to check log_id or version range changed", K(ret), K(old_handle));
} }
} else if (!changed) { } else if (!changed) {
......
...@@ -1136,7 +1136,11 @@ void ObPartitionService::rollback_partition_register(const ObPartitionKey& pkey, ...@@ -1136,7 +1136,11 @@ void ObPartitionService::rollback_partition_register(const ObPartitionKey& pkey,
} else { } else {
if (rb_rp_eg) { if (rb_rp_eg) {
if (OB_SUCCESS != (err = rp_eg_->remove_partition(pkey))) { if (OB_SUCCESS != (err = rp_eg_->remove_partition(pkey))) {
STORAGE_LOG(ERROR, "rollback partition from replay engine failed", K(pkey), K(err)); if (OB_PARTITION_NOT_EXIST != err) {
STORAGE_LOG(WARN, "rollback partition already been removed", K(err), K(pkey));
} else if (OB_NOT_RUNNING != err) {
STORAGE_LOG(ERROR, "rollback partition from replay engine failed", K(pkey), K(err));
}
} }
} }
if (rb_txs) { if (rb_txs) {
...@@ -1144,7 +1148,7 @@ void ObPartitionService::rollback_partition_register(const ObPartitionKey& pkey, ...@@ -1144,7 +1148,7 @@ void ObPartitionService::rollback_partition_register(const ObPartitionKey& pkey,
if (OB_SUCCESS != (err = txs_->remove_partition(pkey, graceful))) { if (OB_SUCCESS != (err = txs_->remove_partition(pkey, graceful))) {
if (OB_PARTITION_NOT_EXIST == err) { if (OB_PARTITION_NOT_EXIST == err) {
STORAGE_LOG(WARN, "rollback partition already been removed", K(err), K(pkey)); STORAGE_LOG(WARN, "rollback partition already been removed", K(err), K(pkey));
} else { } else if (OB_NOT_RUNNING != err) {
STORAGE_LOG(ERROR, "rollback partition from transaction service failed", K(pkey), K(err), K(graceful)); STORAGE_LOG(ERROR, "rollback partition from transaction service failed", K(pkey), K(err), K(graceful));
} }
} }
...@@ -1871,6 +1875,7 @@ int ObPartitionService::create_batch_pg_partitions( ...@@ -1871,6 +1875,7 @@ int ObPartitionService::create_batch_pg_partitions(
batch_res.reuse(); batch_res.reuse();
int64_t start_timestamp = ObTimeUtility::current_time(); int64_t start_timestamp = ObTimeUtility::current_time();
const int64_t CLOG_TIMEOUT = 10 * 1000 * 1000; const int64_t CLOG_TIMEOUT = 10 * 1000 * 1000;
bool revert_cnt = false;
if (OB_UNLIKELY(!is_inited_)) { if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
...@@ -1885,6 +1890,7 @@ int ObPartitionService::create_batch_pg_partitions( ...@@ -1885,6 +1890,7 @@ int ObPartitionService::create_batch_pg_partitions(
STORAGE_LOG(WARN, "partition group not master, need retry", K(batch_arg)); STORAGE_LOG(WARN, "partition group not master, need retry", K(batch_arg));
} else if (OB_FAIL(try_inc_total_partition_cnt(target_batch_arg.count(), true /*need_check*/))) { } else if (OB_FAIL(try_inc_total_partition_cnt(target_batch_arg.count(), true /*need_check*/))) {
LOG_WARN("failed to inc total_partition_cnt", K(ret)); LOG_WARN("failed to inc total_partition_cnt", K(ret));
} else if (FALSE_IT(revert_cnt = true)) {
} else if (OB_FAIL(partitions.reserve(target_batch_arg.count()))) { } else if (OB_FAIL(partitions.reserve(target_batch_arg.count()))) {
STORAGE_LOG(WARN, "reserve array failed", K(ret), "count", target_batch_arg.count()); STORAGE_LOG(WARN, "reserve array failed", K(ret), "count", target_batch_arg.count());
} else if (OB_FAIL(log_id_arr.reserve(target_batch_arg.count()))) { } else if (OB_FAIL(log_id_arr.reserve(target_batch_arg.count()))) {
...@@ -1939,12 +1945,18 @@ int ObPartitionService::create_batch_pg_partitions( ...@@ -1939,12 +1945,18 @@ int ObPartitionService::create_batch_pg_partitions(
add_partition_to_pg_log_id, add_partition_to_pg_log_id,
sstables_handle))) { sstables_handle))) {
STORAGE_LOG(WARN, "create pg partition failed.", K(ret)); STORAGE_LOG(WARN, "create pg partition failed.", K(ret));
} else {
revert_cnt = false;
} }
} }
} }
} }
tg.click(); tg.click();
if (OB_FAIL(ret) && revert_cnt) {
try_inc_total_partition_cnt(-target_batch_arg.count(), false /*need check*/);
}
STORAGE_LOG(INFO, STORAGE_LOG(INFO,
"batch create partition to pg result.", "batch create partition to pg result.",
K(ret), K(ret),
...@@ -2084,6 +2096,7 @@ int ObPartitionService::replay_add_partition_to_pg_clog( ...@@ -2084,6 +2096,7 @@ int ObPartitionService::replay_add_partition_to_pg_clog(
ObIPartitionGroup* pg = NULL; ObIPartitionGroup* pg = NULL;
common::ObReplicaType replica_type; common::ObReplicaType replica_type;
bool can_replay = true; bool can_replay = true;
bool revert_cnt = false;
#ifdef ERRSIM #ifdef ERRSIM
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
...@@ -2111,6 +2124,7 @@ int ObPartitionService::replay_add_partition_to_pg_clog( ...@@ -2111,6 +2124,7 @@ int ObPartitionService::replay_add_partition_to_pg_clog(
STORAGE_LOG(INFO, "no need to replay this log", K(arg), K(log_id)); STORAGE_LOG(INFO, "no need to replay this log", K(arg), K(log_id));
} else if (OB_FAIL(try_inc_total_partition_cnt(1, false /*need check*/))) { } else if (OB_FAIL(try_inc_total_partition_cnt(1, false /*need check*/))) {
LOG_ERROR("failed to inc total partition cnt", K(ret), K(arg), K(log_id)); LOG_ERROR("failed to inc total partition cnt", K(ret), K(arg), K(log_id));
} else if (FALSE_IT(revert_cnt = true)) {
} else if (OB_FAIL(batch_arg.push_back(arg))) { } else if (OB_FAIL(batch_arg.push_back(arg))) {
STORAGE_LOG(WARN, "batch arg push back error", K(ret), K(arg), K(log_id)); STORAGE_LOG(WARN, "batch arg push back error", K(ret), K(arg), K(log_id));
} else if (OB_FAIL(pg->get_pg_storage().get_replica_type(replica_type))) { } else if (OB_FAIL(pg->get_pg_storage().get_replica_type(replica_type))) {
...@@ -2160,6 +2174,8 @@ int ObPartitionService::replay_add_partition_to_pg_clog( ...@@ -2160,6 +2174,8 @@ int ObPartitionService::replay_add_partition_to_pg_clog(
log_id, log_id,
sstables_handle))) { sstables_handle))) {
STORAGE_LOG(WARN, "failed to create pg partition", K(ret)); STORAGE_LOG(WARN, "failed to create pg partition", K(ret));
} else {
revert_cnt = false;
} }
} else { } else {
tg.click(); tg.click();
...@@ -2173,6 +2189,8 @@ int ObPartitionService::replay_add_partition_to_pg_clog( ...@@ -2173,6 +2189,8 @@ int ObPartitionService::replay_add_partition_to_pg_clog(
log_id, log_id,
handle))) { handle))) {
STORAGE_LOG(WARN, "failed to create pg partition", K(ret)); STORAGE_LOG(WARN, "failed to create pg partition", K(ret));
} else {
revert_cnt = false;
} }
tg.click(); tg.click();
} }
...@@ -2185,6 +2203,9 @@ int ObPartitionService::replay_add_partition_to_pg_clog( ...@@ -2185,6 +2203,9 @@ int ObPartitionService::replay_add_partition_to_pg_clog(
tg.click(); tg.click();
} else { } else {
FLOG_WARN("replay add partition to pg clog error", K(arg), "cost", ObTimeUtility::current_time() - start_timestamp); FLOG_WARN("replay add partition to pg clog error", K(arg), "cost", ObTimeUtility::current_time() - start_timestamp);
if (revert_cnt) {
try_inc_total_partition_cnt(-1, false /*need check*/);
}
} }
return ret; return ret;
} }
...@@ -4172,6 +4193,7 @@ int ObPartitionService::inner_add_partition( ...@@ -4172,6 +4193,7 @@ int ObPartitionService::inner_add_partition(
LOG_WARN("failed to inc total_partition_cnt", K(ret)); LOG_WARN("failed to inc total_partition_cnt", K(ret));
} else if (OB_FAIL(pg_mgr_.add_pg(partition, need_check_tenant, allow_multi_value))) { } else if (OB_FAIL(pg_mgr_.add_pg(partition, need_check_tenant, allow_multi_value))) {
STORAGE_LOG(WARN, "add partition group error", K(ret)); STORAGE_LOG(WARN, "add partition group error", K(ret));
try_inc_total_partition_cnt(-new_partition_cnt, false /*need check*/);
} else if (partition.is_pg()) { } else if (partition.is_pg()) {
// do nothing // do nothing
} else { } else {
......
...@@ -27,6 +27,7 @@ ObPGSSTableGCTask::~ObPGSSTableGCTask() ...@@ -27,6 +27,7 @@ ObPGSSTableGCTask::~ObPGSSTableGCTask()
void ObPGSSTableGCTask::runTimerTask() void ObPGSSTableGCTask::runTimerTask()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
disable_timeout_check();
storage::ObIPartitionGroupIterator* partition_iter = nullptr; storage::ObIPartitionGroupIterator* partition_iter = nullptr;
ObIPartitionGroup* partition_group = nullptr; ObIPartitionGroup* partition_group = nullptr;
int64_t left_recycle_cnt = ONE_ROUND_RECYCLE_COUNT_THRESHOLD; int64_t left_recycle_cnt = ONE_ROUND_RECYCLE_COUNT_THRESHOLD;
...@@ -109,4 +110,4 @@ void ObPGSSTableGarbageCollector::wait() ...@@ -109,4 +110,4 @@ void ObPGSSTableGarbageCollector::wait()
void ObPGSSTableGarbageCollector::destroy() void ObPGSSTableGarbageCollector::destroy()
{ {
timer_.destroy(); timer_.destroy();
} }
\ No newline at end of file
...@@ -52,9 +52,17 @@ static TransObjFactory<TransRpcTask> trans_rpc_task_factory("OB_TRANS_RPC_TASK") ...@@ -52,9 +52,17 @@ static TransObjFactory<TransRpcTask> trans_rpc_task_factory("OB_TRANS_RPC_TASK")
#define OB_FREE(object, ...) ob_free(object) #define OB_FREE(object, ...) ob_free(object)
#define RP_FREE(object, LABEL) rp_free(object, LABEL) #define RP_FREE(object, LABEL) rp_free(object, LABEL)
#define OB_ALLOC(object, LABEL) new (ob_malloc(sizeof(object), LABEL)) object() #define OB_ALLOC(object, LABEL) object##alloc()
#define RP_ALLOC(object, LABEL) rp_alloc(object, LABEL) #define RP_ALLOC(object, LABEL) rp_alloc(object, LABEL)
#define MAKE_OB_ALLOC(object_name, LABEL) \
object_name *object_name##alloc() \
{ \
object_name *object = NULL; \
object = (object_name*)ob_malloc(sizeof(object_name), ObModIds::LABEL); \
return object == NULL ? object : new(object) object_name(); \
} \
#define MAKE_FACTORY_CLASS_IMPLEMENT(object_name, LABEL, allocator_type, arg...) \ #define MAKE_FACTORY_CLASS_IMPLEMENT(object_name, LABEL, allocator_type, arg...) \
int64_t object_name##Factory::alloc_count_ = 0; \ int64_t object_name##Factory::alloc_count_ = 0; \
int64_t object_name##Factory::release_count_ = 0; \ int64_t object_name##Factory::release_count_ = 0; \
...@@ -281,6 +289,10 @@ const char* TransRpcTaskFactory::get_mod_type() ...@@ -281,6 +289,10 @@ const char* TransRpcTaskFactory::get_mod_type()
return trans_rpc_task_factory.get_mod_type(); return trans_rpc_task_factory.get_mod_type();
} }
MAKE_OB_ALLOC(ObDupTablePartitionMgr, OB_DUP_TABLE_PARTITION_MGR)
MAKE_OB_ALLOC(ObGtsRpcProxy, OB_GTS_RPC_PROXY)
MAKE_OB_ALLOC(ObGtsRequestRpc, OB_GTS_REQUEST_RPC)
MAKE_FACTORY_CLASS_IMPLEMENT_USE_RP_ALLOC(ClogBuf, OB_TRANS_CLOG_BUF) MAKE_FACTORY_CLASS_IMPLEMENT_USE_RP_ALLOC(ClogBuf, OB_TRANS_CLOG_BUF)
MAKE_FACTORY_CLASS_IMPLEMENT_USE_RP_ALLOC(MutatorBuf, OB_TRANS_MUTATOR_BUF) MAKE_FACTORY_CLASS_IMPLEMENT_USE_RP_ALLOC(MutatorBuf, OB_TRANS_MUTATOR_BUF)
MAKE_FACTORY_CLASS_IMPLEMENT_USE_RP_ALLOC(SubmitLogTask, OB_TRANS_SUBMIT_LOG_TASK) MAKE_FACTORY_CLASS_IMPLEMENT_USE_RP_ALLOC(SubmitLogTask, OB_TRANS_SUBMIT_LOG_TASK)
......
...@@ -363,7 +363,6 @@ void ObPartTransCtx::reset() ...@@ -363,7 +363,6 @@ void ObPartTransCtx::reset()
is_dup_table_prepare_ = false; is_dup_table_prepare_ = false;
dup_table_syncing_log_id_ = UINT64_MAX; dup_table_syncing_log_id_ = UINT64_MAX;
dup_table_syncing_log_ts_ = INT64_MAX; dup_table_syncing_log_ts_ = INT64_MAX;
async_applying_log_id_ = UINT64_MAX;
async_applying_log_ts_ = INT64_MAX; async_applying_log_ts_ = INT64_MAX;
is_prepare_leader_revoke_ = false; is_prepare_leader_revoke_ = false;
is_local_trans_ = true; is_local_trans_ = true;
...@@ -940,17 +939,17 @@ int ObPartTransCtx::end_task_( ...@@ -940,17 +939,17 @@ int ObPartTransCtx::end_task_(
} else if (OB_UNLIKELY(is_exiting_)) { } else if (OB_UNLIKELY(is_exiting_)) {
TRANS_LOG(WARN, "transaction is exiting", "context", *this); TRANS_LOG(WARN, "transaction is exiting", "context", *this);
ret = OB_TRANS_IS_EXITING; ret = OB_TRANS_IS_EXITING;
} else if (stmt_info_.stmt_expired(sql_no)) {
ret = OB_TRANS_SQL_SEQUENCE_ILLEGAL;
TRANS_LOG(WARN, "sql sequence is illegal", KR(ret), "context", *this, K(sql_no), K_(stmt_info));
} else if (FALSE_IT(stmt_info_.end_task())) {
} else if (OB_UNLIKELY(for_replay_)) { } else if (OB_UNLIKELY(for_replay_)) {
ret = OB_NOT_MASTER; ret = OB_NOT_MASTER;
TRANS_LOG(WARN, "invalid state, transaction is replaying", KR(ret), "context", *this); TRANS_LOG(WARN, "invalid state, transaction is replaying", KR(ret), "context", *this);
} else if (is_in_2pc_()) { } else if (is_in_2pc_()) {
TRANS_LOG(WARN, "transaction is in 2pc", "context", *this); TRANS_LOG(WARN, "transaction is in 2pc", "context", *this);
ret = OB_TRANS_HAS_DECIDED; ret = OB_TRANS_HAS_DECIDED;
} else if (stmt_info_.stmt_expired(sql_no)) {
ret = OB_TRANS_SQL_SEQUENCE_ILLEGAL;
TRANS_LOG(WARN, "sql sequence is illegal", KR(ret), "context", *this, K(sql_no), K_(stmt_info));
} else { } else {
stmt_info_.end_task();
if (is_sp_trans_()) { if (is_sp_trans_()) {
if (is_rollback) { if (is_rollback) {
--commit_task_count_; --commit_task_count_;
...@@ -1002,7 +1001,9 @@ int ObPartTransCtx::end_task_( ...@@ -1002,7 +1001,9 @@ int ObPartTransCtx::end_task_(
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
if (is_changing_leader_ && prepare_changing_leader_state_ == CHANGING_LEADER_STATE::STATEMENT_NOT_FINISH) { if (is_changing_leader_
&& prepare_changing_leader_state_ == CHANGING_LEADER_STATE::STATEMENT_NOT_FINISH
&& stmt_info_.is_task_match()) {
if (0 == submit_log_count_) { if (0 == submit_log_count_) {
if (OB_SUCCESS != (tmp_ret = submit_log_incrementally_(true /*need state log*/))) { if (OB_SUCCESS != (tmp_ret = submit_log_incrementally_(true /*need state log*/))) {
TRANS_LOG(WARN, TRANS_LOG(WARN,
...@@ -1623,7 +1624,6 @@ int ObPartTransCtx::callback_big_trans( ...@@ -1623,7 +1624,6 @@ int ObPartTransCtx::callback_big_trans(
TRANS_LOG(WARN, "unexpected log type", K(log_type), K(log_id), K(timestamp), "context", *this); TRANS_LOG(WARN, "unexpected log type", K(log_type), K(log_id), K(timestamp), "context", *this);
} }
async_applying_log_id_ = UINT64_MAX;
async_applying_log_ts_ = INT64_MAX; async_applying_log_ts_ = INT64_MAX;
} }
} }
...@@ -1754,7 +1754,7 @@ int ObPartTransCtx::on_sync_log_success( ...@@ -1754,7 +1754,7 @@ int ObPartTransCtx::on_sync_log_success(
if (redo_log_no_ > 1) { if (redo_log_no_ > 1) {
ret = submit_big_trans_callback_task_(log_type, log_id, timestamp); ret = submit_big_trans_callback_task_(log_type, log_id, timestamp);
} else { } else {
if (OB_FAIL(on_sp_commit_(commit))) { if (OB_FAIL(on_sp_commit_(commit, timestamp))) {
TRANS_LOG(WARN, "ObPartTransCtx on sp commit error", KR(ret), K(commit), "context", *this); TRANS_LOG(WARN, "ObPartTransCtx on sp commit error", KR(ret), K(commit), "context", *this);
} }
} }
...@@ -2055,7 +2055,6 @@ int ObPartTransCtx::submit_big_trans_callback_task_( ...@@ -2055,7 +2055,6 @@ int ObPartTransCtx::submit_big_trans_callback_task_(
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
async_applying_log_id_ = log_id;
async_applying_log_ts_ = timestamp; async_applying_log_ts_ = timestamp;
if (OB_FAIL(big_trans_worker_->submit_big_trans_callback_task(self_, log_type, log_id, timestamp, this))) { if (OB_FAIL(big_trans_worker_->submit_big_trans_callback_task(self_, log_type, log_id, timestamp, this))) {
...@@ -5545,7 +5544,11 @@ int ObPartTransCtx::fill_trans_state_log_(char* buf, const int64_t size, int64_t ...@@ -5545,7 +5544,11 @@ int ObPartTransCtx::fill_trans_state_log_(char* buf, const int64_t size, int64_t
ObTimeGuard timeguard("fill_trans_state_log", 10 * 1000); ObTimeGuard timeguard("fill_trans_state_log", 10 * 1000);
const int64_t available_capacity = size - OB_TRANS_REDO_LOG_RESERVE_SIZE; const int64_t available_capacity = size - OB_TRANS_REDO_LOG_RESERVE_SIZE;
if (OB_FAIL(ctx_dependency_wrap_.get_prev_trans_arr_guard(guard))) { if (OB_UNLIKELY(!stmt_info_.is_task_match())) {
ret = OB_ERR_UNEXPECTED;
need_print_trace_log_ = true;
TRANS_LOG(ERROR, "fill trans state log when task not match", KR(ret), K(*this));
} else if (OB_FAIL(ctx_dependency_wrap_.get_prev_trans_arr_guard(guard))) {
TRANS_LOG(WARN, "get prev trans arr guard error", KR(ret), K(*this)); TRANS_LOG(WARN, "get prev trans arr guard error", KR(ret), K(*this));
} else if (OB_FAIL(log.init(OB_LOG_TRANS_STATE, } else if (OB_FAIL(log.init(OB_LOG_TRANS_STATE,
self_, self_,
...@@ -6047,6 +6050,7 @@ int ObPartTransCtx::gts_elapse_callback(const MonotonicTs srr, const int64_t gts ...@@ -6047,6 +6050,7 @@ int ObPartTransCtx::gts_elapse_callback(const MonotonicTs srr, const int64_t gts
} }
need_revert_ctx = true; need_revert_ctx = true;
is_gts_waiting_ = false; is_gts_waiting_ = false;
async_applying_log_ts_ = INT64_MAX;
} }
} }
REC_TRANS_TRACE_EXT(tlog_, gts_elapse_callback, Y(ret), OB_ID(srr), srr.mts_, Y(gts)); REC_TRANS_TRACE_EXT(tlog_, gts_elapse_callback, Y(ret), OB_ID(srr), srr.mts_, Y(gts));
...@@ -8827,7 +8831,7 @@ int ObPartTransCtx::on_dist_abort_() ...@@ -8827,7 +8831,7 @@ int ObPartTransCtx::on_dist_abort_()
} }
// Need to wait for gts has pushed over the gts // Need to wait for gts has pushed over the gts
int ObPartTransCtx::on_sp_commit_(const bool commit) int ObPartTransCtx::on_sp_commit_(const bool commit, const int64_t timestamp)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObITsMgr* ts_mgr = get_ts_mgr_(); ObITsMgr* ts_mgr = get_ts_mgr_();
...@@ -8845,6 +8849,9 @@ int ObPartTransCtx::on_sp_commit_(const bool commit) ...@@ -8845,6 +8849,9 @@ int ObPartTransCtx::on_sp_commit_(const bool commit)
if (OB_FAIL(ts_mgr->wait_gts_elapse(self_.get_tenant_id(), global_trans_version_, this, need_wait))) { if (OB_FAIL(ts_mgr->wait_gts_elapse(self_.get_tenant_id(), global_trans_version_, this, need_wait))) {
TRANS_LOG(WARN, "wait gts elapse failed", KR(ret), "context", *this); TRANS_LOG(WARN, "wait gts elapse failed", KR(ret), "context", *this);
} else if (need_wait) { } else if (need_wait) {
if (OB_INVALID_TIMESTAMP != timestamp) {
async_applying_log_ts_ = timestamp;
}
is_gts_waiting_ = true; is_gts_waiting_ = true;
gts_request_ts_ = ObTimeUtility::current_time(); gts_request_ts_ = ObTimeUtility::current_time();
if (OB_FAIL(partition_mgr_->acquire_ctx_ref(trans_id_))) { if (OB_FAIL(partition_mgr_->acquire_ctx_ref(trans_id_))) {
......
...@@ -485,7 +485,7 @@ private: ...@@ -485,7 +485,7 @@ private:
int do_clear_(); int do_clear_();
int on_prepare_redo_(); // after redo/prepare log written int on_prepare_redo_(); // after redo/prepare log written
int on_prepare_(const bool batch_committed, const int64_t timestamp); int on_prepare_(const bool batch_committed, const int64_t timestamp);
int on_sp_commit_(const bool commit); int on_sp_commit_(const bool commit, const int64_t timestamp = OB_INVALID_TIMESTAMP);
int on_dist_commit_(); int on_dist_commit_();
int on_dist_abort_(); int on_dist_abort_();
int on_clear_(const bool need_response); int on_clear_(const bool need_response);
...@@ -672,7 +672,6 @@ private: ...@@ -672,7 +672,6 @@ private:
bool is_dup_table_prepare_; bool is_dup_table_prepare_;
uint64_t dup_table_syncing_log_id_; uint64_t dup_table_syncing_log_id_;
int64_t dup_table_syncing_log_ts_; int64_t dup_table_syncing_log_ts_;
uint64_t async_applying_log_id_;
int64_t async_applying_log_ts_; int64_t async_applying_log_ts_;
ObTransUndoStatus undo_status_; ObTransUndoStatus undo_status_;
int32_t max_durable_sql_no_; int32_t max_durable_sql_no_;
......
...@@ -851,9 +851,9 @@ int ObTransService::do_dist_rollback_( ...@@ -851,9 +851,9 @@ int ObTransService::do_dist_rollback_(
bool use_tmp_sche_ctx = false; bool use_tmp_sche_ctx = false;
ObScheTransCtx* sche_ctx = NULL; ObScheTransCtx* sche_ctx = NULL;
if (OB_FAIL(alloc_tmp_sche_ctx_(trans_desc, use_tmp_sche_ctx))) { if (OB_FAIL(acquire_sche_ctx_(trans_desc, sche_ctx, use_tmp_sche_ctx))) {
TRANS_LOG(WARN, "fail to get tmp scheduler", K(ret), K(trans_desc)); TRANS_LOG(WARN, "fail to get tmp scheduler", K(ret), K(trans_desc));
} else if (NULL == (sche_ctx = trans_desc.get_sche_ctx())) { } else if (OB_ISNULL(sche_ctx)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "scheduler not found", K(ret), K(trans_desc)); TRANS_LOG(ERROR, "scheduler not found", K(ret), K(trans_desc));
} else if (OB_FAIL(sche_ctx->start_savepoint_rollback(trans_desc, sql_no, rollback_partitions))) { } else if (OB_FAIL(sche_ctx->start_savepoint_rollback(trans_desc, sql_no, rollback_partitions))) {
...@@ -867,21 +867,27 @@ int ObTransService::do_dist_rollback_( ...@@ -867,21 +867,27 @@ int ObTransService::do_dist_rollback_(
trans_desc.set_need_rollback(); trans_desc.set_need_rollback();
} }
if (use_tmp_sche_ctx) { release_sche_ctx_(trans_desc, sche_ctx, use_tmp_sche_ctx);
free_tmp_sche_ctx_(trans_desc);
}
return ret; return ret;
} }
int ObTransService::alloc_tmp_sche_ctx_(ObTransDesc& trans_desc, bool& use_tmp_sche_ctx) int ObTransService::acquire_sche_ctx_(ObTransDesc& trans_desc,
ObScheTransCtx*& sche_ctx,
bool& use_tmp_sche_ctx)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!trans_desc.is_nested_stmt()) { use_tmp_sche_ctx = false;
// only nested stmt need create temp scheduler
} else if (NULL == trans_desc.get_sche_ctx()) { if (NULL != (sche_ctx = trans_desc.get_sche_ctx())) {
const ObTransID& trans_id = trans_desc.get_trans_id(); TRANS_LOG(DEBUG, "get saved scheduler success", K(trans_desc));
} else if (!trans_desc.is_nested_stmt()) {
TRANS_LOG(WARN, "Non-nested statements should not create a temporary scheduler",
K(trans_desc));
} else {
// 构建临时scheduler
const ObTransID &trans_id = trans_desc.get_trans_id();
const bool for_replay = false; const bool for_replay = false;
bool alloc = true; bool alloc = true;
const bool is_readonly = false; const bool is_readonly = false;
...@@ -889,9 +895,10 @@ int ObTransService::alloc_tmp_sche_ctx_(ObTransDesc& trans_desc, bool& use_tmp_s ...@@ -889,9 +895,10 @@ int ObTransService::alloc_tmp_sche_ctx_(ObTransDesc& trans_desc, bool& use_tmp_s
if (OB_FAIL(sche_trans_ctx_mgr_.get_trans_ctx(SCHE_PARTITION_ID, trans_id, for_replay, is_readonly, alloc, ctx))) { if (OB_FAIL(sche_trans_ctx_mgr_.get_trans_ctx(SCHE_PARTITION_ID, trans_id, for_replay, is_readonly, alloc, ctx))) {
TRANS_LOG(WARN, "get transaction context error", K(ret), K(trans_id)); TRANS_LOG(WARN, "get transaction context error", K(ret), K(trans_id));
} else if (FALSE_IT(sche_ctx = static_cast<ObScheTransCtx *>(ctx))) {
} else if (!alloc) {
TRANS_LOG(DEBUG, "get existed scheduler success", K(SCHE_PARTITION_ID), K(trans_desc));
} else { } else {
ObScheTransCtx* sche_ctx = static_cast<ObScheTransCtx*>(ctx);
if (OB_FAIL(sche_ctx->init(trans_desc.get_tenant_id(), if (OB_FAIL(sche_ctx->init(trans_desc.get_tenant_id(),
trans_id, trans_id,
trans_desc.get_trans_expired_time(), trans_desc.get_trans_expired_time(),
...@@ -921,9 +928,11 @@ int ObTransService::alloc_tmp_sche_ctx_(ObTransDesc& trans_desc, bool& use_tmp_s ...@@ -921,9 +928,11 @@ int ObTransService::alloc_tmp_sche_ctx_(ObTransDesc& trans_desc, bool& use_tmp_s
} }
} }
if (OB_FAIL(ret) || OB_FAIL(trans_desc.set_sche_ctx(sche_ctx))) { if (OB_FAIL(ret)) {
TRANS_LOG(DEBUG, "create temp scheduler failed", K(SCHE_PARTITION_ID), K(trans_desc));
sche_ctx->set_exiting(); sche_ctx->set_exiting();
(void)sche_trans_ctx_mgr_.revert_trans_ctx(sche_ctx); (void)sche_trans_ctx_mgr_.revert_trans_ctx(sche_ctx);
sche_ctx = NULL;
} else { } else {
use_tmp_sche_ctx = true; use_tmp_sche_ctx = true;
TRANS_LOG(DEBUG, "create temp scheduler success", K(SCHE_PARTITION_ID), K(trans_desc)); TRANS_LOG(DEBUG, "create temp scheduler success", K(SCHE_PARTITION_ID), K(trans_desc));
...@@ -934,13 +943,14 @@ int ObTransService::alloc_tmp_sche_ctx_(ObTransDesc& trans_desc, bool& use_tmp_s ...@@ -934,13 +943,14 @@ int ObTransService::alloc_tmp_sche_ctx_(ObTransDesc& trans_desc, bool& use_tmp_s
return ret; return ret;
} }
void ObTransService::free_tmp_sche_ctx_(ObTransDesc& trans_desc) void ObTransService::release_sche_ctx_(ObTransDesc& trans_desc,
ObScheTransCtx* sche_ctx,
const bool use_tmp_sche_ctx)
{ {
ObScheTransCtx* sche_ctx = NULL; if (NULL != sche_ctx && trans_desc.get_sche_ctx() != sche_ctx) {
if (use_tmp_sche_ctx) {
if (NULL != (sche_ctx = trans_desc.get_sche_ctx())) { sche_ctx->set_exiting();
trans_desc.set_sche_ctx(NULL); }
sche_ctx->set_exiting();
(void)sche_trans_ctx_mgr_.revert_trans_ctx(sche_ctx); (void)sche_trans_ctx_mgr_.revert_trans_ctx(sche_ctx);
} }
} }
......
...@@ -733,9 +733,12 @@ private: ...@@ -733,9 +733,12 @@ private:
int do_dist_rollback_( int do_dist_rollback_(
ObTransDesc& trans_desc, const int64_t sql_no, const common::ObPartitionArray& rollback_partitions); ObTransDesc& trans_desc, const int64_t sql_no, const common::ObPartitionArray& rollback_partitions);
int alloc_tmp_sche_ctx_(ObTransDesc& trans_desc, bool& use_tmp_sche_ctx); int acquire_sche_ctx_(ObTransDesc &trans_desc,
void free_tmp_sche_ctx_(ObTransDesc& trans_desc); ObScheTransCtx *&sche_ctx,
bool &use_tmp_sche_ctx);
void release_sche_ctx_(ObTransDesc &trans_desc,
ObScheTransCtx *sche_ctx,
const bool use_tmp_sche_ctx);
private: private:
static const int64_t END_STMT_MORE_TIME_US = 100 * 1000; static const int64_t END_STMT_MORE_TIME_US = 100 * 1000;
// max task count in message process queue // max task count in message process queue
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册