提交 074c52f1 编写于 作者: O obdev 提交者: wangzelin.wzl

[OBCDC] Modify participant structure

上级 0b6c9733
......@@ -403,7 +403,7 @@ int MutatorRow::parse_columns_(
LOG_DEBUG("handle_lob_v2_data", K(column_stored_idx), K(lob_common), K(obj));
if (! is_out_row) {
LOG_INFO("is_lob_v2 in row", K(column_id), K(is_lob_v2), K(lob_common), K(obj));
LOG_DEBUG("is_lob_v2 in row", K(column_id), K(is_lob_v2), K(lob_common), K(obj));
obj.set_string(obj.get_type(), lob_common.get_inrow_data_ptr(), lob_common.get_byte_size(datum.len_));
} else {
const ObLobData &lob_data = *(reinterpret_cast<const ObLobData *>(lob_common.buffer_));
......@@ -2030,8 +2030,7 @@ PartTransTask::PartTransTask() :
commit_log_lsn_(),
trans_type_(transaction::TransType::UNKNOWN_TRANS),
is_xa_or_dup_(false),
participant_count_(0),
participants_(NULL),
participants_(),
trace_id_(),
trace_info_(),
sorted_log_entry_info_(),
......@@ -2164,8 +2163,7 @@ void PartTransTask::reset()
commit_log_lsn_.reset();
trans_type_ = transaction::TransType::UNKNOWN_TRANS;
is_xa_or_dup_ = false;
participant_count_ = 0;
participants_ = NULL;
participants_.reset();
// The trace_id memory does not need to be freed separately, the allocator frees it all together
trace_id_.reset();
trace_info_.reset();
......@@ -2261,7 +2259,9 @@ int PartTransTask::push_redo_log(
if (OB_SUCC(ret) && need_store_data && is_row_completed) {
if (OB_FAIL(get_and_submit_store_task_(tls_id_.get_tenant_id(), row_flags, store_log_lsn,
data_buf, data_len))) {
LOG_ERROR("get_and_submit_store_task_ fail", KR(ret), K_(tls_id), K(row_flags));
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("get_and_submit_store_task_ fail", KR(ret), K_(tls_id), K(row_flags));
}
}
} // need_store_data
}
......@@ -3363,15 +3363,14 @@ int PartTransTask::init_participant_array_(
const palf::LSN &commit_log_lsn)
{
int ret = OB_SUCCESS;
transaction::ObLSLogInfo *part_array = NULL;
const int64_t part_count = is_single_ls_trans() ? 1 : participants.count();
if (OB_UNLIKELY(! tls_id_.is_valid() || ! commit_log_lsn.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid tls_id or commit_log_lsn", K_(tls_id), K(commit_log_lsn));
} else if (OB_UNLIKELY(NULL != participants_) || OB_UNLIKELY(participant_count_ > 0)) {
LOG_ERROR("participant has been initialized", K(participants_), K(participant_count_));
LOG_ERROR("invalid tls_id or commit_log_lsn", KR(ret), K_(tls_id), K(commit_log_lsn));
} else if (OB_UNLIKELY(participants_.count() > 0)) {
ret = OB_INIT_TWICE;
LOG_ERROR("participant has been initialized", KR(ret), K(participants_));
// participants record prepared ls info, should be empty if is single_ls_trans.
} else if (OB_UNLIKELY(is_single_ls_trans() && part_count != 1)
|| OB_UNLIKELY(is_dist_trans() && ! is_xa_or_dup_ && part_count <= 1)) {
......@@ -3379,19 +3378,13 @@ int PartTransTask::init_participant_array_(
LOG_ERROR("trans_type is not consistent with participant_count", KR(ret), K_(tls_id), K_(trans_id),
K_(trans_type), K(participants));
} else {
int64_t alloc_size = part_count * sizeof(transaction::ObLSLogInfo);
part_array = static_cast<transaction::ObLSLogInfo *>(allocator_.alloc(alloc_size));
if (OB_ISNULL(part_array)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("allocate memory for participant array fail", KR(ret), K_(tls_id), K_(trans_id),
K_(trans_type), K(part_count), K(alloc_size), K(participants));
} else if (is_single_ls_trans()) {
new (part_array) transaction::ObLSLogInfo(tls_id_.get_ls_id(), commit_log_lsn);
if (OB_UNLIKELY(! part_array->is_valid())) {
if (is_single_ls_trans()) {
if (OB_FAIL(participants_.push_back(transaction::ObLSLogInfo(tls_id_.get_ls_id(), commit_log_lsn)))) {
LOG_ERROR("participants_ push_back failed", KR(ret), KPC(this));
} else if (OB_UNLIKELY(! participants_[0].is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("unexcepted invalid part_array", KR(ret), KPC(part_array), KPC(this));
}
LOG_ERROR("unexcepted invalid part_array", KR(ret), K(participants_), KPC(this));
} else {}
} else {
for (int64_t index = 0; OB_SUCC(ret) && index < part_count; index++) {
const transaction::ObLSLogInfo &part_log_info = participants.at(index);
......@@ -3400,28 +3393,19 @@ int PartTransTask::init_participant_array_(
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("part_log_info recorded in TransCommitLog is invalid", KR(ret),
K_(tls_id), K_(trans_id), K_(trans_type), K(participants), K(part_log_info));
} else {
new(part_array + index) transaction::ObLSLogInfo(part_log_info.get_ls_id(), part_log_info.get_lsn());
}
} else if (OB_FAIL(participants_.push_back(part_log_info))) {
LOG_ERROR("participants_ push_back failed", KR(ret), KPC(this));
} else {}
}
}
}
if (OB_SUCC(ret)) {
participants_ = part_array;
participant_count_ = part_count;
} else {
if (NULL != part_array) {
allocator_.free(part_array);
part_array = NULL;
}
}
return ret;
}
void PartTransTask::destroy_participant_array_()
{
/*
if (NULL != participants_ && participant_count_ > 0) {
for (int64_t index = 0; index < participant_count_; index++) {
participants_[index].~ObLSLogInfo();
......@@ -3431,6 +3415,7 @@ void PartTransTask::destroy_participant_array_()
participants_ = NULL;
participant_count_ = 0;
}
*/
}
int PartTransTask::set_participants(
......
......@@ -982,14 +982,14 @@ public:
common::ObIAllocator &get_allocator() { return allocator_; }
const transaction::ObLSLogInfo *get_participants() const
const transaction::ObLSLogInfoArray &get_participants() const
{
return participants_;
}
int64_t get_participant_count() const
{
return participant_count_;
return participants_.count();
}
// for unittest start
......@@ -1090,8 +1090,8 @@ public:
K_(prepare_log_lsn),
K_(commit_ts),
K_(commit_log_lsn),
K_(participant_count),
KP_(participants),
"participant_count", participants_.count(),
K_(participants),
K_(trace_id),
K_(trace_info),
K_(sorted_log_entry_info),
......@@ -1184,9 +1184,8 @@ private:
transaction::TransType trans_type_;
bool is_xa_or_dup_; // true if xa dist trans or duplicate table trans.
int64_t participant_count_;
// participants info, used for determine the sequence of trans at sequencer moudle.
transaction::ObLSLogInfo *participants_;
transaction::ObLSLogInfoArray participants_;
// App Trace ID (get from commit_info log)
ObString trace_id_;
// App Trace Info (get from commit_info log)
......
......@@ -406,6 +406,9 @@ int ObLogResourceCollector::push_task_into_queue_(ObLogResourceRecycleTask &task
if (OB_TIMEOUT != ret) {
break;
} else {
// When timeout, need to retry
ret = OB_SUCCESS;
}
}
// Note: After a task is pushed to the queue, it may be recycled quickly and the task cannot be accessed later
......@@ -613,7 +616,9 @@ int ObLogResourceCollector::revert_dml_binlog_record_(ObLogBR &br, volatile bool
if (OB_SUCC(ret)) {
if (OB_FAIL(dec_ref_cnt_and_try_to_recycle_log_entry_task_(br))) {
LOG_ERROR("dec_ref_cnt_and_try_to_recycle_log_entry_task_ fail", KR(ret));
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("dec_ref_cnt_and_try_to_recycle_log_entry_task_ fail", KR(ret));
}
}
}
}
......
......@@ -261,12 +261,14 @@ int RocksDbStoreService::del(void *cf_handle, const std::string &key)
{
int ret = OB_SUCCESS;
rocksdb::ColumnFamilyHandle *column_family_handle = static_cast<rocksdb::ColumnFamilyHandle *>(cf_handle);
rocksdb::WriteOptions writer_options;
writer_options.disableWAL = true;
if (OB_ISNULL(column_family_handle)) {
LOG_ERROR("column_family_handle is NULL");
ret = OB_ERR_UNEXPECTED;
} else {
rocksdb::Status s = m_db_->Delete(rocksdb::WriteOptions(), column_family_handle, key);
rocksdb::Status s = m_db_->Delete(writer_options, column_family_handle, key);
if (!s.ok()) {
LOG_ERROR("delete %s from rocksdb failed, error %s", key.c_str(), s.ToString().c_str());
......
......@@ -678,7 +678,9 @@ int ObLogSequencer::push_task_into_redo_dispatcher_(TransCtx &trans_ctx, volatil
int ret = OB_SUCCESS;
if (OB_FAIL(redo_dispatcher_->dispatch_trans_redo(trans_ctx, stop_flag))) {
LOG_ERROR("failed to dispatch trans redo", KR(ret), K(trans_ctx), K(stop_flag));
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("failed to dispatch trans redo", KR(ret), K(trans_ctx), K(stop_flag));
}
}
return ret;
......
......@@ -746,6 +746,7 @@ void ObLogTenant::print_stat_info()
"SEQ(GB=%ld,CMT=%ld) "
"SCHEMA(GB=%ld,CUR=%ld) "
"CMT_SCHEMA(CUR=%ld,NEXT=%ld) "
"CHECKPOINT(TX=%ld(%s), GHB=%ld(%s)) "
"DROP_TS=%s "
"DDL_TABLE=%lu",
tenant_id_, tenant_name_, print_state(get_tenant_state()), get_tenant_state(),
......@@ -755,6 +756,8 @@ void ObLogTenant::print_stat_info()
get_global_seq(), NULL == task_queue_ ? 0 : task_queue_->get_next_task_seq(),
get_global_schema_version(), get_schema_version(),
committer_cur_schema_version_, committer_next_trans_schema_version_,
committer_trans_commit_version_, NTS_TO_STR(committer_trans_commit_version_),
committer_global_heartbeat_, NTS_TO_STR(committer_global_heartbeat_),
NTS_TO_STR(drop_tenant_tstamp_),
ddl_table_id);
}
......
......@@ -306,7 +306,7 @@ int TransCtx::prepare_(
const uint64_t tenant_id = host_tls_id.get_tenant_id();
int64_t host_commit_log_timestamp = host.get_commit_ts();
const palf::LSN &host_commit_log_lsn = host.get_commit_log_lsn();
const ObLSLogInfo *part_array = host.get_participants();
const transaction::ObLSLogInfoArray &part_array = host.get_participants();
int64_t part_count = host.get_participant_count();
const int64_t trans_commit_version = host.get_trans_commit_version();
tenant_id_ = tenant_id;
......@@ -314,7 +314,7 @@ int TransCtx::prepare_(
// default serve
bool is_serving_host_part = true;
if (OB_UNLIKELY(part_count <= 0) || OB_ISNULL(part_array)) {
if (OB_UNLIKELY(part_count <= 0)) {
LOG_ERROR("invalid participant array", K(part_count), K(part_array), K(host));
ret = OB_ERR_UNEXPECTED;
}
......
......@@ -145,19 +145,21 @@ void ObLogRouteService::stop()
void ObLogRouteService::wait()
{
LOG_INFO("ObLogRouteService wait begin");
TG_WAIT(lib::TGDefIDs::LogRouterTimer);
int64_t num = 0;
int ret = OB_SUCCESS;
while (OB_SUCC(TG_GET_QUEUE_NUM(tg_id_, num)) && num > 0) {
PAUSE();
}
if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "ObLogApplyService failed to get queue number");
if (IS_INIT) {
LOG_INFO("ObLogRouteService wait begin");
TG_WAIT(lib::TGDefIDs::LogRouterTimer);
int64_t num = 0;
int ret = OB_SUCCESS;
while (OB_SUCC(TG_GET_QUEUE_NUM(tg_id_, num)) && num > 0) {
PAUSE();
}
if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "ObLogRouteService failed to get queue number");
}
TG_STOP(tg_id_);
TG_WAIT(tg_id_);
LOG_INFO("ObLogRouteService wait finish");
}
TG_STOP(tg_id_);
TG_WAIT(tg_id_);
LOG_INFO("ObLogRouteService wait finish");
}
void ObLogRouteService::destroy()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册