提交 f3f54ca0 编写于 作者: O obdev 提交者: wangzelin.wzl

[CP] adjust NLJ/NL connectby open

上级 edd206c0
......@@ -31,7 +31,6 @@ ObNLConnectByWithIndexOp::ObNLConnectByWithIndexOp(ObExecContext& exec_ctx, cons
is_match_(false),
is_cycle_(false),
is_inited_(false),
open_right_child_(false),
need_return_(false)
{
state_operation_func_[CNTB_STATE_JOIN_END] = &ObNLConnectByWithIndexOp::join_end_operate;
......@@ -89,7 +88,6 @@ void ObNLConnectByWithIndexOp::reset()
root_row_ = NULL;
cur_output_row_ = NULL;
is_inited_ = false;
open_right_child_ = false;
sys_connect_by_path_id_ = INT64_MAX;
need_return_ = false;
}
......@@ -99,8 +97,6 @@ int ObNLConnectByWithIndexOp::inner_open()
int ret = OB_SUCCESS;
if (OB_FAIL(ObOperator::inner_open())) {
LOG_WARN("failed to open in base class", K(ret));
} else if (OB_FAIL(left_->open())) {
LOG_WARN("failed to open left child", K(ret));
} else if (OB_FAIL(init())) {
LOG_WARN("fail to init Connect by Ctx", K(ret));
} else if (MY_SPEC.left_prior_exprs_.count() != MY_SPEC.right_prior_exprs_.count()) {
......@@ -118,22 +114,11 @@ int ObNLConnectByWithIndexOp::inner_open()
return ret;
}
int ObNLConnectByWithIndexOp::open_right_child()
{
int ret = OB_SUCCESS;
if (!right_->is_opened() && OB_FAIL(right_->open())) {
LOG_WARN("failed to open right child", K(ret));
}
return ret;
}
int ObNLConnectByWithIndexOp::rescan()
{
int ret = OB_SUCCESS;
reset();
if (OB_FAIL(open_right_child())) {
LOG_WARN("failed to open right child", K(ret));
} else if (OB_FAIL(ObOperator::rescan())) {
if (OB_FAIL(ObOperator::rescan())) {
LOG_WARN("failed to rescan", K(ret));
}
return ret;
......@@ -143,15 +128,6 @@ int ObNLConnectByWithIndexOp::inner_close()
{
int ret = OB_SUCCESS;
reset();
int left_ret = OB_SUCCESS;
int right_ret = OB_SUCCESS;
if (OB_SUCCESS != (left_ret = left_->close())) {
LOG_WARN("Close child operator failed", K(left_ret), "op_type", ob_phy_operator_type_str(get_spec().type_));
}
if (OB_SUCCESS != (right_ret = right_->close())) {
LOG_WARN("Close child operator failed", K(right_ret), "op_type", ob_phy_operator_type_str(get_spec().type_));
}
ret = (OB_SUCCESS == left_ret) ? right_ret : left_ret;
return ret;
}
......@@ -291,11 +267,6 @@ int ObNLConnectByWithIndexOp::prepare_rescan_params()
LOG_WARN("fail to set dynamic param", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(open_right_child())) {
LOG_WARN("failed to open right child", K(ret));
}
}
return ret;
}
......
......@@ -44,7 +44,7 @@ public:
virtual OperatorOpenOrder get_operator_open_order() const override final
{
return OPEN_SELF_ONLY;
return OPEN_SELF_FIRST;
}
int prepare_rescan_params();
......@@ -97,7 +97,6 @@ private:
int add_pseudo_column(ObExpr* pseudo_expr, ObConnectByPseudoColumn column_type);
int init();
int open_right_child();
public:
ObConnectByOpBFSPump connect_by_pump_;
......@@ -115,7 +114,6 @@ private:
bool is_match_; // whether there is a child, for calc connect_by_isleaf
bool is_cycle_; // whether part of a cycle, for calc connect_by_iscycle
bool is_inited_;
bool open_right_child_;
bool need_return_;
};
......
......@@ -90,13 +90,6 @@ int ObBasicNestedLoopJoin::prepare_rescan_params(ObBasicNestedLoopJoinCtx& join_
}
}
// try to open the right child
if (OB_SUCC(ret)) {
if (OB_FAIL(open_right_child(join_ctx))) {
LOG_WARN("failed to open right child", K(ret));
}
}
return ret;
}
......@@ -123,99 +116,26 @@ int ObBasicNestedLoopJoin::get_next_left_row(ObJoinCtx& join_ctx) const
return OB_SUCCESS != ret ? ret : ObJoin::get_next_left_row(join_ctx);
}
int ObBasicNestedLoopJoin::open_right_child(ObBasicNestedLoopJoinCtx& join_ctx) const
{
int ret = OB_SUCCESS;
ObPhyOperator* child_ptr = nullptr;
ObPhyOperatorCtx* child_op_ctx = nullptr;
if (join_ctx.open_right_child()) {
// do nothing
} else if (OB_ISNULL(child_ptr = get_child(SECOND_CHILD))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to get right child", K(ret));
} else if (OB_NOT_NULL(
child_op_ctx = GET_PHY_OPERATOR_CTX(ObPhyOperatorCtx, join_ctx.exec_ctx_, child_ptr->get_id()))) {
// the drain stage will open this operator, and no record in join_ctx.
join_ctx.set_open_right_child();
} else if (OB_FAIL(open_child(join_ctx.exec_ctx_, SECOND_CHILD))) {
LOG_WARN("fail to open right child", K(ret));
join_ctx.set_open_right_child();
} else {
join_ctx.set_open_right_child();
}
return ret;
}
int ObBasicNestedLoopJoin::open_left_child(ObExecContext& ctx) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(open_child(ctx, FIRST_CHILD))) {
LOG_WARN("fail to open left child", K(ret));
}
return ret;
}
int ObBasicNestedLoopJoin::inner_open(ObExecContext& ctx) const
int ObBasicNestedLoopJoin::inner_open(ObExecContext &ctx) const
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObJoin::inner_open(ctx))) {
LOG_WARN("failed to inner open join", K(ret));
} else if (OB_FAIL(open_left_child(ctx))) {
LOG_WARN("failed to open left child", K(ret));
}
return ret;
}
int ObBasicNestedLoopJoin::inner_close(ObExecContext& ctx) const
{
UNUSED(ctx);
int ret = OB_SUCCESS;
int child_ret = OB_SUCCESS;
// first call close of all children
ObPhyOperator* child_ptr = NULL;
for (int32_t i = 0; i < get_child_num(); ++i) {
if (OB_ISNULL(child_ptr = get_child(i))) {
ret = OB_NOT_INIT;
LOG_WARN("failed to get child",
K(i),
K(child_ptr),
K(get_child_num()),
K(ret),
"op_type",
ob_phy_operator_type_str(get_type()));
} else {
int tmp_ret = child_ptr->close(ctx);
if (OB_SUCCESS != tmp_ret) {
child_ret = tmp_ret;
LOG_WARN("Close child operator failed", K(child_ret), "op_type", ob_phy_operator_type_str(get_type()));
}
}
}
// no matter what, must call operatoris close function, then close this operator
ObPhyOperatorCtx* op_ctx = NULL;
if (OB_ISNULL(op_ctx = GET_PHY_OPERATOR_CTX(ObPhyOperatorCtx, ctx, get_id()))) {
LOG_DEBUG("get_phy_operator_ctx failed", K(ret), K_(id), "op_type", ob_phy_operator_type_str(get_type()));
} else {
// op_ctx->op_monitor_info_.set_value(CLOSE_TIME, common::ObTimeUtility::current_time());
}
if (OB_SUCC(ret)) {
// Can only preserve one error code
ret = child_ret;
}
return ret;
}
int ObBasicNestedLoopJoin::rescan(ObExecContext& ctx) const
{
int ret = OB_SUCCESS;
ObBasicNestedLoopJoinCtx* join_ctx = NULL;
if (OB_ISNULL(join_ctx = GET_PHY_OPERATOR_CTX(ObBasicNestedLoopJoinCtx, ctx, get_id()))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get join ctx", K(ret));
} else if (OB_FAIL(open_right_child(*join_ctx))) {
LOG_WARN("failed to open right child", K(ret));
} else if (OB_FAIL(ObJoin::rescan(ctx))) {
if (OB_FAIL(ObJoin::rescan(ctx))) {
LOG_WARN("failed to call parent rescan", K(ret));
} else {
}
......
......@@ -29,7 +29,7 @@ protected:
friend class ObBasicNestedLoopJoin;
public:
ObBasicNestedLoopJoinCtx(ObExecContext& ctx) : ObJoinCtx(ctx), open_right_child_(false)
ObBasicNestedLoopJoinCtx(ObExecContext &ctx) : ObJoinCtx(ctx)
{}
virtual ~ObBasicNestedLoopJoinCtx()
{}
......@@ -38,18 +38,9 @@ protected:
{
ObJoinCtx::destroy();
}
bool open_right_child()
{
return open_right_child_;
}
void set_open_right_child()
{
open_right_child_ = true;
}
protected:
common::ObExprCtx expr_ctx_;
bool open_right_child_;
};
struct RescanParam {
OB_UNIS_VERSION_V(1);
......@@ -104,15 +95,11 @@ protected:
virtual OperatorOpenOrder get_operator_open_order(ObExecContext& ctx) const override
{
UNUSED(ctx);
return OPEN_SELF_ONLY;
return OPEN_SELF_FIRST;
}
virtual int inner_open(ObExecContext& ctx) const override;
virtual int inner_close(ObExecContext& exec_ctx) const override;
virtual int rescan(ObExecContext& exec_ctx) const override;
// open left child
int open_left_child(ObExecContext& ctx) const;
// open right child
int open_right_child(ObBasicNestedLoopJoinCtx& join_ctx) const;
virtual int inner_open(ObExecContext &ctx) const override;
virtual int inner_close(ObExecContext &exec_ctx) const override;
virtual int rescan(ObExecContext &exec_ctx) const override;
protected:
common::ObFixedArray<RescanParam, common::ObIAllocator> rescan_params_; // @todo a better name
......
......@@ -22,8 +22,8 @@ namespace sql {
OB_SERIALIZE_MEMBER(
(ObBasicNestedLoopJoinSpec, ObJoinSpec), rescan_params_, gi_partition_id_expr_, enable_gi_partition_pruning_);
ObBasicNestedLoopJoinOp::ObBasicNestedLoopJoinOp(ObExecContext& exec_ctx, const ObOpSpec& spec, ObOpInput* input)
: ObJoinOp(exec_ctx, spec, input), open_right_child_(false)
ObBasicNestedLoopJoinOp::ObBasicNestedLoopJoinOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpInput *input)
: ObJoinOp(exec_ctx, spec, input)
{}
int ObBasicNestedLoopJoinOp::inner_open()
......@@ -32,53 +32,23 @@ int ObBasicNestedLoopJoinOp::inner_open()
int64_t left_output_cnt = spec_.get_left()->output_.count();
if (OB_FAIL(ObJoinOp::inner_open())) {
LOG_WARN("failed to inner open join", K(ret));
} else if (OB_FAIL(left_->open())) {
LOG_WARN("failed to open left child", K(ret));
}
return ret;
}
int ObBasicNestedLoopJoinOp::rescan()
{
int ret = OB_SUCCESS;
if (OB_FAIL(open_right_child())) {
LOG_WARN("failed to open right child", K(ret));
} else if (OB_FAIL(ObJoinOp::rescan())) {
if (OB_FAIL(ObJoinOp::rescan())) {
LOG_WARN("failed to call parent rescan", K(ret));
}
return ret;
}
int ObBasicNestedLoopJoinOp::open_right_child()
{
int ret = OB_SUCCESS;
if (!right_->is_opened() && OB_FAIL(right_->open())) {
LOG_WARN("failed to open right child", K(ret));
}
return ret;
}
int ObBasicNestedLoopJoinOp::inner_close()
{
int ret = OB_SUCCESS;
int left_ret = OB_SUCCESS;
int right_ret = OB_SUCCESS;
if (OB_ISNULL(left_)) {
left_ret = OB_NOT_INIT;
LOG_WARN("invalid argument", K(left_), K(ret));
} else if (OB_SUCCESS != (left_ret = left_->close())) {
LOG_WARN("Close child operator failed", K(left_ret), "op_type", ob_phy_operator_type_str(get_spec().type_));
}
if (OB_ISNULL(right_)) {
right_ret = OB_NOT_INIT;
LOG_WARN("invalid argument", K(right_), K(ret));
} else if (OB_SUCCESS != (right_ret = right_->close())) {
LOG_WARN("Close child operator failed", K(right_ret), "op_type", ob_phy_operator_type_str(get_spec().type_));
}
ret = (OB_SUCCESS == left_ret) ? right_ret : left_ret;
return ret;
}
......@@ -103,6 +73,7 @@ int ObBasicNestedLoopJoinOp::get_next_left_row()
int ObBasicNestedLoopJoinOp::prepare_rescan_params(bool is_group)
{
int ret = OB_SUCCESS;
UNUSED(is_group);
int64_t param_cnt = get_spec().rescan_params_.count();
for (int64_t i = 0; OB_SUCC(ret) && i < param_cnt; ++i) {
const ObDynamicParamSetter& rescan_param = get_spec().rescan_params_.at(i);
......@@ -121,12 +92,6 @@ int ObBasicNestedLoopJoinOp::prepare_rescan_params(bool is_group)
}
}
if (OB_SUCC(ret) && !is_group) {
if (OB_FAIL(open_right_child())) {
LOG_WARN("failed to open right child", K(ret));
}
}
return ret;
}
......
......@@ -53,13 +53,12 @@ public:
virtual ~ObBasicNestedLoopJoinOp(){};
virtual int inner_open() override;
virtual int open_right_child();
virtual int rescan() override;
virtual int inner_close() final;
virtual OperatorOpenOrder get_operator_open_order() const override final
{
return OPEN_SELF_ONLY;
return OPEN_SELF_FIRST;
}
int prepare_rescan_params(bool is_group = false);
......@@ -78,9 +77,6 @@ public:
int save_left_row();
int recover_left_row();
protected:
bool open_right_child_;
private:
DISALLOW_COPY_AND_ASSIGN(ObBasicNestedLoopJoinOp);
};
......
......@@ -304,13 +304,6 @@ int ObBLKNestedLoopJoin::join_left_func_going(ObBLKNestedLoopJoinCtx& join_ctx,
}
}
// try to open the right child
if (OB_SUCC(ret)) {
if (OB_FAIL(open_right_child(join_ctx))) {
LOG_WARN("failed to open right child", K(ret));
}
}
if (OB_SUCCESS == ret && JS_JOIN_RIGHT_JOIN == join_ctx.state_) {
if (OB_FAIL(save_left_row(join_ctx))) {
LOG_WARN("failed to save left row", K(ret));
......
......@@ -92,9 +92,6 @@ int ObNestedLoopJoin::inner_open(ObExecContext& exec_ctx) const
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL right table scan", K(ret));
} else if (FALSE_IT(right_table_scan->set_batch_scan_flag(true))) {
// never reach
} else if (OB_FAIL(open_right_child(*join_ctx))) {
LOG_WARN("failed to open right child", K(ret));
}
}
return ret;
......@@ -591,8 +588,6 @@ int ObNestedLoopJoin::bij_fill_left_rows(ObExecContext& exec_ctx) const
} else if (PHY_TABLE_SCAN != right_op_->get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("right op is not TABLE_SCAN type, plan is incorrect", K(ret), K(right_op_->get_type()));
} else if (OB_FAIL(open_right_child(*join_ctx))) {
LOG_WARN("failed to open right child", K(ret));
} else {
ObTableScan* right_table_scan = static_cast<ObTableScan*>(right_op_);
if (OB_FAIL(right_table_scan->batch_rescan_init(exec_ctx))) {
......
......@@ -51,10 +51,6 @@ int ObNestedLoopJoinOp::inner_open()
LOG_WARN("nlp_op child is null", KP(left_), KP(right_), K(ret));
} else if (OB_FAIL(ObBasicNestedLoopJoinOp::inner_open())) {
LOG_WARN("failed to open in base class", K(ret));
} else if (MY_SPEC.use_group_) {
if (OB_FAIL(open_right_child())) {
LOG_WARN("failed to open right child", K(ret));
}
}
return ret;
}
......
......@@ -623,6 +623,7 @@ int ObOperator::get_next_row()
} else if (OB_ITER_END == ret) {
int tmp_ret = drain_exch();
if (OB_SUCCESS != tmp_ret) {
ret = tmp_ret;
LOG_WARN("drain exchange data failed", K(tmp_ret));
}
if (got_first_row_) {
......
......@@ -486,6 +486,7 @@ int ObPhyOperator::get_next_row(ObExecContext& ctx, const ObNewRow*& row) const
if (OB_ITER_END == ret && NULL != op_ctx) {
int tmp_ret = drain_exch(ctx);
if (OB_SUCCESS != tmp_ret) {
ret = tmp_ret;
LOG_WARN("drain exchange data failed", K(tmp_ret));
}
if (op_ctx->got_first_row_) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册