提交 f161cc3b 编写于 作者: S SanmuWangZJU 提交者: ob-robot

[OBCDC] Support SYS_LS DIST TRANS(including DDL/DML)

上级 a4f1b2b6
......@@ -107,18 +107,16 @@ int64_t MultiDataSourceInfo::to_string(char *buf, const int64_t buf_len) const
if (NULL != buf && buf_len > 0) {
if (has_ls_table_op_) {
(void)common::databuff_printf(buf, buf_len, pos, "ls_table_op: ");
pos += ls_attr_.to_string(buf, buf_len);
(void)common::databuff_printf(buf, buf_len, pos, "{ls_table_op: %s", to_cstring(ls_attr_));
} else {
(void)common::databuff_printf(buf, buf_len, pos, "has_ls_table_op: false");
}
(void)common::databuff_printf(buf, buf_len, pos, ", is_ddl_trans: %d", has_ddl_trans_op_);
if (has_tablet_change_op()) {
(void)common::databuff_printf(buf, buf_len, pos, ", tablet_change_info: ");
pos += tablet_change_info_arr_.to_string(buf, buf_len);
(void)common::databuff_printf(buf, buf_len, pos, ", tablet_change_info: %s}", to_cstring(tablet_change_info_arr_));
} else {
(void)common::databuff_printf(buf, buf_len, pos, ", tablet_change_info: None");
(void)common::databuff_printf(buf, buf_len, pos, ", tablet_change_info: None}");
}
}
......
......@@ -417,7 +417,7 @@ int ObLogCommitter::recycle_task_directly_(PartTransTask &task, const bool can_a
if (OB_NOT_NULL(resource_collector_)
&& OB_SUCCESS != (revert_ret = resource_collector_->revert(&task))) {
if (OB_IN_STOP_STATE != revert_ret) {
LOG_ERROR("revert HEARTBEAT task fail", K(revert_ret), K(task));
LOG_ERROR("revert PartTransTask fail", K(revert_ret), K(task));
}
ret = OB_SUCCESS == ret ? revert_ret : ret;
}
......@@ -1090,7 +1090,7 @@ int ObLogCommitter::handle_dml_task_(PartTransTask *participants)
// Place the Binlog Record chain in the user queue
// Binlog Record may be recycled at any time
if (OB_SUCCESS == ret) {
if (OB_SUCC(ret)) {
if (OB_FAIL(commit_binlog_record_list_(*trans_ctx, cluster_id, valid_part_trans_task_count,
tenant_id, trans_commit_version))) {
if (OB_IN_STOP_STATE != ret) {
......
......@@ -155,12 +155,28 @@ int ObLogFetcherDispatcher::dispatch_ddl_trans_task_(PartTransTask &task, volati
{
int ret = OB_SUCCESS;
if (OB_FAIL(dispatch_to_sys_ls_handler_(task, stop_flag))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("sys_ls_handler push fail", KR(ret), K(task));
}
} else {
// succ
}
return ret;
}
// dispatch sys_ls task into sys_ls_handler.
// Including following trans_type: DDL/SYS_LS_HB/SYS_LS_OFFLINE
int ObLogFetcherDispatcher::dispatch_to_sys_ls_handler_(PartTransTask &task, volatile bool &stop_flag)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(sys_ls_handler_)) {
ret = OB_INVALID_ERROR;
LOG_ERROR("invalid sys_ls_handler", KR(ret), K(sys_ls_handler_));
} else {
// DDL transaction push into DDLHandler
LOG_DEBUG("dispatch ddl_trans to sys_ls_handler", K(task));
LOG_DEBUG("dispatch sys_ls_trans to sys_ls_handler", K(task));
RETRY_FUNC(stop_flag, *sys_ls_handler_, push, &task, DATA_OP_TIMEOUT);
}
......@@ -189,12 +205,10 @@ int ObLogFetcherDispatcher::dispatch_ls_heartbeat_(PartTransTask &task, volatile
// Heartbeat of the DDL partition is distributed to the DDL processor
if (task.is_sys_ls_heartbeat()) {
if (OB_ISNULL(sys_ls_handler_)) {
ret = OB_INVALID_ERROR;
LOG_ERROR("invalid sys_ls_handler", KR(ret), K(sys_ls_handler_));
} else {
// Push into DDL Handler
RETRY_FUNC(stop_flag, *sys_ls_handler_, push, &task, DATA_OP_TIMEOUT);
if (OB_FAIL(dispatch_to_sys_ls_handler_(task, stop_flag))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("dispatch sys_ls_heartbeat task into sys_ls_handler failed", KR(ret), K(task));
}
}
} else {
ret = dispatch_to_committer_(task, stop_flag);
......@@ -210,12 +224,10 @@ int ObLogFetcherDispatcher::dispatch_offline_ls_task_(PartTransTask &task,
// DDL partition's offline tasks are distributed to DDL processors
if (task.is_sys_ls_offline_task()) {
if (OB_ISNULL(sys_ls_handler_)) {
ret = OB_INVALID_ERROR;
LOG_ERROR("invalid sys_ls_handler", KR(ret), K(sys_ls_handler_));
} else {
// Push into DDL Handler
RETRY_FUNC(stop_flag, *sys_ls_handler_, push, &task, DATA_OP_TIMEOUT);
if (OB_FAIL(dispatch_to_sys_ls_handler_(task, stop_flag))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("dispatch sys_ls_offline task into sys_ls_handler failed", KR(ret), K(task));
}
}
} else {
ret = dispatch_to_committer_(task, stop_flag);
......@@ -255,13 +267,11 @@ int ObLogFetcherDispatcher::dispatch_ls_table_op_(PartTransTask &task, volatile
{
int ret = OB_SUCCESS;
if (OB_ISNULL(sys_ls_handler_)) {
ret = OB_INVALID_ERROR;
LOG_ERROR("invalid sys_ls_handler", KR(ret), K(sys_ls_handler_));
} else {
// DDL transaction push into DDLHandler
LOG_DEBUG("dispatch ls_table_op to sys_ls_handler", K(task));
RETRY_FUNC(stop_flag, *sys_ls_handler_, push, &task, DATA_OP_TIMEOUT);
// ls table op task expected only in sys_ls.
if (OB_FAIL(dispatch_to_sys_ls_handler_(task, stop_flag))) {
if (OB_IN_STOP_STATE != ret) {
LOG_ERROR("dispatch sys_ls_heartbeat task into sys_ls_handler failed", KR(ret), K(task));
}
}
return ret;
......
......@@ -56,6 +56,7 @@ private:
int dispatch_offline_ls_task_(PartTransTask &task, volatile bool &stop_flag);
int dispatch_global_ls_heartbeat_(PartTransTask &task, volatile bool &stop_flag);
int dispatch_ls_table_op_(PartTransTask &task, volatile bool &stop_flag);
int dispatch_to_sys_ls_handler_(PartTransTask &task, volatile bool &stop_flag);
private:
bool inited_;
......
......@@ -150,8 +150,8 @@ int ObLogPartTransParser::parse(ObLogEntryTask &task, volatile bool &stop_flag)
ret = OB_INVALID_DATA;
// Calibrate data for completeness
} else if (OB_UNLIKELY(! redo_node->check_data_integrity())) {
LOG_ERROR("redo data is not valid", KPC(redo_node));
ret = OB_INVALID_DATA;
LOG_ERROR("redo data is not valid", KR(ret), KPC(redo_node));
} else if (OB_FAIL(parse_stmts_(tenant, *redo_node,
task, *part_trans_task, row_index, stop_flag))) {
LOG_ERROR("parse_stmts_ fail", KR(ret), K(tenant), KPC(redo_node), K(task), K(row_index));
......
......@@ -3004,6 +3004,9 @@ int PartTransTask::commit(
if (! tls_id_.is_sys_log_stream()) {
set_ref_cnt(sorted_redo_list_.get_node_number() + 1);
} else if (is_sys_ls_dml_trans()) {
// set ref for DML in SYS_LS
set_ref_cnt(1);
}
}
}
......@@ -3023,6 +3026,8 @@ int PartTransTask::try_to_set_data_ready_status()
ret = OB_STATE_NOT_MATCH;
} else if (is_data_ready()) {
// do nothing
} else if (is_sys_ls_dml_trans()) {
set_data_ready();
} else if (is_contain_empty_redo_log()) {
set_data_ready();
} else {
......@@ -3507,7 +3512,11 @@ int PartTransTask::wait_formatted(const int64_t timeout, ObCond &cond)
void PartTransTask::set_data_ready()
{
LOG_DEBUG("[STAT] [TRANS_TASK] SET_DATA_READY", K_(is_data_ready), "task", *this);
sorted_redo_list_.init_iterator();
if (is_sys_ls_dml_trans()) {
sorted_redo_list_.mark_sys_ls_dml_trans_dispatched();
} else {
sorted_redo_list_.init_iterator();
}
(void)ATOMIC_SET(&is_data_ready_, true);
wait_data_ready_cond_.signal();
}
......
......@@ -919,6 +919,7 @@ public:
return is_offline_ls_task() && is_sys_ls_part_trans();
}
bool is_not_served_trans() const { return TASK_TYPE_NOT_SERVED_TRANS == type_; }
bool is_sys_ls_dml_trans() const { return is_dml_trans() && is_sys_ls_part_trans(); }
void set_task_info(const TenantLSID &tls_id, const char *info);
......@@ -1030,7 +1031,6 @@ public:
bool is_served() const { return SERVED == serve_state_; }
bool is_single_ls_trans() const { return transaction::TransType::SP_TRANS == trans_type_; }
bool is_dist_trans() const { return transaction::TransType::DIST_TRANS == trans_type_; }
void is_part_trans_sort_finish() const { sorted_redo_list_.is_dml_stmt_iter_end(); }
bool is_part_dispatch_finish() const { return sorted_redo_list_.is_dispatch_finish(); }
void inc_sorted_br() { ATOMIC_INC(&output_br_count_by_turn_); }
// get and reset sorted br count
......
......@@ -842,6 +842,7 @@ void ObLogResourceCollector::do_stat_(PartTransTask &task,
(void)ATOMIC_AAF(&total_part_trans_task_count_, cnt);
if (task.is_ddl_trans()) {
LOG_DEBUG("do_stat_ for ddl_trans", K_(ddl_part_trans_task_count), K(cnt), K(task), "lbt", lbt_oblog());
(void)ATOMIC_AAF(&ddl_part_trans_task_count_, cnt);
} else if (task.is_dml_trans()) {
(void)ATOMIC_AAF(&dml_part_trans_task_count_, cnt);
......
......@@ -224,7 +224,7 @@ int ObLogSequencer::push(PartTransTask *part_trans_task, volatile bool &stop_fla
if (OB_SUCC(ret)) {
(void)ATOMIC_AAF(&queue_part_trans_task_count_, 1);
do_stat_for_part_trans_task_count_(*part_trans_task, 1);
do_stat_for_part_trans_task_count_(*part_trans_task, 1, false/*is_sub_stat*/);
}
if (OB_FAIL(ret)) {
......@@ -709,7 +709,7 @@ int ObLogSequencer::push_task_into_committer_(PartTransTask *task,
LOG_ERROR("sequencer has not been initialized", KR(ret), K(tenant));
} else {
// Counting the number of partitioned tasks
do_stat_for_part_trans_task_count_(*task, -task_count);
do_stat_for_part_trans_task_count_(*task, task_count, true/*is_sub_stat*/);
RETRY_FUNC(stop_flag, (*trans_committer_), push, task, task_count, DATA_OP_TIMEOUT, tenant);
}
......@@ -753,7 +753,8 @@ int ObLogSequencer::handle_participants_ready_trans_(const bool is_dml_trans,
ObByteLockGuard guard(trans_queue_lock_);
trans_queue_.push(trx_sort_elem);
_DSTAT("[TRANS_QUEUE] TRANS_ID=%s QUEUE_SIZE=%lu IS_DML=%d",
_DSTAT("[TRANS_QUEUE] TENANT_ID=%lu TRANS_ID=%s QUEUE_SIZE=%lu IS_DML=%d",
tenant_id,
to_cstring(trx_sort_elem),
trans_queue_.size(),
is_dml_trans);
......@@ -842,6 +843,11 @@ int ObLogSequencer::handle_multi_data_source_info_(
IObLogPartMgr &part_mgr = tenant.get_part_mgr();
while (OB_SUCC(ret) && OB_NOT_NULL(part_trans_task)) {
if (! part_trans_task->is_sys_ls_part_trans()) {
// USER_LS part_trans_task in DIST_DDL_TRANS won't into dispatcher, set_ref_cnt to 1 to
// recycle the part_trans_task.
part_trans_task->set_ref_cnt(1);
}
if (part_trans_task->get_multi_data_source_info().has_tablet_change_op()) {
const CDCTabletChangeInfoArray &tablet_change_info_arr =
part_trans_task->get_multi_data_source_info().get_tablet_change_info_arr();
......@@ -1083,31 +1089,45 @@ int ObLogSequencer::recycle_resources_after_trans_ready_(TransCtx &trans_ctx, Ob
return ret;
}
void ObLogSequencer::do_stat_for_part_trans_task_count_(PartTransTask &part_trans_task,
const int64_t task_count)
void ObLogSequencer::do_stat_for_part_trans_task_count_(
PartTransTask &part_trans_task,
const int64_t task_count,
const bool is_sub_stat)
{
bool is_hb_sub_stat = false;
int64_t hb_dec_task_count = 0;
int64_t op_task_count = task_count;
if (is_sub_stat) {
op_task_count = -1 * task_count;
}
if (part_trans_task.is_ddl_trans()) {
(void)ATOMIC_AAF(&ddl_part_trans_task_count_, task_count);
if (is_sub_stat) {
(void)ATOMIC_AAF(&ddl_part_trans_task_count_, -1);
// dist ddl_task contains dml part_trans_task, should do_stat seperately
if (task_count > 1) {
(void)ATOMIC_AAF(&dml_part_trans_task_count_, 1 - task_count);
}
} else {
(void)ATOMIC_AAF(&ddl_part_trans_task_count_, op_task_count);
}
} else if (part_trans_task.is_dml_trans()) {
(void)ATOMIC_AAF(&dml_part_trans_task_count_, task_count);
(void)ATOMIC_AAF(&dml_part_trans_task_count_, op_task_count);
} else {
// heartbeat
if (task_count < 0) {
if (is_sub_stat) {
is_hb_sub_stat = true;
hb_dec_task_count = task_count * SequencerThread::get_thread_num();
hb_dec_task_count = op_task_count * SequencerThread::get_thread_num();
(void)ATOMIC_AAF(&hb_part_trans_task_count_, hb_dec_task_count);
} else {
(void)ATOMIC_AAF(&hb_part_trans_task_count_, task_count);
(void)ATOMIC_AAF(&hb_part_trans_task_count_, op_task_count);
}
}
if (is_hb_sub_stat) {
(void)ATOMIC_AAF(&total_part_trans_task_count_, hb_dec_task_count);
} else {
(void)ATOMIC_AAF(&total_part_trans_task_count_, task_count);
(void)ATOMIC_AAF(&total_part_trans_task_count_, op_task_count);
}
}
......
......@@ -182,8 +182,10 @@ private:
const int64_t task_count,
volatile bool &stop_flag,
ObLogTenant *tenant);
void do_stat_for_part_trans_task_count_(PartTransTask &part_trans_task,
const int64_t task_count);
void do_stat_for_part_trans_task_count_(
PartTransTask &part_trans_task,
const int64_t task_count,
const bool is_sub_stat);
// TODO add
// 1. statistics on transaction tps and rps (rps before and after Formatter filtering)
......
......@@ -290,6 +290,12 @@ public:
ATOMIC_SET(&sorted_redo_count_, 0);
ATOMIC_SET(&sorted_row_seq_no_, 0);
}
void reset_for_sys_ls_dml_trans(const int64_t redo_node_count)
{
ATOMIC_SET(&dispatched_redo_count_, redo_node_count);
ATOMIC_SET(&sorted_redo_count_, redo_node_count);
ATOMIC_SET(&sorted_row_seq_no_, 0);
}
OB_INLINE int64_t get_dispatched_redo_count() const { return ATOMIC_LOAD(&dispatched_redo_count_); }
OB_INLINE void inc_dispatched_redo_count() { ATOMIC_INC(&dispatched_redo_count_); }
OB_INLINE int64_t get_sorted_redo_count() const { return ATOMIC_LOAD(&sorted_redo_count_); }
......@@ -435,6 +441,15 @@ struct SortedRedoLogList
"cur_sort_redo", static_cast<DmlRedoLogNode*>(cur_sort_redo_),
KP_(cur_sort_stmt),
K_(is_dml_stmt_iter_end));
void mark_sys_ls_dml_trans_dispatched()
{
cur_dispatch_redo_ = NULL;
cur_sort_redo_ = NULL;
cur_sort_stmt_ = NULL;
sorted_progress_.reset_for_sys_ls_dml_trans(node_num_);
}
};
} // namespace libobcdc
} // namespace oceanbase
......
......@@ -34,7 +34,7 @@
if (err_no == ret) { \
retry_cnt ++; \
if (0 == retry_cnt % 200) { \
LOG_WARN(#func "retry for too many times", KP(&var), K(var)); \
LOG_WARN(#func " retry for too many times", KP(&var), K(var)); \
} \
/* sleep 5 ms*/ \
ob_usleep(5 * 1000); \
......
......@@ -88,6 +88,7 @@ int ObLogTransRedoDispatcher::dispatch_trans_redo(TransCtx &trans, volatile bool
LOG_ERROR("failed to dispatch redo of trans", KR(ret), K(trans), K(stop_flag));
}
} else {
trans.set_trans_redo_dispatched();
trans_stat_mgr_->do_dispatch_trans_stat();
}
......@@ -175,7 +176,6 @@ int ObLogTransRedoDispatcher::dispatch_by_turn_(TransCtx &trans, volatile bool &
LOG_WARN("trans dispatch_by_turn for too many times", KR(ret), K(retry_cnt), K(trans), K_(trans_dispatch_ctx));
}
} else {
trans.set_trans_redo_dispatched();
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册