提交 a9e70b7b 编写于 作者: O obdev 提交者: wangzelin.wzl

bug fix to open source branch

上级 80fad5f4
......@@ -1452,3 +1452,7 @@ DEF_BOOL(ob_proxy_readonly_transaction_routing_policy, OB_TENANT_PARAMETER, "tru
DEF_BOOL(_enable_block_file_punch_hole, OB_CLUSTER_PARAMETER, "False",
"specifies whether to punch whole when free blocks in block_file",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_ob_enable_px_for_inner_sql, OB_CLUSTER_PARAMETER, "true",
"specifies whether inner sql uses px. "
"The default value is TRUE. Value: TRUE: turned on FALSE: turned off",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
......@@ -3574,6 +3574,7 @@ int ObCodeGeneratorImpl::convert_set(ObLogSet& op, const PhyOpsDesc& child_ops,
const ObIArray<OrderItem>& search_order = op.get_search_ordering();
OZ(r_union->search_by_col_lists_.init(search_order.count()));
OZ(r_union->init_op_schema_obj(search_order.count()));
ARRAY_FOREACH(search_order, i)
{
const ObRawExpr* raw_expr = search_order.at(i).expr_;
......@@ -3667,6 +3668,8 @@ int ObCodeGeneratorImpl::convert_internal_sort(
int ret = OB_SUCCESS;
if (OB_FAIL(sort.init_sort_columns(sort_column.count()))) {
SQL_CG_LOG(WARN, "fail to init sort columns.", K(ret));
} else if (OB_FAIL(sort.init_op_schema_obj(sort_column.count()))) {
LOG_WARN("fail to init sort schema obj array", K(ret));
}
ARRAY_FOREACH(sort_column, i)
{
......@@ -8715,6 +8718,9 @@ int ObCodeGeneratorImpl::fill_sort_columns(const ColumnIndexProviderImpl& idx_pr
int64_t sort_idx = OB_INVALID_INDEX;
if (OB_FAIL(merge_receive.init_sort_columns(sort_keys.count()))) {
LOG_WARN("fail to init sort column", K(ret));
} else if (NULL != phy_op &&
OB_FAIL(phy_op->init_op_schema_obj(sort_keys.count()))) {
LOG_WARN("fail to get op schema obj array", K(ret));
}
ARRAY_FOREACH(sort_keys, i)
{
......
......@@ -53,7 +53,7 @@ ObPhyOperator::ObPhyOperator(ObIAllocator& alloc)
is_exact_rows_(false),
type_(PHY_INVALID),
plan_depth_(0),
op_schema_objs_()
op_schema_objs_(alloc)
{}
ObPhyOperator::~ObPhyOperator()
......@@ -1169,5 +1169,14 @@ int ObPhyOperator::try_open_and_get_operator_ctx(
return ret;
}
int ObPhyOperator::init_op_schema_obj(int64_t count)
{
int ret = OB_SUCCESS;
if (OB_FAIL(op_schema_objs_.init(count))) {
LOG_WARN("fail to init op schema obj", K(ret), K(count));
}
return ret;
}
} // namespace sql
} // namespace oceanbase
......@@ -829,6 +829,7 @@ public:
return op_schema_objs_;
}
int init_op_schema_obj(int64_t count);
private:
static const int64_t CHECK_STATUS_MASK = 0x3FF; // check_status for each 1024 rows
protected:
......@@ -856,8 +857,7 @@ protected:
bool is_exact_rows_;
ObPhyOperatorType type_; // for GDB debug purpose, no need to serialize
int32_t plan_depth_; // for plan cache explain
common::ObSEArray<ObOpSchemaObj, 8> op_schema_objs_;
common::ObFixedArray<ObOpSchemaObj, common::ObIAllocator> op_schema_objs_;
private:
DISALLOW_COPY_AND_ASSIGN(ObPhyOperator);
};
......
......@@ -204,6 +204,12 @@ int ObPxMergeSortReceive::inner_close(ObExecContext& ctx) const
if (release_merge_sort_ret != common::OB_SUCCESS) {
LOG_WARN("release dtl channel failed", K(release_merge_sort_ret));
}
// must erase after unlink channel
release_channel_ret = erase_dtl_interm_result(ctx);
if (release_channel_ret != common::OB_SUCCESS) {
LOG_TRACE("release interm result failed", KR(release_channel_ret));
}
}
return ret;
}
......
......@@ -141,6 +141,11 @@ int ObPxMSReceiveOp::inner_close()
if (release_merge_sort_ret != common::OB_SUCCESS) {
LOG_WARN("release dtl channel failed", K(release_merge_sort_ret));
}
release_channel_ret = erase_dtl_interm_result();
if (release_channel_ret != common::OB_SUCCESS) {
LOG_TRACE("release interm result failed", KR(release_channel_ret));
}
return ret;
}
......
......@@ -294,6 +294,29 @@ int ObPxReceive::active_all_receive_channel(ObPxReceiveCtx& recv_ctx, ObExecCont
return ret;
}
int ObPxReceive::erase_dtl_interm_result(ObExecContext &ctx) const
{
int ret = OB_SUCCESS;
dtl::ObDtlChannelInfo ci;
ObDTLIntermResultKey key;
ObPxReceiveCtx *recv_ctx = NULL;
if (OB_ISNULL(recv_ctx = GET_PHY_OPERATOR_CTX(ObPxReceiveCtx, ctx, get_id()))) {
LOG_DEBUG("The operator has not been opened.", K(ret), K_(id), "op_type",
ob_phy_operator_type_str(get_type()));
} else {
for (int i = 0; i < recv_ctx->get_ch_set().count(); ++i) {
if (OB_FAIL(recv_ctx->get_ch_set().get_channel_info(i, ci))) {
LOG_WARN("fail get channel info", K(ret));
} else {
key.channel_id_ = ci.chid_;
if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
LOG_TRACE("fail to release recieve internal result", K(ret));
}
}
}
}
return ret;
}
//////////////////////////////////////////
ObPxFifoReceive::ObPxFifoReceive(common::ObIAllocator& alloc) : ObPxReceive(alloc)
......@@ -351,26 +374,11 @@ int ObPxFifoReceive::inner_close(ObExecContext& ctx) const
/* we must release channel even if there is some error happen before */
if (OB_NOT_NULL(recv_ctx)) {
int release_channel_ret = ObPxChannelUtil::flush_rows(recv_ctx->task_channels_);
if (release_channel_ret != common::OB_SUCCESS) {
LOG_WARN("release dtl channel failed", K(release_channel_ret));
}
ObDTLIntermResultKey key;
ObDtlBasicChannel* channel = NULL;
;
for (int i = 0; i < recv_ctx->task_channels_.count(); ++i) {
channel = static_cast<ObDtlBasicChannel*>(recv_ctx->task_channels_.at(i));
key.channel_id_ = channel->get_id();
if (channel->use_interm_result()) {
release_channel_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key);
if (release_channel_ret != common::OB_SUCCESS) {
LOG_WARN("fail to release recieve internal result", KR(release_channel_ret), K(ret));
}
}
int release_channel_ret = ObPxChannelUtil::flush_rows(recv_ctx->task_channels_);
if (release_channel_ret != common::OB_SUCCESS) {
LOG_WARN("release dtl channel failed", K(release_channel_ret));
}
dtl::ObDtlChannelLoop& loop = recv_ctx->msg_loop_;
dtl::ObDtlChannelLoop &loop = recv_ctx->msg_loop_;
release_channel_ret = loop.unregister_all_channel();
if (release_channel_ret != common::OB_SUCCESS) {
// the following unlink actions is not safe is any unregister failure happened
......@@ -380,6 +388,11 @@ int ObPxFifoReceive::inner_close(ObExecContext& ctx) const
if (release_channel_ret != common::OB_SUCCESS) {
LOG_WARN("release dtl channel failed", KR(release_channel_ret));
}
// must erase after unlink channel
release_channel_ret = erase_dtl_interm_result(ctx);
if (release_channel_ret != common::OB_SUCCESS) {
LOG_TRACE("release interm result failed", KR(release_channel_ret));
}
}
return ret;
}
......
......@@ -180,6 +180,7 @@ protected:
ObPxTaskChSet& ch_set, common::ObIArray<dtl::ObDtlChannel*>& channels, dtl::ObDtlFlowControl* dfc = nullptr);
int get_sqc_id(ObExecContext& ctx, int64_t& sqc_id) const;
int erase_dtl_interm_result(ObExecContext &ctx) const;
private:
/* functions */
/* variables */
......
......@@ -302,6 +302,24 @@ int ObPxReceiveOp::active_all_receive_channel()
}
return ret;
}
int ObPxReceiveOp::erase_dtl_interm_result()
{
int ret = OB_SUCCESS;
dtl::ObDtlChannelInfo ci;
ObDTLIntermResultKey key;
for (int i = 0; i < get_ch_set().count(); ++i) {
if (OB_FAIL(get_ch_set().get_channel_info(i, ci))) {
LOG_WARN("fail get channel info", K(ret));
} else {
key.channel_id_ = ci.chid_;
if (OB_FAIL(ObDTLIntermResultManager::getInstance().erase_interm_result_info(key))) {
LOG_TRACE("fail to release recieve internal result", K(ret));
}
}
}
return ret;
}
//------------- end ObPxReceiveOp-----------------
ObPxFifoReceiveOp::ObPxFifoReceiveOp(ObExecContext& exec_ctx, const ObOpSpec& spec, ObOpInput* input)
......@@ -316,26 +334,14 @@ int ObPxFifoReceiveOp::inner_open()
int ObPxFifoReceiveOp::inner_close()
{
int ret = OB_SUCCESS;
int release_channel_ret = common::OB_SUCCESS;
/* we must release channel even if there is some error happen before */
if (channel_linked_) {
int release_channel_ret = ObPxChannelUtil::flush_rows(task_channels_);
release_channel_ret = ObPxChannelUtil::flush_rows(task_channels_);
if (release_channel_ret != common::OB_SUCCESS) {
LOG_WARN("release dtl channel failed", K(release_channel_ret));
}
ObDTLIntermResultKey key;
ObDtlBasicChannel *channel = NULL;
int64_t recv_cnt = 0;
for (int i = 0; i < task_channels_.count(); ++i) {
channel = static_cast<ObDtlBasicChannel*>(task_channels_.at(i));
key.channel_id_ = channel->get_id();
recv_cnt += channel->get_recv_buffer_cnt();
if (channel->use_interm_result()) {
release_channel_ret = ObDTLIntermResultManager::getInstance().erase_interm_result_info(key);
if (release_channel_ret != common::OB_SUCCESS) {
LOG_WARN("fail to release recieve internal result", K(ret));
}
}
}
op_monitor_info_.otherstat_3_id_ = ObSqlMonitorStatIds::DTL_SEND_RECV_COUNT;
op_monitor_info_.otherstat_3_value_ = recv_cnt;
release_channel_ret = msg_loop_.unregister_all_channel();
......@@ -348,6 +354,11 @@ int ObPxFifoReceiveOp::inner_close()
LOG_WARN("release dtl channel failed", K(release_channel_ret));
}
}
// must erase after unlink channel
release_channel_ret = erase_dtl_interm_result();
if (release_channel_ret != common::OB_SUCCESS) {
LOG_TRACE("release interm result failed", KR(release_channel_ret));
}
return ret;
}
......
......@@ -123,7 +123,7 @@ public:
}
int64_t get_sqc_id();
int erase_dtl_interm_result();
public:
void reset_for_rescan()
{
......
......@@ -207,6 +207,7 @@ void ObGranuleIterator::ObGranuleIteratorCtx::destroy()
ranges_.reset();
pkeys_.reset();
rescan_tasks_.reset();
pwj_rescan_task_infos_.reset();
}
int ObGranuleIterator::ObGranuleIteratorCtx::parameters_init(const ObGIInput* input)
......@@ -323,9 +324,17 @@ int ObGranuleIterator::rescan(ObExecContext& ctx) const
gi_ctx->state_ = GI_GET_NEXT_GRANULE_TASK;
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("the partition wise join GI rescan not supported", K(ret));
// 在partition_wise_join的情况, 按woker第一次完整执行所抢占的任务执行.
// 在执行过程中缓存住了自己的任务队列.
if (GI_UNINITIALIZED == gi_ctx->state_ || GI_PREPARED == gi_ctx->state_) {
/*do nothing*/
} else {
gi_ctx->is_rescan_ = true;
gi_ctx->rescan_task_idx_ = 0;
gi_ctx->state_ = GI_GET_NEXT_GRANULE_TASK;
}
}
return ret;
}
......@@ -570,12 +579,24 @@ int ObGranuleIterator::do_get_next_granule_task(
}
}
}
if (OB_FAIL(fetch_full_pw_tasks(exec_ctx, gi_task_infos, op_ids))) {
if (OB_FAIL(ret)) {
} else if (gi_ctx.is_rescan_) {
if (gi_ctx.rescan_task_idx_ >= gi_ctx.pwj_rescan_task_infos_.count()) {
ret = OB_ITER_END;
gi_ctx.state_ = GI_END;
}
} else if (OB_FAIL(fetch_full_pw_tasks(exec_ctx, gi_task_infos, op_ids))) {
if (OB_ITER_END != ret) {
LOG_WARN("try fetch task failed", K(ret));
} else {
gi_ctx.state_ = GI_END;
}
} else {
for (int i = 0; i < gi_task_infos.count() && OB_SUCC(ret); ++i) {
if (OB_FAIL(gi_ctx.pwj_rescan_task_infos_.push_back(gi_task_infos.at(i)))) {
LOG_WARN("fail to rescan pwj task info", K(ret));
}
}
}
ARRAY_FOREACH_X(op_ids, idx, cnt, OB_SUCC(ret))
{
......@@ -585,7 +606,16 @@ int ObGranuleIterator::do_get_next_granule_task(
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), gi_task_infos.at(idx)))) {
if (gi_ctx.is_rescan_) {
if (gi_ctx.rescan_task_idx_ >= gi_ctx.pwj_rescan_task_infos_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rescan task idx is unexpected", K(gi_ctx.rescan_task_idx_),
K(gi_ctx.pwj_rescan_task_infos_.count()), K(cnt));
} else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx),
gi_ctx.pwj_rescan_task_infos_.at(gi_ctx.rescan_task_idx_++)))) {
LOG_WARN("reset table scan's ranges failed", K(ret));
}
} else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), gi_task_infos.at(idx)))) {
LOG_WARN("reset table scan's ranges failed", K(ret));
}
LOG_DEBUG("produce a gi task(PWJ)", K(op_ids.at(idx)), K(gi_task_infos.at(idx)));
......
......@@ -119,7 +119,8 @@ public:
all_task_fetched_(false),
is_rescan_(false),
rescan_taskset_(nullptr),
rescan_task_idx_(0)
rescan_task_idx_(0),
pwj_rescan_task_infos_()
{}
virtual ~ObGranuleIteratorCtx()
{
......@@ -146,6 +147,7 @@ public:
const ObGITaskSet* rescan_taskset_ = NULL;
common::ObSEArray<ObGITaskSet::Pos, OB_MIN_PARALLEL_TASK_COUNT * 2> rescan_tasks_;
int64_t rescan_task_idx_;
common::ObSEArray<ObGranuleTaskInfo, 2> pwj_rescan_task_infos_;
};
public:
......
......@@ -220,7 +220,8 @@ ObGranuleIteratorOp::ObGranuleIteratorOp(ObExecContext& exec_ctx, const ObOpSpec
all_task_fetched_(false),
is_rescan_(false),
rescan_taskset_(nullptr),
rescan_task_idx_(0)
rescan_task_idx_(0),
pwj_rescan_task_infos_()
{}
void ObGranuleIteratorOp::destroy()
......@@ -228,6 +229,7 @@ void ObGranuleIteratorOp::destroy()
ranges_.reset();
pkeys_.reset();
rescan_tasks_.reset();
pwj_rescan_task_infos_.reset();
}
int ObGranuleIteratorOp::parameters_init()
......@@ -360,8 +362,15 @@ int ObGranuleIteratorOp::rescan()
state_ = GI_GET_NEXT_GRANULE_TASK;
}
} else {
ret = OB_NOT_SUPPORTED;
LOG_WARN("the partition wise join GI rescan not supported", K(ret));
// 在partition_wise_join的情况, 按woker第一次完整执行所抢占的任务执行.
// 在执行过程中缓存住了自己的任务队列.
if (GI_UNINITIALIZED == state_ || GI_PREPARED == state_) {
/*do nothing*/
} else {
is_rescan_ = true;
rescan_task_idx_ = 0;
state_ = GI_GET_NEXT_GRANULE_TASK;
}
}
return ret;
}
......@@ -512,12 +521,24 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool prepare /* = false */)
}
}
}
if (OB_FAIL(fetch_full_pw_tasks(gi_task_infos, op_ids))) {
if (OB_FAIL(ret)) {
} else if (is_rescan_) {
if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) {
ret = OB_ITER_END;
state_ = GI_END;
}
} else if (OB_FAIL(fetch_full_pw_tasks(gi_task_infos, op_ids))) {
if (OB_ITER_END != ret) {
LOG_WARN("try fetch task failed", K(ret));
} else {
state_ = GI_END;
}
} else {
for (int i = 0; i < gi_task_infos.count() && OB_SUCC(ret); ++i) {
if (OB_FAIL(pwj_rescan_task_infos_.push_back(gi_task_infos.at(i)))) {
LOG_WARN("fail to rescan pwj task info", K(ret));
}
}
}
ARRAY_FOREACH_X(op_ids, idx, cnt, OB_SUCC(ret))
......@@ -527,11 +548,21 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool prepare /* = false */)
LOG_WARN("failed to erase task", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), gi_task_infos.at(idx)))) {
LOG_WARN("reset table scan's ranges failed", K(ret));
if (OB_SUCC(ret)) {
if (is_rescan_) {
if (rescan_task_idx_ >= pwj_rescan_task_infos_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rescan task idx is unexpected", K(rescan_task_idx_),
K(pwj_rescan_task_infos_.count()), K(cnt));
} else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx),
pwj_rescan_task_infos_.at(rescan_task_idx_++)))) {
LOG_WARN("reset table scan's ranges failed", K(ret));
}
} else if (OB_FAIL(gi_prepare_map->set_refactored(op_ids.at(idx), gi_task_infos.at(idx)))) {
LOG_WARN("reset table scan's ranges failed", K(ret));
}
LOG_DEBUG("produce a gi task(PWJ)", K(op_ids.at(idx)), K(gi_task_infos.at(idx)));
}
LOG_DEBUG("produce a gi task(PWJ)", K(op_ids.at(idx)), K(gi_task_infos.at(idx)));
}
if (OB_SUCC(ret)) {
......
......@@ -232,6 +232,9 @@ private:
const ObGITaskSet* rescan_taskset_ = NULL;
common::ObSEArray<ObGITaskSet::Pos, OB_MIN_PARALLEL_TASK_COUNT * 2> rescan_tasks_;
int64_t rescan_task_idx_;
// full pwj场景下, 在执行过程中缓存住了自己的任务队列.
// 供GI rescan使用
common::ObSEArray<ObGranuleTaskInfo, 2> pwj_rescan_task_infos_;
};
} // end namespace sql
......
......@@ -710,6 +710,8 @@ int ObPxCoord::wait_all_running_dfos_exit(ObExecContext& ctx) const
loop.ignore_interrupt();
ObPxControlChannelProc control_channels;
int64_t times_offset = 0;
int64_t last_timestamp = 0;
bool wait_msg = true;
while (OB_SUCC(ret) && wait_msg) {
ObDtlChannelLoop& loop = px_ctx.msg_loop_;
......@@ -718,7 +720,7 @@ int ObPxCoord::wait_all_running_dfos_exit(ObExecContext& ctx) const
/**
* start to get next msg.
*/
if (OB_FAIL(check_all_sqc(active_dfos, all_dfo_terminate))) {
if (OB_FAIL(check_all_sqc(active_dfos, times_offset++, all_dfo_terminate, last_timestamp))) {
LOG_WARN("fail to check sqc");
} else if (all_dfo_terminate) {
wait_msg = false;
......@@ -762,7 +764,10 @@ int ObPxCoord::wait_all_running_dfos_exit(ObExecContext& ctx) const
return ret;
}
int ObPxCoord::check_all_sqc(ObIArray<ObDfo*>& active_dfos, bool& all_dfo_terminate) const
int ObPxCoord::check_all_sqc(ObIArray<ObDfo *> &active_dfos,
int64_t times_offset,
bool &all_dfo_terminate,
int64_t &last_timestamp) const
{
int ret = OB_SUCCESS;
all_dfo_terminate = true;
......@@ -781,6 +786,15 @@ int ObPxCoord::check_all_sqc(ObIArray<ObDfo*>& active_dfos, bool& all_dfo_termin
LOG_WARN("NULL unexpected sqc", K(ret));
} else if (sqc->need_report()) {
LOG_DEBUG("wait for sqc", K(sqc));
int64_t cur_timestamp = ObTimeUtility::current_time();
// > 1s, increase gradually
// In order to get the dfo to propose as soon as possible and
// In order to avoid the interruption that is not received,
// So the interruption needs to be sent repeatedly
if (cur_timestamp - last_timestamp > (1000000 + min(times_offset, 10) * 1000000)) {
last_timestamp = cur_timestamp;
ObInterruptUtil::broadcast_dfo(active_dfos.at(i), OB_GOT_SIGNAL_ABORTING);
}
all_dfo_terminate = false;
break;
}
......
......@@ -162,11 +162,14 @@ protected:
int register_first_buffer_cache(ObExecContext& ctx, ObPxCoordCtx& px_ctx, ObDfo* root_dfo) const;
void unregister_first_buffer_cache(ObExecContext& ctx, ObPxCoordCtx& px_ctx) const;
int check_all_sqc(common::ObIArray<ObDfo*>& active_dfos, bool& all_dfo_terminate) const;
int calc_allocated_worker_count(
int64_t px_expected, int64_t query_expected, int64_t query_allocated, int64_t& allocated_worker_count) const;
int check_all_sqc(common::ObIArray<ObDfo *> &active_dfos,
int64_t time_offset,
bool &all_dfo_terminate,
int64_t &cur_timestamp) const;
int register_interrupt(ObPxCoordCtx* px_ctx) const;
void clear_interrupt(ObPxCoordCtx* px_ctx) const;
......
......@@ -649,6 +649,8 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
loop.ignore_interrupt();
ObPxControlChannelProc control_channels;
int64_t times_offset = 0;
int64_t last_timestamp = 0;
bool wait_msg = true;
while (OB_SUCC(ret) && wait_msg) {
ObDtlChannelLoop& loop = msg_loop_;
......@@ -657,7 +659,7 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
/**
* start to get next msg.
*/
if (OB_FAIL(check_all_sqc(active_dfos, all_dfo_terminate))) {
if (OB_FAIL(check_all_sqc(active_dfos, times_offset++, all_dfo_terminate, last_timestamp))) {
LOG_WARN("fail to check sqc");
} else if (all_dfo_terminate) {
wait_msg = false;
......@@ -701,7 +703,10 @@ int ObPxCoordOp::wait_all_running_dfos_exit()
return ret;
}
int ObPxCoordOp::check_all_sqc(ObIArray<ObDfo*>& active_dfos, bool& all_dfo_terminate)
int ObPxCoordOp::check_all_sqc(ObIArray<ObDfo *> &active_dfos,
int64_t times_offset,
bool &all_dfo_terminate,
int64_t &last_timestamp)
{
int ret = OB_SUCCESS;
all_dfo_terminate = true;
......@@ -720,6 +725,15 @@ int ObPxCoordOp::check_all_sqc(ObIArray<ObDfo*>& active_dfos, bool& all_dfo_term
LOG_WARN("NULL unexpected sqc", K(ret));
} else if (sqc->need_report()) {
LOG_DEBUG("wait for sqc", K(sqc));
int64_t cur_timestamp = ObTimeUtility::current_time();
// > 1s, increase gradually
// In order to get the dfo to propose as soon as possible and
// In order to avoid the interruption that is not received,
// So the interruption needs to be sent repeatedly
if (cur_timestamp - last_timestamp > (1000000 + min(times_offset, 10) * 1000000)) {
last_timestamp = cur_timestamp;
ObInterruptUtil::broadcast_dfo(active_dfos.at(i), OB_GOT_SIGNAL_ABORTING);
}
all_dfo_terminate = false;
break;
}
......
......@@ -119,7 +119,10 @@ protected:
int register_first_buffer_cache(ObDfo* root_dfo);
void unregister_first_buffer_cache();
int check_all_sqc(common::ObIArray<ObDfo*>& active_dfos, bool& all_dfo_terminate);
int check_all_sqc(common::ObIArray<ObDfo *> &active_dfos,
int64_t time_offset,
bool &all_dfo_terminate,
int64_t &cur_timestamp);
int calc_allocated_worker_count(
int64_t px_expected, int64_t query_expected, int64_t query_allocated, int64_t& allocated_worker_count);
......
......@@ -167,8 +167,8 @@ int ObInitTaskP::after_process()
return OB_NOT_SUPPORTED;
}
void ObFastInitSqcReportQCMessageCall::operator()(
hash::HashMapPair<ObInterruptibleTaskID, ObInterruptCheckerNode*>& entry)
void ObFastInitSqcReportQCMessageCall::operator()(hash::HashMapPair<ObInterruptibleTaskID,
ObInterruptCheckerNode *> &entry)
{
UNUSED(entry);
if (OB_NOT_NULL(sqc_)) {
......@@ -299,7 +299,7 @@ int ObFastInitSqcCB::deal_with_rpc_timeout_err_safely()
int ret = OB_SUCCESS;
ObDealWithRpcTimeoutCall call(addr_, retry_info_, timeout_ts_, trace_id_);
call.ret_ = OB_TIMEOUT;
ObGlobalInterruptManager* manager = ObGlobalInterruptManager::getInstance();
ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance();
if (OB_NOT_NULL(manager)) {
if (OB_FAIL(manager->get_map().atomic_refactored(interrupt_id_, call))) {
LOG_WARN("fail to deal with rpc timeout call", K(interrupt_id_));
......
......@@ -101,34 +101,43 @@ public:
{}
~ObFastInitSqcReportQCMessageCall() = default;
void operator()(hash::HashMapPair<ObInterruptibleTaskID, ObInterruptCheckerNode*>& entry);
private:
ObPxSqcMeta* sqc_;
};
class ObDealWithRpcTimeoutCall {
class ObDealWithRpcTimeoutCall
{
public:
ObDealWithRpcTimeoutCall(
common::ObAddr addr, ObQueryRetryInfo* retry_info, int64_t timeout_ts, common::ObCurTraceId::TraceId& trace_id)
: addr_(addr), retry_info_(retry_info), timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT)
{}
ObDealWithRpcTimeoutCall(common::ObAddr addr,
ObQueryRetryInfo *retry_info,
int64_t timeout_ts,
common::ObCurTraceId::TraceId &trace_id) : addr_(addr), retry_info_(retry_info),
timeout_ts_(timeout_ts), trace_id_(trace_id), ret_(common::OB_TIMEOUT) {}
~ObDealWithRpcTimeoutCall() = default;
void operator()(hash::HashMapPair<ObInterruptibleTaskID, ObInterruptCheckerNode*>& entry);
void operator() (hash::HashMapPair<ObInterruptibleTaskID,
ObInterruptCheckerNode *> &entry);
void deal_with_rpc_timeout_err();
public:
common::ObAddr addr_;
ObQueryRetryInfo* retry_info_;
ObQueryRetryInfo *retry_info_;
int64_t timeout_ts_;
common::ObCurTraceId::TraceId trace_id_;
int ret_;
};
class ObFastInitSqcCB : public obrpc::ObPxRpcProxy::AsyncCB<obrpc::OB_PX_FAST_INIT_SQC> {
class ObFastInitSqcCB
: public obrpc::ObPxRpcProxy::AsyncCB<obrpc::OB_PX_FAST_INIT_SQC>
{
public:
ObFastInitSqcCB(const common::ObAddr& server, const common::ObCurTraceId::TraceId& trace_id,
ObQueryRetryInfo* retry_info, int64_t timeout_ts, ObInterruptibleTaskID tid, ObPxSqcMeta* sqc)
: addr_(server), retry_info_(retry_info), timeout_ts_(timeout_ts), interrupt_id_(tid), sqc_(sqc)
ObFastInitSqcCB(const common::ObAddr &server,
const common::ObCurTraceId::TraceId &trace_id,
ObQueryRetryInfo *retry_info,
int64_t timeout_ts,
ObInterruptibleTaskID tid,
ObPxSqcMeta *sqc)
: addr_(server), retry_info_(retry_info),
timeout_ts_(timeout_ts), interrupt_id_(tid),
sqc_(sqc)
{
trace_id_.set(trace_id);
}
......@@ -149,10 +158,7 @@ public:
}
return newcb;
}
virtual void set_args(const Request& arg)
{
UNUSED(arg);
}
virtual void set_args(const Request &arg) { UNUSED(arg); }
int deal_with_rpc_timeout_err_safely();
void interrupt_qc(int err);
......
......@@ -931,7 +931,10 @@ int ObIndexSSTableBuilder::gen_build_macro(ObPhysicalPlan& phy_plan, ObPhyOperat
} else {
sort->set_column_count(columns.count());
if (OB_FAIL(sort->init_sort_columns(index_table_->get_rowkey_column_num()))) {
LOG_WARN("init sort column failed", K(ret), "rowkey_cnt", index_table_->get_rowkey_column_num());
LOG_WARN("init sort column failed",
K(ret), "rowkey_cnt", index_table_->get_rowkey_column_num());
} else if (OB_FAIL(sort->init_op_schema_obj(index_table_->get_rowkey_column_num()))) {
LOG_WARN("fail to init op schema obj", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < index_table_->get_rowkey_column_num(); i++) {
const bool ascending = true;
......
......@@ -514,9 +514,6 @@ int ObLogWindowFunction::get_win_partition_intersect_exprs(
bool ObLogWindowFunction::is_block_op() const
{
bool is_block_op = true;
// 对于window function算子, 在没有partition by以及完整窗口情况下,
// 所有数据作为一个窗口, 认为是block算子
// 在其他情况下, 认为是非block算子
ObWinFunRawExpr* win_expr = NULL;
for (int64_t i = 0; i < win_exprs_.count(); ++i) {
if (OB_ISNULL(win_expr = win_exprs_.at(i))) {
......
......@@ -2006,6 +2006,9 @@ int ObDMLResolver::resolve_base_or_alias_table_item_normal(uint64_t tenant_id, c
K(params_.contain_dml_),
K(is_inner_table(tschema->get_table_id())),
K(session_info_->is_inner()));
} else if (!GCONF._ob_enable_px_for_inner_sql &&
ObSQLSessionInfo::USER_SESSION != session_info_->get_session_type()) {
stmt->get_query_ctx()->forbid_use_px_ = true;
} else {
// use px, including PL, inner sql, inner connection sql triggered by CMD
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册