提交 cb09edf4 编写于 作者: O obdev 提交者: ob-robot

[CP] Change arena alloctor to malloc allocator in connect by pump.

上级 21bb2cd2
......@@ -94,6 +94,7 @@ int ObConnectByOpPump::add_root_row()
int ret = OB_SUCCESS;
PumpNode node;
node.level_ = 1;
bool is_push = false;
for (int64_t i = 0; OB_SUCC(ret) && i < connect_by_path_count_; i++) {
if (OB_FAIL(node.sys_path_length_.push_back(0))) {
LOG_WARN("array push back failed", K(ret));
......@@ -107,9 +108,13 @@ int ObConnectByOpPump::add_root_row()
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(deep_copy_row(*cur_output_exprs_, node.output_row_))) {
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(push_back_node_to_stack(node))) {
} else if (OB_FAIL(push_back_node_to_stack(node, is_push))) {
LOG_WARN("fail to push back row", K(ret));
} else if (OB_FAIL(alloc_iter(pump_stack_.at(pump_stack_.count() - 1)))) {
LOG_WARN("alloc iterator failed", K(ret));
}
if (!is_push) {
free_pump_node(node);
}
return ret;
}
......@@ -130,6 +135,9 @@ int ObConnectByOpPump::push_back_store_row()
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(datum_store_.add_row(*dst_row))) {
LOG_WARN("datum store add row failed", K(ret));
} else if (OB_NOT_NULL(dst_row)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(dst_row));
dst_row = NULL;
}
return ret;
}
......@@ -147,36 +155,29 @@ int ObConnectByOpPump::alloc_iter(PumpNode& pop_node)
return ret;
}
int ObConnectByOpPump::free_pump_node(PumpNode& pop_node)
void ObConnectByOpPump::free_pump_node(PumpNode& pop_node)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(pop_node.pump_row_) || OB_ISNULL(pop_node.output_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid pop node", K(ret));
} else {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow*>(pop_node.pump_row_));
allocator_.free(const_cast<ObChunkDatumStore::StoredRow*>(pop_node.output_row_));
if (OB_ISNULL(pop_node.prior_exprs_result_)) {
} else {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow*>(pop_node.prior_exprs_result_));
}
if (OB_ISNULL(pop_node.first_child_)) {
} else {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow*>(pop_node.first_child_));
}
if (OB_NOT_NULL(pop_node.iter_)) {
allocator_.free(pop_node.iter_);
}
pop_node.pump_row_ = NULL;
pop_node.output_row_ = NULL;
pop_node.prior_exprs_result_ = NULL;
pop_node.first_child_ = NULL;
pop_node.iter_ = nullptr;
if (OB_NOT_NULL(pop_node.pump_row_)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.pump_row_));
}
return ret;
if (OB_NOT_NULL(pop_node.output_row_)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.output_row_));
}
if (OB_NOT_NULL(pop_node.prior_exprs_result_)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.prior_exprs_result_));
}
if (OB_NOT_NULL(pop_node.first_child_)) {
allocator_.free(const_cast<ObChunkDatumStore::StoredRow *>(pop_node.first_child_));
}
if (OB_NOT_NULL(pop_node.iter_)) {
pop_node.iter_->~Iterator();
allocator_.free(pop_node.iter_);
}
pop_node.pump_row_ = NULL;
pop_node.output_row_ = NULL;
pop_node.prior_exprs_result_ = NULL;
pop_node.first_child_ = NULL;
pop_node.iter_ = nullptr;
}
int ObConnectByOpPump::free_pump_node_stack(ObIArray<PumpNode>& stack)
......@@ -186,8 +187,8 @@ int ObConnectByOpPump::free_pump_node_stack(ObIArray<PumpNode>& stack)
while (OB_SUCC(ret) && false == stack.empty()) {
if (OB_FAIL(stack.pop_back(pop_node))) {
LOG_WARN("fail to pop back pump_stack", K(ret));
} else if (OB_FAIL(free_pump_node(pop_node))) {
LOG_WARN("free pump node failed", K(ret));
} else {
free_pump_node(pop_node);
}
}
return ret;
......@@ -205,14 +206,19 @@ int ObConnectByOpPump::get_top_pump_node(PumpNode*& node)
return ret;
}
int ObConnectByOpPump::push_back_node_to_stack(PumpNode& node)
int ObConnectByOpPump::push_back_node_to_stack(PumpNode& node, bool &is_push)
{
int ret = OB_SUCCESS;
is_push = false;
PumpNode* top_node = pump_stack_.count() > 0 ? &pump_stack_.at(pump_stack_.count() - 1) : NULL;
if (OB_FAIL(calc_prior_and_check_cycle(node, true /* add node to hashset */, top_node))) {
LOG_WARN("fail to calc path node", K(ret));
} else if (false == node.is_cycle_ && OB_FAIL(pump_stack_.push_back(node))) {
LOG_WARN("fail to push back row", K(ret));
} else if (false == node.is_cycle_) {
if (OB_FAIL(pump_stack_.push_back(node))) {
LOG_WARN("fail to push back row", K(ret));
} else {
is_push = true;
}
}
if (OB_FAIL(ret)) { // if fail free memory
......@@ -230,14 +236,9 @@ int ObConnectByOpPump::init(const ObNLConnectBySpec& connect_by, ObNLConnectByOp
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_ISNULL(eval_ctx.exec_ctx_.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else if (OB_FAIL(hash_filter_rows_.create(CONNECT_BY_TREE_HEIGHT))) {
LOG_WARN("create hash set failed", K(ret));
} else {
uint64_t tenant_id = eval_ctx.exec_ctx_.get_my_session()->get_effective_tenant_id();
allocator_.set_tenant_id(tenant_id);
connect_by_prior_exprs_ = &connect_by.connect_by_prior_exprs_;
eval_ctx_ = &eval_ctx;
connect_by_ = &connect_by_op;
......@@ -287,6 +288,7 @@ int ObConnectByOpPump::join_right_table(PumpNode& node, bool& matched)
LOG_WARN("calc other conds failed", K(ret));
} else if (matched) {
PumpNode next_node;
bool is_push = false;
next_node.level_ = node.level_ + 1;
if (next_node.level_ >= CONNECT_BY_MAX_NODE_NUM) {
ret = OB_ERR_CBY_NO_MEMORY;
......@@ -304,15 +306,16 @@ int ObConnectByOpPump::join_right_table(PumpNode& node, bool& matched)
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(deep_copy_row(*right_prior_exprs_, next_node.pump_row_))) {
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(push_back_node_to_stack(next_node))) {
} else if (OB_FAIL(push_back_node_to_stack(next_node, is_push))) {
LOG_WARN("push back node to stack failed", K(ret));
} else if (next_node.is_cycle_) {
// if node is same as an ancestor and there is no_cycle, ignore this node and continue to search.
matched = false;
if (OB_FAIL(free_pump_node(next_node))) {
LOG_WARN("free pump node failed", K(ret));
}
} else if (OB_FAIL(alloc_iter(pump_stack_.at(pump_stack_.count() - 1)))) {
LOG_WARN("alloc iterator failed", K(ret));
}
if (!is_push) {
free_pump_node(next_node);
}
}
}
......@@ -364,8 +367,8 @@ int ObConnectByOpPump::get_next_row()
LOG_WARN("fail to erase prior_exprs_result from hashset", K(ret), KPC(pop_node.prior_exprs_result_));
}
}
if (OB_SUCC(ret) && OB_FAIL(free_pump_node(pop_node))) {
LOG_WARN("free pump node failed", K(ret));
if (OB_SUCC(ret)) {
free_pump_node(pop_node);
}
}
}
......
......@@ -24,7 +24,7 @@ class ObConnectByOpPumpBase {
private:
class MallocWrapper : public common::ObMalloc {
public:
explicit MallocWrapper(const char* label) : allocator_(label), alloc_cnt_(0)
explicit MallocWrapper() : allocator_(NULL), alloc_cnt_(0)
{}
virtual ~MallocWrapper()
{
......@@ -34,28 +34,28 @@ private:
}
void* alloc(const int64_t sz)
{
void* mem = allocator_.alloc(sz);
void *mem = allocator_->alloc(sz);
alloc_cnt_ = nullptr == mem ? alloc_cnt_ : alloc_cnt_ + 1;
return mem;
}
void* alloc(const int64_t sz, const common::ObMemAttr& attr)
{
void* mem = allocator_.alloc(sz, attr);
void *mem = allocator_->alloc(sz, attr);
alloc_cnt_ = nullptr == mem ? alloc_cnt_ : alloc_cnt_ + 1;
return mem;
}
void free(void* ptr)
{
--alloc_cnt_;
allocator_.free(ptr);
allocator_->free(ptr);
}
void set_tenant_id(int64_t tenant_id)
void set_allocator(common::ObIAllocator &alloc)
{
allocator_.set_tenant_id(tenant_id);
allocator_ = &alloc;
}
private:
common::ObArenaAllocator allocator_;
common::ObIAllocator *allocator_;
int64_t alloc_cnt_;
};
......@@ -69,7 +69,7 @@ public:
right_prior_exprs_(NULL),
eval_ctx_(NULL),
// connect_by_root_row_(NULL),
allocator_(ObModIds::OB_CONNECT_BY_PUMP),
allocator_(),
is_inited_(false),
cur_level_(1),
never_meet_cycle_(true),
......@@ -204,11 +204,11 @@ public:
private:
// int alloc_prior_row_cells(uint64_t row_count);
int push_back_node_to_stack(PumpNode& node);
int push_back_node_to_stack(PumpNode& node, bool &is_push);
int calc_prior_and_check_cycle(PumpNode& node, bool set_refactored, PumpNode* left_node);
int check_cycle(const ObChunkDatumStore::StoredRow* row, bool set_refactored);
int check_child_cycle(PumpNode& node, PumpNode* left_node);
int free_pump_node(PumpNode& node);
void free_pump_node(PumpNode& node);
int alloc_iter(PumpNode& node);
int free_pump_node_stack(ObIArray<PumpNode>& stack);
void set_row_store_constructed()
......@@ -219,6 +219,10 @@ private:
{
return datum_store_constructed_;
}
void set_allocator(common::ObIAllocator &alloc)
{
allocator_.set_allocator(alloc);
}
private:
static const int64_t SYS_PATH_BUFFER_INIT_SIZE = 128;
......
......@@ -260,12 +260,7 @@ int ObConnectByOpBFSPump::init(
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_ISNULL(eval_ctx.exec_ctx_.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else {
uint64_t tenant_id = eval_ctx.exec_ctx_.get_my_session()->get_effective_tenant_id();
allocator_.set_tenant_id(tenant_id);
connect_by_ = &connect_by_op;
connect_by_prior_exprs_ = &connect_by.connect_by_prior_exprs_;
left_prior_exprs_ = &connect_by.left_prior_exprs_;
......
......@@ -128,6 +128,7 @@ private:
int check_cycle_path();
int free_path_stack();
int free_pump_node_stack(ObIArray<PumpNode>& stack);
void set_allocator(common::ObIAllocator &alloc) { allocator_.set_allocator(alloc); }
private:
common::ObArray<PumpNode> pump_stack_;
......
......@@ -213,6 +213,7 @@ int ObConnectByPump::add_root_row(const ObNewRow* root_row, const ObNewRow& mock
UNUSED(mock_right_row);
int ret = OB_SUCCESS;
PumpNode node;
bool is_push = false;
node.level_ = 1;
for (int64_t i = 0; OB_SUCC(ret) && i < connect_by_path_count_; i++) {
if (OB_FAIL(node.sys_path_length_.push_back(0))) {
......@@ -234,10 +235,13 @@ int ObConnectByPump::add_root_row(const ObNewRow* root_row, const ObNewRow& mock
// LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(alloc_iter(node))) {
LOG_WARN("alloc iterator failed", K(ret));
} else if (OB_FAIL(push_back_node_to_stack(node))) {
} else if (OB_FAIL(push_back_node_to_stack(node, is_push))) {
LOG_WARN("fail to push back row", K(ret));
} else {
}
if (!is_push) {
free_pump_node(node);
}
UNUSED(mock_right_row);
return ret;
}
......@@ -270,37 +274,33 @@ void ObConnectByPump::free_memory()
}
}
int ObConnectByPump::free_pump_node(PumpNode& pop_node)
void ObConnectByPump::free_pump_node(PumpNode& pop_node)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(pop_node.pump_row_) || OB_ISNULL(pop_node.output_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid pop node", K(ret));
} else {
allocator_.free(const_cast<ObNewRow*>(pop_node.pump_row_));
if (NULL != pop_node.right_row_) {
allocator_.free(const_cast<ObNewRow*>(pop_node.right_row_));
}
allocator_.free(const_cast<ObNewRow*>(pop_node.output_row_));
if (OB_ISNULL(pop_node.prior_exprs_result_)) {
} else {
allocator_.free(const_cast<ObNewRow*>(pop_node.prior_exprs_result_));
}
if (OB_ISNULL(pop_node.first_child_)) {
} else {
allocator_.free(const_cast<ObNewRow*>(pop_node.first_child_));
}
if (OB_NOT_NULL(pop_node.row_fetcher_.iterator_)) {
allocator_.free(pop_node.row_fetcher_.iterator_);
}
pop_node.pump_row_ = NULL;
pop_node.right_row_ = NULL;
pop_node.output_row_ = NULL;
pop_node.prior_exprs_result_ = NULL;
pop_node.first_child_ = NULL;
pop_node.row_fetcher_.iterator_ = NULL;
if (OB_NOT_NULL(pop_node.pump_row_)) {
allocator_.free(const_cast<ObNewRow *>(pop_node.pump_row_));
}
return ret;
if (OB_NOT_NULL(pop_node.output_row_)) {
allocator_.free(const_cast<ObNewRow *>(pop_node.output_row_));
}
if (OB_NOT_NULL(pop_node.prior_exprs_result_)) {
allocator_.free(const_cast<ObNewRow *>(pop_node.prior_exprs_result_));
}
if (OB_NOT_NULL(pop_node.right_row_)) {
allocator_.free(const_cast<ObNewRow *>(pop_node.right_row_));
}
if (OB_NOT_NULL(pop_node.first_child_)) {
allocator_.free(const_cast<ObNewRow *>(pop_node.first_child_));
}
if (OB_NOT_NULL(pop_node.row_fetcher_.iterator_)) {
pop_node.row_fetcher_.iterator_->~Iterator();
allocator_.free(pop_node.row_fetcher_.iterator_);
}
pop_node.pump_row_ = NULL;
pop_node.output_row_ = NULL;
pop_node.prior_exprs_result_ = NULL;
pop_node.right_row_ = NULL;
pop_node.first_child_ = NULL;
pop_node.row_fetcher_.iterator_ = NULL;
}
int ObConnectByPump::free_pump_node_stack(ObIArray<PumpNode>& stack)
......@@ -310,8 +310,8 @@ int ObConnectByPump::free_pump_node_stack(ObIArray<PumpNode>& stack)
while (OB_SUCC(ret) && false == stack.empty()) {
if (OB_FAIL(stack.pop_back(pop_node))) {
LOG_WARN("fail to pop back pump_stack", K(ret));
} else if (OB_FAIL(free_pump_node(pop_node))) {
LOG_WARN("free pump node failed", K(ret));
} else {
free_pump_node(pop_node);
}
}
return ret;
......@@ -329,13 +329,18 @@ int ObConnectByPump::get_top_pump_node(PumpNode*& node)
return ret;
}
int ObConnectByPump::push_back_node_to_stack(PumpNode& node)
int ObConnectByPump::push_back_node_to_stack(PumpNode& node, bool &is_push)
{
int ret = OB_SUCCESS;
is_push = false;
if (OB_FAIL(calc_prior_and_check_cycle(node, true /* add node to hashset */))) {
LOG_WARN("fail to calc path node", K(ret));
} else if (false == node.is_cycle_ && OB_FAIL(pump_stack_.push_back(node))) {
LOG_WARN("fail to push back row", K(ret));
} else if (false == node.is_cycle_) {
if (OB_FAIL(pump_stack_.push_back(node))) {
LOG_WARN("fail to push back row", K(ret));
} else {
is_push = true;
}
}
if (OB_FAIL(ret)) { // if fail free memory
......@@ -382,9 +387,6 @@ int ObConnectByPump::init(const ObConnectBy& connect_by, common::ObExprCtx* expr
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_ISNULL(expr_ctx) || OB_ISNULL(expr_ctx->my_session_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr_ctx or session is null", K(ret));
} else if (OB_UNLIKELY(2 != connect_by.get_child_num())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid child num", K(ret));
......@@ -400,8 +402,6 @@ int ObConnectByPump::init(const ObConnectBy& connect_by, common::ObExprCtx* expr
} else if (OB_FAIL(hash_filter_rows_.create(CONNECT_BY_TREE_HEIGHT))) {
LOG_WARN("create hash set failed", K(ret));
} else {
uint64_t tenant_id = expr_ctx->my_session_->get_effective_tenant_id();
allocator_.set_tenant_id(tenant_id);
pump_row_desc_ = connect_by.get_pump_row_desc();
pseudo_column_row_desc_ = connect_by.get_pseudo_column_row_desc();
connect_by_prior_exprs_ = connect_by.get_connect_by_prior_exprs();
......@@ -452,6 +452,7 @@ int ObConnectByPump::join_right_table(PumpNode& node, bool& matched, ObPhyOperat
LOG_WARN("calc other conds failed", K(ret));
} else if (matched) {
PumpNode next_node;
bool is_push = false;
next_node.level_ = node.level_ + 1;
if (next_node.level_ >= CONNECT_BY_MAX_NODE_NUM) {
ret = OB_ERR_CBY_NO_MEMORY;
......@@ -477,15 +478,15 @@ int ObConnectByPump::join_right_table(PumpNode& node, bool& matched, ObPhyOperat
LOG_WARN("deep copy row failed", K(ret));
} else if (OB_FAIL(alloc_iter(next_node))) {
LOG_WARN("alloc iterator failed", K(ret));
} else if (OB_FAIL(push_back_node_to_stack(next_node))) {
} else if (OB_FAIL(push_back_node_to_stack(next_node, is_push))) {
LOG_WARN("push back node to stack failed", K(ret));
} else if (next_node.is_cycle_) {
// nocycle mode, if the matched child node is the same as an ancestor node, then abandon
// this child node and continue searching
matched = false;
if (OB_FAIL(free_pump_node(next_node))) {
LOG_WARN("free pump node failed", K(ret));
}
}
if (!is_push) {
free_pump_node(next_node);
}
}
}
......@@ -538,8 +539,8 @@ int ObConnectByPump::get_next_row(ObPhyOperator::ObPhyOperatorCtx* phy_ctx)
LOG_WARN("fail to erase prior_exprs_result from hashset", K(ret), KPC(pop_node.prior_exprs_result_));
}
}
if (OB_SUCC(ret) && OB_FAIL(free_pump_node(pop_node))) {
LOG_WARN("free pump node failed", K(ret));
if (OB_SUCC(ret)) {
free_pump_node(pop_node);
}
}
}
......
......@@ -38,7 +38,7 @@ class ObConnectByPumpBase {
private:
class MallocWrapper : public common::ObMalloc {
public:
explicit MallocWrapper(const char* label) : allocator_(label), alloc_cnt_(0)
explicit MallocWrapper() : allocator_(NULL), alloc_cnt_(0)
{}
virtual ~MallocWrapper()
{
......@@ -48,28 +48,28 @@ private:
}
void* alloc(const int64_t sz)
{
void* mem = allocator_.alloc(sz);
void *mem = allocator_->alloc(sz);
alloc_cnt_ = nullptr == mem ? alloc_cnt_ : alloc_cnt_ + 1;
return mem;
}
void* alloc(const int64_t sz, const common::ObMemAttr& attr)
{
void* mem = allocator_.alloc(sz, attr);
void *mem = allocator_->alloc(sz, attr);
alloc_cnt_ = nullptr == mem ? alloc_cnt_ : alloc_cnt_ + 1;
return mem;
}
void free(void* ptr)
{
--alloc_cnt_;
allocator_.free(ptr);
allocator_->free(ptr);
}
void set_tenant_id(int64_t tenant_id)
void set_allocator(common::ObIAllocator &alloc)
{
allocator_.set_tenant_id(tenant_id);
allocator_ = &alloc;
}
private:
common::ObArenaAllocator allocator_;
common::ObIAllocator *allocator_;
int64_t alloc_cnt_;
};
......@@ -81,7 +81,7 @@ public:
connect_by_prior_exprs_(NULL),
expr_ctx_(NULL),
connect_by_root_row_(NULL),
allocator_(ObModIds::OB_CONNECT_BY_PUMP),
allocator_(),
is_inited_(false),
cur_level_(1),
is_output_level_(false),
......@@ -378,11 +378,11 @@ public:
private:
int alloc_prior_row_cells(uint64_t row_count);
int alloc_iter(PumpNode& node);
int push_back_node_to_stack(PumpNode& node);
int push_back_node_to_stack(PumpNode& node, bool &is_push);
int calc_prior_and_check_cycle(PumpNode& node, bool set_refactored);
int check_cycle(const ObNewRow& row, bool set_refactored);
int check_child_cycle(const ObNewRow& right_row, PumpNode& node);
int free_pump_node(PumpNode& node);
void free_pump_node(PumpNode& node);
int free_pump_node_stack(ObIArray<PumpNode>& stack);
void set_row_store_constructed()
{
......@@ -392,6 +392,10 @@ private:
{
return row_store_constructed_;
}
void set_allocator(common::ObIAllocator &alloc)
{
allocator_.set_allocator(alloc);
}
private:
static const int64_t SYS_PATH_BUFFER_INIT_SIZE = 128;
......
......@@ -278,9 +278,6 @@ int ObConnectByPumpBFS::init(const ObConnectByWithIndex& connect_by, common::ObE
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_ISNULL(expr_ctx) || OB_ISNULL(expr_ctx->my_session_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr_ctx or session is null", K(ret));
} else if (OB_UNLIKELY(2 != connect_by.get_child_num())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid child num", K(ret));
......@@ -292,8 +289,6 @@ int ObConnectByPumpBFS::init(const ObConnectByWithIndex& connect_by, common::ObE
} else if (OB_FAIL(alloc_shallow_row_projector(*left_op))) {
LOG_WARN("fail to alloc shallow row projector", K(ret));
} else {
uint64_t tenant_id = expr_ctx->my_session_->get_effective_tenant_id();
allocator_.set_tenant_id(tenant_id);
pump_row_desc_ = connect_by.get_pump_row_desc();
pseudo_column_row_desc_ = connect_by.get_pseudo_column_row_desc();
connect_by_prior_exprs_ = connect_by.get_connect_by_prior_exprs();
......
......@@ -114,6 +114,7 @@ private:
int check_cycle_path(const ObNewRow& row);
int free_path_stack();
int free_pump_node_stack(ObIArray<PumpNode>& stack);
void set_allocator(common::ObIAllocator &alloc) { allocator_.set_allocator(alloc); }
private:
static const int64_t CONNECT_BY_MAX_NODE_NUM = (2L << 30) / sizeof(PumpNode);
......
......@@ -330,15 +330,9 @@ int ObConnectBy::inner_open(ObExecContext& exec_ctx) const
LOG_WARN("failed to get nested loop connect by ctx", K(ret));
} else if (OB_FAIL(wrap_expr_ctx(exec_ctx, join_ctx->expr_ctx_))) {
LOG_WARN("fail to wrap expr ctx", K(ret));
} else if (OB_FAIL(join_ctx->init(*this, &join_ctx->expr_ctx_))) {
LOG_WARN("fail to init Connect by Ctx", K(ret));
} else if (OB_ISNULL(exec_ctx.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else if (hash_key_exprs_.count() != hash_probe_exprs_.count() ||
hash_key_exprs_.count() != equal_cond_infos_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("hash key exprs count != hash probe exprs count", K(ret));
} else {
tenant_id = exec_ctx.get_my_session()->get_effective_tenant_id();
param.set_mem_attr(lib::ObMemAttr(tenant_id, ObModIds::OB_CONNECT_BY_PUMP, ObCtxIds::WORK_AREA));
......@@ -361,9 +355,21 @@ int ObConnectBy::inner_open(ObExecContext& exec_ctx) const
LOG_WARN("init chunk row store failed", K(ret));
} else {
join_ctx->connect_by_pump_.row_store_.set_allocator(join_ctx->mem_context_->get_malloc_allocator());
join_ctx->connect_by_pump_.row_store_.set_callback(&join_ctx->sql_mem_processor_);
join_ctx->connect_by_pump_.set_allocator(join_ctx->mem_context_->get_malloc_allocator());
join_ctx->expr_ctx_.phy_plan_ctx_ = exec_ctx.get_physical_plan_ctx();
join_ctx->expr_ctx_.calc_buf_ = &join_ctx->get_calc_buf();
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(join_ctx->init(*this, &join_ctx->expr_ctx_))) {
LOG_WARN("fail to init Connect by Ctx", K(ret));
} else if (hash_key_exprs_.count() != hash_probe_exprs_.count() ||
hash_key_exprs_.count() != equal_cond_infos_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("hash key exprs count != hash probe exprs count", K(ret));
}
return ret;
}
......@@ -694,6 +700,10 @@ int ObConnectBy::calc_pseudo_flags(ObConnectByCtx& join_ctx, ObConnectByPump::Pu
}
finished = (false == output_cycle || node.is_cycle_) && (false == output_leaf || false == node.is_leaf_);
}
if (OB_NOT_NULL(next_node.prior_exprs_result_)) {
join_ctx.connect_by_pump_.allocator_.free(
const_cast<ObNewRow *>(next_node.prior_exprs_result_));
}
}
}
if (OB_ITER_END == ret) {
......
......@@ -78,6 +78,8 @@ int ObConnectByWithIndex::inner_open(ObExecContext& exec_ctx) const
{
int ret = OB_SUCCESS;
ObConnectByWithIndexCtx* join_ctx = NULL;
lib::ContextParam param;
int64_t tenant_id = OB_INVALID_ID;
if (OB_ISNULL(my_phy_plan_)) {
ret = OB_BAD_NULL_ERROR;
LOG_WARN("my phy plan is null", K(ret));
......@@ -88,12 +90,32 @@ int ObConnectByWithIndex::inner_open(ObExecContext& exec_ctx) const
LOG_WARN("failed to get nested loop connect by ctx", K(ret));
} else if (OB_FAIL(wrap_expr_ctx(exec_ctx, join_ctx->expr_ctx_))) {
LOG_WARN("fail to wrap expr ctx", K(ret));
} else if (OB_FAIL(join_ctx->init(*this, &join_ctx->expr_ctx_))) {
LOG_WARN("fail to init Connect by Ctx", K(ret));
} else if (OB_ISNULL(exec_ctx.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else {
tenant_id = exec_ctx.get_my_session()->get_effective_tenant_id();
param.set_mem_attr(tenant_id, ObModIds::OB_SQL_SORT_ROW, ObCtxIds::WORK_AREA)
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(join_ctx->mem_context_, param))) {
LOG_WARN("create entity failed", K(ret));
} else if (OB_ISNULL(join_ctx->mem_context_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null memory entity returned", K(ret));
} else {
join_ctx->connect_by_pump_.set_allocator(join_ctx->mem_context_->get_malloc_allocator());
join_ctx->expr_ctx_.phy_plan_ctx_ = exec_ctx.get_physical_plan_ctx();
join_ctx->expr_ctx_.calc_buf_ = &join_ctx->get_calc_buf();
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(join_ctx->init(*this, &join_ctx->expr_ctx_))) {
LOG_WARN("fail to init Connect by Ctx", K(ret));
}
return ret;
}
......
......@@ -30,7 +30,12 @@ public:
public:
ObConnectByWithIndexCtx(ObExecContext& ctx)
: ObConnectByBaseCtx(ctx), output_row_(NULL), connect_by_pump_(), is_match_(false), is_cycle_(false)
: ObConnectByBaseCtx(ctx),
output_row_(NULL),
connect_by_pump_(),
is_match_(false),
is_cycle_(false),
mem_context_(NULL)
{}
virtual ~ObConnectByWithIndexCtx()
{}
......@@ -61,6 +66,7 @@ public:
ObConnectByPumpBFS connect_by_pump_;
bool is_match_; // whether there is a child, for calc connect_by_isleaf
bool is_cycle_; // whether part of a cycle, for calc connect_by_iscycle
lib::MemoryContext mem_context_;
};
public:
......
......@@ -167,7 +167,8 @@ ObNLConnectByOp::ObNLConnectByOp(ObExecContext& exec_ctx, const ObOpSpec& spec,
connect_by_pump_(),
state_(CNTB_STATE_READ_LEFT),
is_inited_(false),
output_generated_(false)
output_generated_(false),
mem_context_(NULL)
{
state_operation_func_[CNTB_STATE_JOIN_END] = &ObNLConnectByOp::join_end_operate;
state_function_func_[CNTB_STATE_JOIN_END][FT_ITER_GOING] = NULL;
......@@ -241,6 +242,19 @@ int ObNLConnectByOp::inner_open()
MY_SPEC.cmp_funcs_.count() != MY_SPEC.left_prior_exprs_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status: cmp func is not match with prior exprs", K(ret));
} else {
lib::ContextParam param;
int64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
param.set_mem_attr(tenant_id, ObModIds::OB_CONNECT_BY_PUMP, ObCtxIds::WORK_AREA)
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
LOG_WARN("create entity failed", K(ret));
} else if (OB_ISNULL(mem_context_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null memory entity returned", K(ret));
} else {
connect_by_pump_.set_allocator(mem_context_->get_malloc_allocator());
}
}
return ret;
}
......@@ -443,6 +457,10 @@ int ObNLConnectByOp::calc_pseudo_flags(ObConnectByOpPump::PumpNode& node)
}
finished = (false == output_cycle || node.is_cycle_) && (false == output_leaf || false == node.is_leaf_);
}
if (OB_NOT_NULL(next_node.prior_exprs_result_)) {
connect_by_pump_.allocator_.free(
const_cast<ObChunkDatumStore::StoredRow *>(next_node.prior_exprs_result_));
}
}
}
if (OB_ITER_END == ret) {
......
......@@ -194,6 +194,7 @@ private:
// common::ObNewRow mock_right_row_;//used for root row output
bool is_inited_;
bool output_generated_;
lib::MemoryContext mem_context_;
};
} // namespace sql
......
......@@ -31,7 +31,8 @@ ObNLConnectByWithIndexOp::ObNLConnectByWithIndexOp(ObExecContext& exec_ctx, cons
is_match_(false),
is_cycle_(false),
is_inited_(false),
need_return_(false)
need_return_(false),
mem_context_(NULL)
{
state_operation_func_[CNTB_STATE_JOIN_END] = &ObNLConnectByWithIndexOp::join_end_operate;
state_function_func_[CNTB_STATE_JOIN_END][FT_ITER_GOING] = NULL;
......@@ -110,6 +111,19 @@ int ObNLConnectByWithIndexOp::inner_open()
MY_SPEC.cmp_funcs_.count() != MY_SPEC.left_prior_exprs_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected status: cmp func is not match with prior exprs", K(ret));
} else {
lib::ContextParam param;
int64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
param.set_mem_attr(tenant_id, ObModIds::OB_CONNECT_BY_PUMP, ObCtxIds::WORK_AREA)
.set_properties(lib::USE_TL_PAGE_OPTIONAL);
if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
LOG_WARN("create entity failed", K(ret));
} else if (OB_ISNULL(mem_context_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null memory entity returned", K(ret));
} else {
connect_by_pump_.set_allocator(mem_context_->get_malloc_allocator());
}
}
return ret;
}
......
......@@ -115,6 +115,7 @@ private:
bool is_cycle_; // whether part of a cycle, for calc connect_by_iscycle
bool is_inited_;
bool need_return_;
lib::MemoryContext mem_context_;
};
} // namespace sql
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册