提交 e1656f5d 编写于 作者: O obdev 提交者: ob-robot

[CP] Change arena alloctor to malloc allocator in connect by pump.

上级 1ef21d96
......@@ -102,6 +102,7 @@ int ObConnectByOpPump::add_root_row()
int ret = OB_SUCCESS;
PumpNode node;
node.level_ = 1;
bool is_push = false;
for (int64_t i = 0; OB_SUCC(ret) && i < connect_by_path_count_; i++) {
if (OB_FAIL(node.sys_path_length_.push_back(0))) {
LOG_WARN("array push back failed", K(ret));
......@@ -115,9 +116,13 @@ int ObConnectByOpPump::add_root_row()
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(deep_copy_row(*cur_output_exprs_, node.output_row_))) {
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(push_back_node_to_stack(node))) {
} else if (OB_FAIL(push_back_node_to_stack(node, is_push))) {
LOG_WARN("fail to push back row", K(ret));
} else if (OB_FAIL(alloc_iter(pump_stack_.at(pump_stack_.count() - 1)))) {
LOG_WARN("alloc iterator failed", K(ret));
}
if (!is_push) {
free_pump_node(node);
}
return ret;
}
......@@ -138,6 +143,9 @@ int ObConnectByOpPump::push_back_store_row()
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(datum_store_.add_row(*dst_row))) {
LOG_WARN("datum store add row failed", K(ret));
} else if (OB_NOT_NULL(dst_row)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(dst_row));
dst_row = NULL;
}
return ret;
}
......@@ -155,37 +163,29 @@ int ObConnectByOpPump::alloc_iter(PumpNode &pop_node)
return ret;
}
int ObConnectByOpPump::free_pump_node(PumpNode &pop_node)
void ObConnectByOpPump::free_pump_node(PumpNode &pop_node)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(pop_node.pump_row_) || OB_ISNULL(pop_node.output_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid pop node", K(ret));
} else {
if (OB_NOT_NULL(pop_node.pump_row_)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.pump_row_));
}
if (OB_NOT_NULL(pop_node.output_row_)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.output_row_));
if (OB_ISNULL(pop_node.prior_exprs_result_)) {
} else {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.prior_exprs_result_));
}
if (OB_ISNULL(pop_node.first_child_)) {
} else {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.first_child_));
}
if (OB_NOT_NULL(pop_node.row_fetcher_.iterator_)) {
pop_node.row_fetcher_.iterator_->~Iterator();
allocator_.free(pop_node.row_fetcher_.iterator_);
}
pop_node.pump_row_ = NULL;
pop_node.output_row_ = NULL;
pop_node.prior_exprs_result_ = NULL;
pop_node.first_child_ = NULL;
pop_node.row_fetcher_.iterator_ = NULL;
}
return ret;
if (OB_NOT_NULL(pop_node.prior_exprs_result_)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.prior_exprs_result_));
}
if (OB_NOT_NULL(pop_node.first_child_)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.first_child_));
}
if (OB_NOT_NULL(pop_node.row_fetcher_.iterator_)) {
pop_node.row_fetcher_.iterator_->~Iterator();
allocator_.free(pop_node.row_fetcher_.iterator_);
}
pop_node.pump_row_ = NULL;
pop_node.output_row_ = NULL;
pop_node.prior_exprs_result_ = NULL;
pop_node.first_child_ = NULL;
pop_node.row_fetcher_.iterator_ = NULL;
}
int ObConnectByOpPump::free_pump_node_stack(ObIArray<PumpNode> &stack)
......@@ -195,8 +195,8 @@ int ObConnectByOpPump::free_pump_node_stack(ObIArray<PumpNode> &stack)
while(OB_SUCC(ret) && false == stack.empty()) {
if (OB_FAIL(stack.pop_back(pop_node))) {
LOG_WARN("fail to pop back pump_stack", K(ret));
} else if (OB_FAIL(free_pump_node(pop_node))) {
LOG_WARN("free pump node failed", K(ret));
} else {
free_pump_node(pop_node);
}
}
return ret;
......@@ -214,14 +214,19 @@ int ObConnectByOpPump::get_top_pump_node(PumpNode *&node)
return ret;
}
int ObConnectByOpPump::push_back_node_to_stack(PumpNode &node)
int ObConnectByOpPump::push_back_node_to_stack(PumpNode &node, bool &is_push)
{
int ret = OB_SUCCESS;
is_push = false;
PumpNode *top_node = pump_stack_.count() > 0 ? &pump_stack_.at(pump_stack_.count() - 1) : NULL;
if (OB_FAIL(calc_prior_and_check_cycle(node, true/* add node to hashset */, top_node))) {
LOG_WARN("fail to calc path node", K(ret));
} else if (false == node.is_cycle_ && OB_FAIL(pump_stack_.push_back(node))) {
LOG_WARN("fail to push back row", K(ret));
} else if (false == node.is_cycle_) {
if (OB_FAIL(pump_stack_.push_back(node))) {
LOG_WARN("fail to push back row", K(ret));
} else {
is_push = true;
}
}
if (OB_FAIL(ret)) {//if fail free memory
......@@ -240,15 +245,9 @@ int ObConnectByOpPump::init(const ObNLConnectBySpec &connect_by, ObNLConnectByOp
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_ISNULL(eval_ctx.exec_ctx_.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else if (OB_FAIL(hash_filter_rows_.create(CONNECT_BY_TREE_HEIGHT))) {
LOG_WARN("create hash set failed", K(ret));
} else {
uint64_t tenant_id = eval_ctx.exec_ctx_.get_my_session()->get_effective_tenant_id();
allocator_.set_tenant_id(tenant_id);
allocator_.set_ctx_id(ObCtxIds::WORK_AREA);
connect_by_prior_exprs_ = &connect_by.connect_by_prior_exprs_;
eval_ctx_ = &eval_ctx;
connect_by_ = &connect_by_op;
......@@ -298,6 +297,7 @@ int ObConnectByOpPump::join_right_table(PumpNode &node, bool &matched)
LOG_WARN("calc other conds failed", K(ret));
} else if (matched) {
PumpNode next_node;
bool is_push = false;
next_node.level_ = node.level_ + 1;
if (next_node.level_ >= CONNECT_BY_MAX_NODE_NUM) {
ret = OB_ERR_CBY_NO_MEMORY;
......@@ -316,15 +316,16 @@ int ObConnectByOpPump::join_right_table(PumpNode &node, bool &matched)
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(deep_copy_row(*right_prior_exprs_, next_node.pump_row_))) {
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(push_back_node_to_stack(next_node))) {
} else if (OB_FAIL(push_back_node_to_stack(next_node, is_push))) {
LOG_WARN("push back node to stack failed", K(ret));
} else if (next_node.is_cycle_) {
//nocycle模式如果匹配到的子节点与某个祖先节点相同,那么放弃这个子节点继续搜索
matched = false;
if (OB_FAIL(free_pump_node(next_node))) {
LOG_WARN("free pump node failed", K(ret));
}
} else if (OB_FAIL(alloc_iter(pump_stack_.at(pump_stack_.count() - 1)))) {
LOG_WARN("alloc iterator failed", K(ret));
}
if (!is_push) {
free_pump_node(next_node);
}
}
}
......@@ -377,8 +378,8 @@ int ObConnectByOpPump::get_next_row()
KPC(pop_node.prior_exprs_result_));
}
}
if (OB_SUCC(ret) && OB_FAIL(free_pump_node(pop_node))) {
LOG_WARN("free pump node failed", K(ret));
if (OB_SUCC(ret)) {
free_pump_node(pop_node);
}
}
}
......
......@@ -28,7 +28,7 @@ private:
class MallocWrapper: public common::ObMalloc
{
public:
explicit MallocWrapper(const char *label): allocator_(label), alloc_cnt_(0) {}
explicit MallocWrapper(): allocator_(NULL), alloc_cnt_(0) {}
virtual ~MallocWrapper()
{
if (OB_UNLIKELY(alloc_cnt_ != 0)) {
......@@ -37,31 +37,26 @@ private:
}
void *alloc(const int64_t sz)
{
void *mem = allocator_.alloc(sz);
void *mem = allocator_->alloc(sz);
alloc_cnt_ = nullptr == mem ? alloc_cnt_ : alloc_cnt_ + 1;
return mem;
}
void *alloc(const int64_t sz, const common::ObMemAttr &attr)
{
void *mem = allocator_.alloc(sz, attr);
void *mem = allocator_->alloc(sz, attr);
alloc_cnt_ = nullptr == mem ? alloc_cnt_ : alloc_cnt_ + 1;
return mem;
}
void free(void *ptr)
{
--alloc_cnt_;
allocator_.free(ptr);
allocator_->free(ptr);
}
void set_tenant_id(int64_t tenant_id)
{
allocator_.set_tenant_id(tenant_id);
}
void set_ctx_id(int64_t ctx_id)
{
allocator_.set_ctx_id(ctx_id);
void set_allocator(common::ObIAllocator &alloc) {
allocator_ = &alloc;
}
private:
common::ObArenaAllocator allocator_;
common::ObIAllocator *allocator_;
int64_t alloc_cnt_;
};
......@@ -75,7 +70,7 @@ public:
right_prior_exprs_(NULL),
eval_ctx_(NULL),
// connect_by_root_row_(NULL),
allocator_(ObModIds::OB_CONNECT_BY_PUMP),
allocator_(),
is_inited_(false),
cur_level_(1),
never_meet_cycle_(true),
......@@ -295,15 +290,16 @@ public:
private:
// int alloc_prior_row_cells(uint64_t row_count);
int push_back_node_to_stack(PumpNode &node);
int push_back_node_to_stack(PumpNode &node, bool &is_push);
int calc_prior_and_check_cycle(PumpNode &node, bool set_refactored, PumpNode *left_node);
int check_cycle(const ObChunkDatumStore::StoredRow *row, bool set_refactored);
int check_child_cycle(PumpNode &node, PumpNode *left_node);
int free_pump_node(PumpNode &node);
void free_pump_node(PumpNode &node);
int alloc_iter(PumpNode &node);
int free_pump_node_stack(ObIArray<PumpNode> &stack);
void set_row_store_constructed() { datum_store_constructed_ = true; }
bool get_row_store_constructed() { return datum_store_constructed_; }
void set_allocator(common::ObIAllocator &alloc) { allocator_.set_allocator(alloc); }
private:
static const int64_t SYS_PATH_BUFFER_INIT_SIZE = 128;
......
......@@ -273,13 +273,7 @@ int ObConnectByOpBFSPump::init(ObNLConnectByWithIndexSpec &connect_by,
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_ISNULL(eval_ctx.exec_ctx_.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else {
uint64_t tenant_id = eval_ctx.exec_ctx_.get_my_session()->get_effective_tenant_id();
allocator_.set_tenant_id(tenant_id);
allocator_.set_ctx_id(ObCtxIds::WORK_AREA);
connect_by_ = &connect_by_op;
connect_by_prior_exprs_ = &connect_by.connect_by_prior_exprs_;
left_prior_exprs_ = &connect_by.left_prior_exprs_;
......
......@@ -129,6 +129,7 @@ private:
int check_cycle_path();
int free_path_stack();
int free_pump_node_stack(ObIArray<PumpNode> &stack);
void set_allocator(common::ObIAllocator &alloc) { allocator_.set_allocator(alloc); }
private:
common::ObArray<PumpNode> pump_stack_;
common::ObArray<PathNode> path_stack_;
......
......@@ -286,6 +286,7 @@ int ObNLConnectByOp::inner_open()
true /* enable dump */, 0))) {
LOG_WARN("init chunk row store failed", K(ret));
} else {
connect_by_pump_.set_allocator(mem_context_->get_malloc_allocator());
connect_by_pump_.datum_store_.set_allocator(mem_context_->get_malloc_allocator());
}
}
......@@ -533,6 +534,10 @@ int ObNLConnectByOp::calc_pseudo_flags(ObConnectByOpPump::PumpNode &node)
finished = (false == output_cycle || node.is_cycle_)
&& (false == output_leaf || false == node.is_leaf_);
}
if (OB_NOT_NULL(next_node.prior_exprs_result_)) {
connect_by_pump_.allocator_.free(
const_cast<ObChunkDatumStore::StoredRow *>(next_node.prior_exprs_result_));
}
}
}
if (OB_ITER_END == ret) {
......
......@@ -34,7 +34,8 @@ ObNLConnectByWithIndexOp::ObNLConnectByWithIndexOp(ObExecContext &exec_ctx, cons
is_match_(false),
is_cycle_(false),
is_inited_(false),
need_return_(false)
need_return_(false),
mem_context_(NULL)
{
state_operation_func_[CNTB_STATE_JOIN_END] = &ObNLConnectByWithIndexOp::join_end_operate;
state_function_func_[CNTB_STATE_JOIN_END][FT_ITER_GOING] = NULL;
......@@ -112,6 +113,19 @@ int ObNLConnectByWithIndexOp::inner_open()
&& MY_SPEC.cmp_funcs_.count() != MY_SPEC.left_prior_exprs_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status: cmp func is not match with prior exprs", K(ret));
} else {
lib::ContextParam param;
int64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
param.set_mem_attr(tenant_id, ObModIds::OB_CONNECT_BY_PUMP, ObCtxIds::WORK_AREA)
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
LOG_WARN("create entity failed", K(ret));
} else if (OB_ISNULL(mem_context_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null memory entity returned", K(ret));
} else {
connect_by_pump_.set_allocator(mem_context_->get_malloc_allocator());
}
}
return ret;
}
......
......@@ -107,6 +107,7 @@ private:
bool is_cycle_;//判断是否有循环的情况,用来生产connect_by_iscycle
bool is_inited_;
bool need_return_;
lib::MemoryContext mem_context_;
};
}//sql
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册