diff --git a/src/sql/engine/aggregate/ob_aggregate_processor.cpp b/src/sql/engine/aggregate/ob_aggregate_processor.cpp index 2bcee9ef8fde366458e6488a33516990d5e413a8..014ed10a8e692c33c9d9d2902d8852414f7e8fec 100644 --- a/src/sql/engine/aggregate/ob_aggregate_processor.cpp +++ b/src/sql/engine/aggregate/ob_aggregate_processor.cpp @@ -394,7 +394,7 @@ int64_t ObAggregateProcessor::ExtraResult::to_string(char* buf, const int64_t bu return pos; } -int ObAggrInfo::eval_aggr(ObChunkDatumStore::ShadowStoredRow<>& curr_row_results, ObEvalCtx& ctx) const +int ObAggrInfo::eval_aggr(ObChunkDatumStore::ShadowStoredRow& curr_row_results, ObEvalCtx& ctx) const { int ret = OB_SUCCESS; if (param_exprs_.empty() && T_FUN_COUNT == get_expr_type()) { @@ -1908,7 +1908,7 @@ int ObAggregateProcessor::collect_aggr_result(AggrCell& aggr_cell, const ObExpr* const ObChunkDatumStore::StoredRow* storted_row = NULL; int64_t rank_num = 0; bool need_check_order_equal = T_FUN_GROUP_DENSE_RANK == aggr_fun; - ObChunkDatumStore::LastStoredRow<> prev_row(aggr_alloc_); + ObChunkDatumStore::LastStoredRow prev_row(aggr_alloc_); int64_t total_sort_row_cnt = extra->get_row_count(); bool is_first = true; while (OB_SUCC(ret) && OB_SUCC(extra->get_next_row(storted_row))) { @@ -2044,7 +2044,7 @@ int ObAggregateProcessor::collect_aggr_result(AggrCell& aggr_cell, const ObExpr* const int64_t total_row_count = extra->get_row_count(); char buf_alloc[number::ObNumber::MAX_CALC_BYTE_LEN]; ObDataBuffer allocator(buf_alloc, number::ObNumber::MAX_CALC_BYTE_LEN); - ObChunkDatumStore::LastStoredRow<> prev_row(aggr_alloc_); + ObChunkDatumStore::LastStoredRow prev_row(aggr_alloc_); number::ObNumber factor; bool need_linear_inter = false; int64_t not_null_start_loc = 0; @@ -2163,7 +2163,7 @@ int ObAggregateProcessor::collect_aggr_result(AggrCell& aggr_cell, const ObExpr* LOG_WARN("finish_add_row failed", KPC(extra), K(ret)); } else { const ObChunkDatumStore::StoredRow* storted_row = NULL; - ObChunkDatumStore::LastStoredRow<> first_row(aggr_alloc_); + ObChunkDatumStore::LastStoredRow first_row(aggr_alloc_); bool is_first = true; while (OB_SUCC(ret) && OB_SUCC(extra->get_next_row(storted_row))) { bool is_equal = false; @@ -2925,7 +2925,7 @@ int ObAggregateProcessor::compare_calc(const ObDatum& left_value, const ObDatum& return ret; } -int ObAggregateProcessor::check_rows_equal(const ObChunkDatumStore::LastStoredRow<>& prev_row, +int ObAggregateProcessor::check_rows_equal(const ObChunkDatumStore::LastStoredRow& prev_row, const ObChunkDatumStore::StoredRow& cur_row, const ObAggrInfo& aggr_info, bool& is_equal) { int ret = OB_SUCCESS; @@ -3190,7 +3190,7 @@ int ObAggregateProcessor::get_wm_concat_result( LOG_WARN("finish_add_row failed", KPC(extra), K(ret)); } else { ObString sep_str = ObCharsetUtils::get_const_str(aggr_info.expr_->datum_meta_.cs_type_, ','); - ObChunkDatumStore::LastStoredRow<> first_row(aggr_alloc_); + ObChunkDatumStore::LastStoredRow first_row(aggr_alloc_); const ObChunkDatumStore::StoredRow* storted_row = NULL; bool is_first = true; bool need_continue = true; diff --git a/src/sql/engine/aggregate/ob_aggregate_processor.h b/src/sql/engine/aggregate/ob_aggregate_processor.h index be3db4b7415c5e844de21924479c25ee00fc82b6..ef6fb30c535034709b5dda7811eb2a7d1caf2a8d 100644 --- a/src/sql/engine/aggregate/ob_aggregate_processor.h +++ b/src/sql/engine/aggregate/ob_aggregate_processor.h @@ -62,7 +62,7 @@ public: ? 1 : ((T_FUN_COUNT == get_expr_type() && param_exprs_.empty()) ? 0 : param_exprs_.count()); } - int eval_aggr(ObChunkDatumStore::ShadowStoredRow<>& curr_row_results, ObEvalCtx& ctx) const; + int eval_aggr(ObChunkDatumStore::ShadowStoredRow& curr_row_results, ObEvalCtx& ctx) const; inline void set_implicit_first_aggr() { is_implicit_first_aggr_ = true; @@ -326,7 +326,7 @@ public: } public: - ObChunkDatumStore::ShadowStoredRow<> curr_row_results_; + ObChunkDatumStore::ShadowStoredRow curr_row_results_; private: // for avg/count @@ -466,7 +466,7 @@ private: int rollup_distinct(AggrCell& aggr_cell, AggrCell& rollup_cell); int compare_calc(const ObDatum& left_value, const ObDatum& right_value, const ObAggrInfo& aggr_info, int64_t index, int& compare_result, bool& is_asc); - int check_rows_equal(const ObChunkDatumStore::LastStoredRow<>& prev_row, const ObChunkDatumStore::StoredRow& cur_row, + int check_rows_equal(const ObChunkDatumStore::LastStoredRow& prev_row, const ObChunkDatumStore::StoredRow& cur_row, const ObAggrInfo& aggr_info, bool& is_equal); int get_wm_concat_result( const ObAggrInfo& aggr_info, GroupConcatExtraResult*& extra, bool is_keep_group_concat, ObDatum& concat_result); diff --git a/src/sql/engine/aggregate/ob_merge_distinct_op.h b/src/sql/engine/aggregate/ob_merge_distinct_op.h index c9901aa1f9fe18f5c959aa745c501ea77fa89f91..98bc44d0892f16d7254303ef95b9011dfc3dbd6a 100644 --- a/src/sql/engine/aggregate/ob_merge_distinct_op.h +++ b/src/sql/engine/aggregate/ob_merge_distinct_op.h @@ -50,7 +50,7 @@ public: }; private: - typedef ObChunkDatumStore::LastStoredRow LastStoreRow; + typedef ObChunkDatumStore::LastStoredRow LastStoreRow; bool first_got_row_; common::ObArenaAllocator alloc_; LastStoreRow last_row_; @@ -60,4 +60,4 @@ private: } // end namespace sql } // end namespace oceanbase -#endif /* OCEANBASE_SRC_SQL_ENGINE_AGGREGATE_OB_MERGE_DISTINCT_OP_H_ */ \ No newline at end of file +#endif /* OCEANBASE_SRC_SQL_ENGINE_AGGREGATE_OB_MERGE_DISTINCT_OP_H_ */ diff --git a/src/sql/engine/aggregate/ob_merge_groupby_op.h b/src/sql/engine/aggregate/ob_merge_groupby_op.h index fdeb2fc6209f01973b3915652c8bb4a06b251e81..11da7c960e34dbc14be5c4f1afdca31af22865f2 100644 --- a/src/sql/engine/aggregate/ob_merge_groupby_op.h +++ b/src/sql/engine/aggregate/ob_merge_groupby_op.h @@ -97,7 +97,7 @@ private: // added to support groupby with rollup int64_t cur_output_group_id_; int64_t first_output_group_id_; - ObChunkDatumStore::LastStoredRow<> last_child_output_; + ObChunkDatumStore::LastStoredRow last_child_output_; DatumFixedArray curr_groupby_datums_; int64_t dir_id_; }; diff --git a/src/sql/engine/basic/ob_chunk_datum_store.cpp b/src/sql/engine/basic/ob_chunk_datum_store.cpp index 17287ab7eb0897f140cd8039bb30e7f0d583252a..71d799479c81b2f3ec18f2d6f1732e549bcf4db5 100644 --- a/src/sql/engine/basic/ob_chunk_datum_store.cpp +++ b/src/sql/engine/basic/ob_chunk_datum_store.cpp @@ -57,113 +57,6 @@ void point2pointer(T*& dst_pointer, B* dst_base, T* src_pointer, const B* src_ba dst_pointer = reinterpret_cast(reinterpret_cast(dst_base) + reinterpret_cast(src_pointer) - reinterpret_cast(src_base)); } - -} // namespace chunk_datum_store - -int ObChunkDatumStore::StoredRow::copy_datums(const common::ObIArray& exprs, ObEvalCtx& ctx, char* buf, - const int64_t size, const int64_t row_size, const uint32_t row_extend_size) -{ - int ret = OB_SUCCESS; - if (payload_ != buf || size < 0) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(payload_), KP(buf), K(size)); - } else { - cnt_ = static_cast(exprs.count()); - int64_t pos = sizeof(ObDatum) * cnt_ + row_extend_size; - row_size_ = static_cast(row_size); - for (int64_t i = 0; OB_SUCC(ret) && i < cnt_; ++i) { - ObDatum& in_datum = static_cast(exprs.at(i)->locate_expr_datum(ctx)); - ObDatum* datum = new (&cells()[i]) ObDatum(); - if (OB_FAIL(datum->deep_copy(in_datum, buf, size, pos))) { - LOG_WARN("failed to copy datum", K(ret), K(i), K(pos), K(size), K(row_size), K(in_datum), K(*datum)); - } else { - LOG_DEBUG("succ to copy_datums", K(cnt_), K(i), K(size), K(row_size), K(in_datum), K(*datum)); - } - } - } - return ret; -} - -int ObChunkDatumStore::StoredRow::copy_datums(const common::ObIArray& exprs, ObEvalCtx& ctx, int64_t& row_size, - char* buf, const int64_t max_buf_size, const uint32_t row_extend_size) -{ - int ret = OB_SUCCESS; - if (max_buf_size < 0) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(payload_), KP(buf), K(max_buf_size)); - } else { - cnt_ = static_cast(exprs.count()); - int64_t pos = sizeof(ObDatum) * cnt_ + row_extend_size + sizeof(StoredRow); - ObDatum* datums = cells(); - if (pos > max_buf_size) { - ret = OB_BUF_NOT_ENOUGH; - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < cnt_; ++i) { - ObDatum* in_datum = nullptr; - ObDatum* datum = (ObDatum*)datums; - if (OB_FAIL(exprs.at(i)->eval(ctx, in_datum))) { - LOG_WARN("failed to eval datum", K(ret)); - } else if (OB_FAIL(datum->deep_copy(*in_datum, buf, max_buf_size, pos))) { - if (OB_BUF_NOT_ENOUGH != ret) { - LOG_WARN("failed to copy datum", K(ret), K(i), K(pos), K(max_buf_size), KP(in_datum), K(*datum)); - } - } else { - LOG_DEBUG("succ to copy_datums", K(cnt_), K(i), K(max_buf_size), KP(in_datum), K(*datum)); - } - ++datums; - } - row_size_ = static_cast(pos); - row_size = row_size_; - } - } - return ret; -} - -int ObChunkDatumStore::StoredRow::copy_datums(common::ObDatum** datums, const int64_t cnt, char* buf, - const int64_t size, const int64_t row_size, const uint32_t row_extend_size) -{ - int ret = OB_SUCCESS; - if (payload_ != buf || size < 0 || nullptr == datums) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(payload_), KP(buf), K(size), K(datums)); - } else { - cnt_ = static_cast(cnt); - int64_t pos = sizeof(ObDatum) * cnt_ + row_extend_size; - row_size_ = static_cast(row_size); - for (int64_t i = 0; OB_SUCC(ret) && i < cnt_; ++i) { - if (nullptr == datums[i]) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(payload_), KP(buf), K(size), K(datums)); - } else { - ObDatum* datum = new (&cells()[i]) ObDatum(); - if (OB_FAIL(datum->deep_copy(*datums[i], buf, size, pos))) { - LOG_WARN( - "failed to copy datum", K(ret), K(i), K(pos), K(size), K(row_size), K(*datums[i]), K(datums[i]->len_)); - } - } - } - } - return ret; -} - -int ObChunkDatumStore::StoredRow::copy_datums(common::ObDatum* datums, const int64_t cnt, char* buf, const int64_t size, - const int64_t row_size, const uint32_t row_extend_size) -{ - int ret = OB_SUCCESS; - if (payload_ != buf || size < 0 || nullptr == datums) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(payload_), KP(buf), K(size), K(datums)); - } else { - cnt_ = static_cast(cnt); - row_size_ = static_cast(row_size); - MEMCPY(buf, static_cast(datums), size); - int64_t pos = sizeof(ObDatum) * cnt_ + row_extend_size; - for (int64_t i = 0; i < cnt_; ++i) { - cells()[i].ptr_ = buf + pos; - pos += cells()[i].len_; - } - } - return ret; } int ObChunkDatumStore::StoredRow::to_expr(const common::ObIArray& exprs, ObEvalCtx& ctx) const @@ -230,6 +123,54 @@ void ObChunkDatumStore::StoredRow::swizzling(char* base /*= NULL*/) } } +int ObChunkDatumStore::StoredRow::build(StoredRow *&sr, const ObExprPtrIArray &exprs, ObEvalCtx &ctx, char *buf, + const int64_t buf_len, const uint32_t extra_size /* = 0 */) +{ + int ret = OB_SUCCESS; + sr = reinterpret_cast(buf); + int64_t pos = sizeof(*sr) + sizeof(ObDatum) * exprs.count() + extra_size; + if (pos > buf_len) { + ret = OB_BUF_NOT_ENOUGH; + } else { + sr->cnt_ = exprs.count(); + ObDatum *datums = sr->cells(); + for (int64_t i = 0; i < exprs.count() && OB_SUCC(ret); i++) { + ObExpr *expr = exprs.at(i); + ObDatum *in_datum = NULL; + if (OB_UNLIKELY(NULL == expr)) { + // Set datum to NULL for NULL expr + datums[i].set_null(); + } else if (OB_FAIL(expr->eval(ctx, in_datum))) { + LOG_WARN("expression evaluate failed", K(ret)); + } else { + ret = datums[i].deep_copy(*in_datum, buf, buf_len, pos); + } + } + if (OB_SUCC(ret)) { + sr->row_size_ = static_cast(pos); + } + } + + return ret; +} + +int ObChunkDatumStore::StoredRow::build(StoredRow *&sr, const ObExprPtrIArray &exprs, ObEvalCtx &ctx, + common::ObIAllocator &alloc, const uint32_t extra_size /* = 0 */) +{ + int ret = OB_SUCCESS; + int64_t size = 0; + char *buf = NULL; + if (OB_FAIL(Block::row_store_size(exprs, ctx, size, extra_size))) { + LOG_WARN("get row store size failed", K(ret)); + } else if (NULL == (buf = static_cast(alloc.alloc(size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate memory failed", K(ret), K(size)); + } else if (OB_FAIL(build(sr, exprs, ctx, buf, size, extra_size))) { + LOG_WARN("build stored row failed", K(ret)); + } + return ret; +} + int ObChunkDatumStore::Block::add_row(const common::ObIArray& exprs, ObEvalCtx& ctx, const int64_t row_size, uint32_t row_extend_size, StoredRow** stored_row) { @@ -242,18 +183,15 @@ int ObChunkDatumStore::Block::add_row(const common::ObIArray& exprs, Ob ret = OB_BUF_NOT_ENOUGH; LOG_WARN("buffer not enough", K(row_size), "remain", buf->remain()); } else { - StoredRow* sr = (StoredRow*)buf->head(); - if (OB_FAIL(sr->copy_datums( - exprs, ctx, buf->head() + ROW_HEAD_SIZE, row_size - ROW_HEAD_SIZE, row_size, row_extend_size))) { - LOG_WARN("copy row failed", K(ret), K(row_size)); + StoredRow *sr = NULL; + if (OB_FAIL(StoredRow::build(sr, exprs, ctx, buf->head(), row_size, row_extend_size))) { + LOG_WARN("build stored row failed", K(ret)); + } else if (OB_FAIL(buf->advance(row_size))) { + LOG_WARN("fill buffer head failed", K(ret), K(buf), K(row_size)); } else { - if (OB_FAIL(buf->advance(row_size))) { - LOG_WARN("fill buffer head failed", K(ret), K(buf), K(row_size)); - } else { - rows_++; - if (NULL != stored_row) { - *stored_row = sr; - } + rows_++; + if (NULL != stored_row) { + *stored_row = sr; } } } @@ -268,25 +206,17 @@ int ObChunkDatumStore::Block::append_row(const common::ObIArray& exprs, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(buf)); } else { - int64_t max_size = buf->remain(); - if (ROW_HEAD_SIZE > max_size) { - ret = OB_BUF_NOT_ENOUGH; + StoredRow *sr = NULL; + if (OB_FAIL(StoredRow::build(sr, exprs, *ctx, buf->head(), buf->remain(), row_extend_size))) { + if (OB_BUF_NOT_ENOUGH != ret) { + LOG_WARN("build stored row failed", K(ret)); + } + } else if (OB_FAIL(buf->advance(sr->row_size_))) { + LOG_WARN("buffer advance failed", K(ret)); } else { - StoredRow* sr = (StoredRow*)buf->head(); - int64_t row_size = 0; - if (OB_FAIL(sr->copy_datums(exprs, *ctx, row_size, buf->head(), max_size, row_extend_size))) { - if (OB_BUF_NOT_ENOUGH != ret) { - LOG_WARN("copy row failed", K(ret), K(max_size)); - } - } else { - if (OB_FAIL(buf->advance(row_size))) { - LOG_WARN("fill buffer head failed", K(ret), K(buf)); - } else { - ++rows_; - if (NULL != stored_row) { - *stored_row = sr; - } - } + ++rows_; + if (NULL != stored_row) { + *stored_row = sr; } } } diff --git a/src/sql/engine/basic/ob_chunk_datum_store.h b/src/sql/engine/basic/ob_chunk_datum_store.h index d54a135621f200bd7a58f3b8f780f25a27386235..764b9c94e1ef3ac97f5ce78dba6149ab165b5cc1 100644 --- a/src/sql/engine/basic/ob_chunk_datum_store.h +++ b/src/sql/engine/basic/ob_chunk_datum_store.h @@ -34,36 +34,6 @@ class ObChunkDatumStore { OB_UNIS_VERSION_V(1); public: - static inline int row_copy_size(const common::ObIArray& exprs, ObEvalCtx& ctx, int64_t& size) - { - int ret = OB_SUCCESS; - common::ObDatum* datum = nullptr; - size = DATUM_SIZE * exprs.count(); - for (int64_t i = 0; i < exprs.count() && OB_SUCC(ret); ++i) { - if (OB_FAIL(exprs.at(i)->eval(ctx, datum))) { - SQL_ENG_LOG(WARN, "failed to eval expr datum", KPC(exprs.at(i)), K(ret)); - } else { - size += datum->len_; - } - } - return ret; - } - static inline int64_t row_copy_size(common::ObDatum** datums, const int64_t cnt) - { - int64_t size = DATUM_SIZE * cnt; - for (int64_t i = 0; i < cnt; ++i) { - size += datums[i]->len_; - } - return size; - } - static inline int64_t row_copy_size(common::ObDatum* datums, const int64_t cnt) - { - int64_t size = DATUM_SIZE * cnt; - for (int64_t i = 0; i < cnt; ++i) { - size += datums[i].len_; - } - return size; - } /* * StoredRow memory layout * N Datum + extend_size(can be 0) + real data @@ -75,16 +45,22 @@ public: struct StoredRow { StoredRow() : cnt_(0), row_size_(0) {} - int copy_datums(const common::ObIArray& exprs, ObEvalCtx& ctx, char* buf, const int64_t size, - const int64_t row_size, const uint32_t row_extend_size); - int copy_datums(const common::ObIArray& exprs, ObEvalCtx& ctx, int64_t& row_size, char* buf, - const int64_t max_buf_size, const uint32_t row_extend_size); - int copy_datums(common::ObDatum** datums, const int64_t cnt, char* buf, const int64_t size, const int64_t row_size, - const uint32_t row_extend_size); - int copy_datums(common::ObDatum* datums, const int64_t cnt, char* buf, const int64_t size, const int64_t row_size, - const uint32_t row_extend_size); int to_expr(const common::ObIArray& exprs, ObEvalCtx& ctx) const; int to_expr(const common::ObIArray& exprs, ObEvalCtx& ctx, int64_t count) const; + // Build a stored row by exprs. + // @param [out] sr, result stored row + // @param epxrs, + // @param ctx + // @param buf + // @param buf_len, use Block::row_store_size() to detect the needed buffer size. + // @param extra_size, extra store size + // @param unswizzling + // @return OB_SUCCESS or OB_BUF_NOT_ENOUGH if buf not enough + static int build(StoredRow *&sr, const ObExprPtrIArray &exprs, ObEvalCtx &ctx, char *buf, const int64_t buf_len, + const uint32_t extra_size = 0); + static int build(StoredRow *&sr, const ObExprPtrIArray &exprs, ObEvalCtx &ctx, common::ObIAllocator &alloc, + const uint32_t extra_size = 0); + inline common::ObDatum* cells() { @@ -117,7 +93,6 @@ public: * 2) Provide reuse mode, memory can be reused * 3) Provide conversion from StoredRow to ObIArray */ - template class LastStoredRow { public: LastStoredRow(ObIAllocator& alloc) @@ -138,13 +113,13 @@ public: char* buf = NULL; int64_t row_size = 0; int64_t buffer_len = 0; - T* new_row = NULL; + StoredRow* new_row = NULL; if (0 == exprs.count()) { // no column. scenario like distinct 1 } else if (OB_FAIL(ObChunkDatumStore::row_copy_size(exprs, ctx, row_size))) { SQL_ENG_LOG(WARN, "failed to calc copy size", K(ret)); } else { - int64_t head_size = sizeof(T); + int64_t head_size = sizeof(StoredRow); reuse = OB_ISNULL(store_row_) ? false : reuse && (max_size_ >= row_size + head_size + extra_size); if (reuse && OB_NOT_NULL(store_row_)) { // switch buffer for write @@ -163,24 +138,21 @@ public: } else if (OB_ISNULL(buf2 = reinterpret_cast(alloc_.alloc(buffer_len)))) { ret = OB_ALLOCATE_MEMORY_FAILED; SQL_ENG_LOG(ERROR, "alloc buf failed", K(ret)); - } else if (OB_ISNULL(pre_alloc_row1_ = new (buf1) T())) { + } else if (OB_ISNULL(pre_alloc_row1_ = new (buf1) StoredRow())) { ret = OB_ALLOCATE_MEMORY_FAILED; SQL_ENG_LOG(ERROR, "failed to new row", K(ret)); - } else if (OB_ISNULL(pre_alloc_row2_ = new (buf2) T())) { + } else if (OB_ISNULL(pre_alloc_row2_ = new (buf2) StoredRow())) { ret = OB_ALLOCATE_MEMORY_FAILED; SQL_ENG_LOG(ERROR, "failed to new row", K(ret)); } else { buf = buf1; - new_row = pre_alloc_row1_; } } if (OB_SUCC(ret)) { - int64_t pos = head_size; - if (OB_FAIL(new_row->copy_datums(exprs, ctx, buf + pos, buffer_len - head_size, row_size, extra_size))) { - SQL_ENG_LOG(WARN, "failed to deep copy row", K(ret), K(buffer_len), K(row_size)); + if (OB_FAIL(StoredRow::build(store_row_, exprs, ctx, buf, buffer_len, extra_size))) { + SQL_ENG_LOG(WARN, "failed to build stored row", K(ret), K(buffer_len), K(row_size)); } else { max_size_ = buffer_len; - store_row_ = new_row; } } } @@ -192,9 +164,9 @@ public: bool reuse = reuse_; char* buf = NULL; int64_t buffer_len = 0; - T* new_row = NULL; + StoredRow* new_row = NULL; int64_t row_size = row.row_size_; - int64_t head_size = sizeof(T); + int64_t head_size = sizeof(StoredRow); reuse = OB_ISNULL(store_row_) ? false : reuse && (max_size_ >= row_size + head_size + extra_size); if (reuse && OB_NOT_NULL(store_row_)) { buf = reinterpret_cast(store_row_); @@ -205,20 +177,15 @@ public: if (OB_ISNULL(buf = reinterpret_cast(alloc_.alloc(buffer_len)))) { ret = OB_ALLOCATE_MEMORY_FAILED; SQL_ENG_LOG(ERROR, "alloc buf failed", K(ret)); - } else if (OB_ISNULL(new_row = new (buf) T())) { + } else if (OB_ISNULL(new_row = new (buf) StoredRow())) { ret = OB_ALLOCATE_MEMORY_FAILED; SQL_ENG_LOG(ERROR, "failed to new row", K(ret)); } } if (OB_SUCC(ret)) { int64_t pos = head_size; - if (OB_FAIL(new_row->copy_datums(const_cast(row.cells()), - row.cnt_, - buf + pos, - buffer_len - head_size, - row_size, - extra_size))) { - SQL_ENG_LOG(WARN, "failed to deep copy row", K(ret), K(buffer_len), K(row_size)); + if (OB_FAIL(new_row->assign(&row))) { + SQL_ENG_LOG(WARN, "stored row assign failed", K(ret)); } else { max_size_ = buffer_len; store_row_ = new_row; @@ -226,7 +193,7 @@ public: } return ret; } - void set_store_row(T* in_store_row) + void set_store_row(StoredRow* in_store_row) { store_row_ = in_store_row; } @@ -237,18 +204,17 @@ public: } TO_STRING_KV(K_(max_size), K_(reuse), KPC_(store_row)); - T* store_row_; + StoredRow* store_row_; ObIAllocator& alloc_; int64_t max_size_; bool reuse_; private: // To avoid writing memory overwrite, alloc 2 row for alternate writing - T* pre_alloc_row1_; - T* pre_alloc_row2_; + StoredRow* pre_alloc_row1_; + StoredRow* pre_alloc_row2_; }; - template class ShadowStoredRow { public: ShadowStoredRow() : alloc_(nullptr), store_row_(nullptr), saved_(false) @@ -261,7 +227,7 @@ public: int init(common::ObIAllocator& allocator, int64_t datum_cnt) { int ret = OB_SUCCESS; - int64_t buffer_len = datum_cnt * sizeof(ObDatum) + sizeof(T); + int64_t buffer_len = datum_cnt * sizeof(ObDatum) + sizeof(StoredRow); char* buf = nullptr; if (NULL != alloc_) { ret = common::OB_INIT_TWICE; @@ -271,9 +237,9 @@ public: SQL_ENG_LOG(ERROR, "alloc buf failed", K(ret)); } else { alloc_ = &allocator; - store_row_ = new (buf) T(); + store_row_ = new (buf) StoredRow(); store_row_->cnt_ = datum_cnt; - store_row_->row_size_ = datum_cnt * sizeof(ObDatum); + store_row_->row_size_ = datum_cnt * sizeof(ObDatum) + sizeof(StoredRow); saved_ = false; } return ret; @@ -322,7 +288,7 @@ public: saved_ = false; } - T* get_store_row() const + StoredRow* get_store_row() const { return store_row_; } @@ -330,7 +296,7 @@ public: private: common::ObIAllocator* alloc_; - T* store_row_; + StoredRow* store_row_; bool saved_; }; @@ -370,18 +336,10 @@ public: } // following interface for ObDatum only,unused for now - static int64_t inline min_buf_size(common::ObDatum** datums, const int64_t cnt) - { - return BlockBuffer::HEAD_SIZE + sizeof(BlockBuffer) + row_store_size(datums, cnt); - } static int64_t inline min_buf_size(common::ObDatum* datums, const int64_t cnt) { return BlockBuffer::HEAD_SIZE + sizeof(BlockBuffer) + row_store_size(datums, cnt); } - static int64_t inline row_store_size(common::ObDatum** datums, const int64_t cnt, uint32_t row_extend_size = 0) - { - return ROW_HEAD_SIZE + row_extend_size + ObChunkDatumStore::row_copy_size(datums, cnt); - } static int64_t inline row_store_size(common::ObDatum* datums, const int64_t cnt, uint32_t row_extend_size = 0) { return ROW_HEAD_SIZE + row_extend_size + ObChunkDatumStore::row_copy_size(datums, cnt); @@ -970,6 +928,30 @@ private: callback_->free(size); } + static inline int row_copy_size(const common::ObIArray &exprs, ObEvalCtx &ctx, int64_t &size) + { + int ret = OB_SUCCESS; + common::ObDatum *datum = nullptr; + size = DATUM_SIZE * exprs.count(); + for (int64_t i = 0; i < exprs.count() && OB_SUCC(ret); ++i) { + if (OB_FAIL(exprs.at(i)->eval(ctx, datum))) { + SQL_ENG_LOG(WARN, "failed to eval expr datum", KPC(exprs.at(i)), K(ret)); + } else { + size += datum->len_; + } + } + return ret; + } + + static inline int64_t row_copy_size(common::ObDatum *datums, const int64_t cnt) + { + int64_t size = DATUM_SIZE * cnt; + for (int64_t i = 0; i < cnt; ++i) { + size += datums[i].len_; + } + return size; + } + private: bool inited_; uint64_t tenant_id_; @@ -1011,6 +993,8 @@ private: DISALLOW_COPY_AND_ASSIGN(ObChunkDatumStore); }; +typedef ObChunkDatumStore::StoredRow ObStoredDatumRow; + inline int ObChunkDatumStore::BlockBuffer::advance(int64_t size) { int ret = common::OB_SUCCESS; @@ -1026,7 +1010,6 @@ inline int ObChunkDatumStore::BlockBuffer::advance(int64_t size) } return ret; } - } // end namespace sql } // end namespace oceanbase diff --git a/src/sql/engine/basic/ob_limit_op.h b/src/sql/engine/basic/ob_limit_op.h index 519e83811baca69dcca8e80ac442d2e18da37fc4..011126436b9c64b90c65e0a638e00638ef1e53bb 100644 --- a/src/sql/engine/basic/ob_limit_op.h +++ b/src/sql/engine/basic/ob_limit_op.h @@ -68,7 +68,7 @@ private: int64_t total_cnt_; bool is_percent_first_; - ObChunkDatumStore::LastStoredRow<> pre_sort_columns_; + ObChunkDatumStore::LastStoredRow pre_sort_columns_; }; } // end namespace sql diff --git a/src/sql/engine/connect_by/ob_cnnt_by_pump.cpp b/src/sql/engine/connect_by/ob_cnnt_by_pump.cpp index 699d8b9e3e4b7c55ce4265e85a7cfca693b9f17e..bd705344377da9b50d872293b1ad2e7fa42079da 100644 --- a/src/sql/engine/connect_by/ob_cnnt_by_pump.cpp +++ b/src/sql/engine/connect_by/ob_cnnt_by_pump.cpp @@ -21,32 +21,9 @@ using namespace oceanbase::common; int ObConnectByOpPumpBase::deep_copy_row(const ObIArray& exprs, const ObChunkDatumStore::StoredRow*& dst_row) { - int ret = OB_SUCCESS; - char* buf = NULL; - int64_t row_size = 0; - int64_t buffer_len = 0; - int64_t extra_size = 0; - int64_t head_size = sizeof(ObChunkDatumStore::StoredRow); - int64_t pos = head_size; - ObChunkDatumStore::StoredRow* new_row = nullptr; - if (OB_FAIL(ObChunkDatumStore::row_copy_size(exprs, *eval_ctx_, row_size))) { - SQL_ENG_LOG(WARN, "failed to calc copy size", K(ret)); - } else { - row_size += head_size; - buffer_len = row_size + extra_size; - if (OB_ISNULL(buf = reinterpret_cast(allocator_.alloc(buffer_len)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - SQL_ENG_LOG(ERROR, "alloc buf failed", K(ret)); - } else if (OB_ISNULL(new_row = new (buf) ObChunkDatumStore::StoredRow())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - SQL_ENG_LOG(ERROR, "failed to new row", K(ret)); - } else if (OB_FAIL( - new_row->copy_datums(exprs, *eval_ctx_, buf + pos, buffer_len - head_size, row_size, extra_size))) { - SQL_ENG_LOG(WARN, "failed to deep copy row", K(ret), K(buffer_len), K(row_size)); - } else { - dst_row = new_row; - } - } + ObChunkDatumStore::StoredRow* sr = NULL; + int ret = ObChunkDatumStore::StoredRow::build(sr, exprs, *eval_ctx_, allocator_); + dst_row = sr; return ret; } diff --git a/src/sql/engine/dml/ob_multi_table_insert_up_op.cpp b/src/sql/engine/dml/ob_multi_table_insert_up_op.cpp index fa432b95f8f02245d5d6ef39c9ca5f82ba456074..4c70472f5877d51d472d8e55ee5c849cab4e0ce1 100644 --- a/src/sql/engine/dml/ob_multi_table_insert_up_op.cpp +++ b/src/sql/engine/dml/ob_multi_table_insert_up_op.cpp @@ -326,31 +326,7 @@ int ObMultiTableInsertUpOp::shuffle_update_row(const ObChunkDatumStore::StoredRo int ObMultiTableInsertUpOp::convert_exprs_to_stored_row( ObIAllocator& allocator, ObEvalCtx& eval_ctx, const ObExprPtrIArray& exprs, ObChunkDatumStore::StoredRow*& new_row) { - int ret = OB_SUCCESS; - new_row = NULL; - char* buf = NULL; - int64_t row_size = 0; - if (OB_FAIL(ObChunkDatumStore::row_copy_size(exprs, eval_ctx, row_size))) { - LOG_WARN("failed to calc copy size", K(ret)); - } else { - const int64_t STORE_ROW_HEADER_SIZE = sizeof(ObChunkDatumStore::StoredRow); - int64_t buffer_len = STORE_ROW_HEADER_SIZE + row_size; - if (OB_ISNULL(buf = reinterpret_cast(allocator.alloc(buffer_len)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc buf failed", K(ret)); - } else if (OB_ISNULL(new_row = new (buf) ObChunkDatumStore::StoredRow())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("failed to new row", K(ret)); - } else { - int64_t pos = STORE_ROW_HEADER_SIZE; - if (OB_FAIL(new_row->copy_datums( - exprs, eval_ctx, buf + pos, buffer_len - STORE_ROW_HEADER_SIZE, row_size, 0 /*extra_size*/))) { - LOG_WARN("failed to deep copy row", K(ret), K(buffer_len)); - } - } - } - - return ret; + return ObChunkDatumStore::StoredRow::build(new_row, exprs, eval_ctx, allocator); } } // namespace sql diff --git a/src/sql/engine/join/ob_hash_join_op.h b/src/sql/engine/join/ob_hash_join_op.h index 532ac3d2da1e64b8e85e05a15043a7d8507dc4af..6161062fdb347b70adb6c17f6cad8ccbda9ca99d 100644 --- a/src/sql/engine/join/ob_hash_join_op.h +++ b/src/sql/engine/join/ob_hash_join_op.h @@ -689,7 +689,7 @@ private: bool postprocessed_left_; bool has_fill_right_row_; bool has_fill_left_row_; - ObChunkDatumStore::ShadowStoredRow right_last_row_; + ObChunkDatumStore::ShadowStoredRow right_last_row_; bool need_return_; bool iter_end_; bool opt_cache_aware_; diff --git a/src/sql/engine/join/ob_merge_join_op.h b/src/sql/engine/join/ob_merge_join_op.h index de5fbe7ba04665249f1d7d4ff753addc01c8d560..4416aaa4f704376484e2fd25b98bef32aeb76ef4 100644 --- a/src/sql/engine/join/ob_merge_join_op.h +++ b/src/sql/engine/join/ob_merge_join_op.h @@ -192,7 +192,7 @@ private: bool has_backup_row_; bool reach_end_; // child iterator end bool* left_row_joined_; - ObChunkDatumStore::ShadowStoredRow<> store_row_; + ObChunkDatumStore::ShadowStoredRow store_row_; ObOperator* child_; }; diff --git a/src/sql/engine/join/ob_nested_loop_join_op.h b/src/sql/engine/join/ob_nested_loop_join_op.h index cd02a8a51e0a980a2ca418cf45223ad552ca02f3..3131a6c77d1ef4993af4e980b665d09736335014 100644 --- a/src/sql/engine/join/ob_nested_loop_join_op.h +++ b/src/sql/engine/join/ob_nested_loop_join_op.h @@ -102,7 +102,7 @@ public: ObChunkDatumStore left_store_; ObChunkDatumStore::Iterator left_store_iter_; bool is_left_end_; - ObChunkDatumStore::ShadowStoredRow<> last_store_row_; + ObChunkDatumStore::ShadowStoredRow last_store_row_; bool save_last_row_; private: diff --git a/src/sql/engine/pdml/static/ob_pdml_op_data_driver.h b/src/sql/engine/pdml/static/ob_pdml_op_data_driver.h index 9ac72bd93f4ebde640be96fc2700a1046add1024..2829a081807226e573b1450b50e571fdd00caf69 100644 --- a/src/sql/engine/pdml/static/ob_pdml_op_data_driver.h +++ b/src/sql/engine/pdml/static/ob_pdml_op_data_driver.h @@ -132,7 +132,7 @@ private: DriverState state_; // Driver current state ObEvalCtx* eval_ctx_; - ObChunkDatumStore::LastStoredRow<> last_row_; + ObChunkDatumStore::LastStoredRow last_row_; int64_t last_row_part_id_; const ObExprPtrIArray* last_row_expr_; int64_t op_id_; diff --git a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp index 7229b9f445cc25539fbc903cccf7d4d0358119be..74833c116a660e059baaff22199c8cb45be9671d 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp +++ b/src/sql/engine/px/exchange/ob_px_ms_coord_op.cpp @@ -129,12 +129,12 @@ int ObPxMSCoordOp::init_store_rows(int64_t n_ways) LOG_WARN("unexpected status: store rows is not empty", K(ret), K(n_ways)); } for (int64_t i = 0; i < n_ways && OB_SUCC(ret); ++i) { - void* buf = alloc_.alloc(sizeof(ObChunkDatumStore::LastStoredRow<>)); + void* buf = alloc_.alloc(sizeof(ObChunkDatumStore::LastStoredRow)); if (OB_ISNULL(buf)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to alloca memory", K(ret)); } else { - ObChunkDatumStore::LastStoredRow<>* store_row = new (buf) ObChunkDatumStore::LastStoredRow<>(alloc_); + ObChunkDatumStore::LastStoredRow* store_row = new (buf) ObChunkDatumStore::LastStoredRow(alloc_); if (OB_FAIL(store_rows_.push_back(store_row))) { LOG_WARN("failed to push back", K(ret)); } @@ -160,7 +160,7 @@ int ObPxMSCoordOp::free_allocator() last_pop_row_ = nullptr; // the heap shoud be empty before store_rows_.reset(); while (OB_SUCC(ret) && row_heap_.count() > 0) { - const ObChunkDatumStore::LastStoredRow<>* pop_row = nullptr; + const ObChunkDatumStore::LastStoredRow* pop_row = nullptr; if (OB_SUCC(row_heap_.raw_pop(pop_row))) { row_heap_.shrink(); } else { @@ -324,7 +324,7 @@ int ObPxMSCoordOp::next_row(bool& wait_next_msg) LOG_WARN("fail push row to heap", K(ret)); } } else { - ObChunkDatumStore::LastStoredRow<>* cur_row = nullptr; + ObChunkDatumStore::LastStoredRow* cur_row = nullptr; if (OB_FAIL(store_rows_.at(row_heap_.writable_channel_idx(), cur_row))) { LOG_WARN("failed to get store row", K(ret), K(row_heap_.writable_channel_idx())); } else if (OB_FAIL(cur_row->save_store_row(MY_SPEC.all_exprs_, eval_ctx_, 0))) { @@ -346,14 +346,14 @@ int ObPxMSCoordOp::next_row(bool& wait_next_msg) all_rows_finish_ = true; metric_.mark_last_out(); } else if (row_heap_.capacity() == row_heap_.count()) { - const ObChunkDatumStore::LastStoredRow<>* pop_row = nullptr; + const ObChunkDatumStore::LastStoredRow* pop_row = nullptr; if (OB_FAIL(row_heap_.pop(pop_row))) { LOG_WARN("failed to pop row", K(ret)); } else if (OB_FAIL(pop_row->store_row_->to_expr(MY_SPEC.all_exprs_, eval_ctx_))) { LOG_WARN("failed to to exprs", K(ret)); } else { wait_next_msg = false; - last_pop_row_ = const_cast*>(pop_row); + last_pop_row_ = const_cast(pop_row); } metric_.count(); metric_.mark_first_out(); diff --git a/src/sql/engine/px/exchange/ob_px_ms_coord_op.h b/src/sql/engine/px/exchange/ob_px_ms_coord_op.h index 736be796a94bbda7b39ac2ac2c7f6e4c7d195f4c..0f2fbd8fa0497c8771653bdb5cb558da5c6785f8 100644 --- a/src/sql/engine/px/exchange/ob_px_ms_coord_op.h +++ b/src/sql/engine/px/exchange/ob_px_ms_coord_op.h @@ -72,7 +72,7 @@ public: }; class ObMsgReceiveFilter : public dtl::ObIDltChannelLoopPred { public: - ObMsgReceiveFilter(ObRowHeap>& heap) + ObMsgReceiveFilter(ObRowHeap& heap) : data_ch_idx_start_(-1), data_ch_idx_end_(-1), heap_(heap) {} ~ObMsgReceiveFilter() = default; @@ -96,7 +96,7 @@ public: private: int64_t data_ch_idx_start_; int64_t data_ch_idx_end_; - ObRowHeap>& heap_; + ObRowHeap& heap_; }; public: @@ -127,9 +127,9 @@ private: ObBarrierPieceMsgP barrier_piece_msg_proc_; ObWinbufPieceMsgP winbuf_piece_msg_proc_; ObPxQcInterruptedP interrupt_proc_; - ObArray*> store_rows_; - ObChunkDatumStore::LastStoredRow<>* last_pop_row_; - ObRowHeap> row_heap_; + ObArray store_rows_; + ObChunkDatumStore::LastStoredRow* last_pop_row_; + ObRowHeap row_heap_; ObMsgReceiveFilter receive_order_; common::ObArenaAllocator alloc_; }; diff --git a/src/sql/engine/px/exchange/ob_px_transmit_op.h b/src/sql/engine/px/exchange/ob_px_transmit_op.h index 3603b73cb3f4f21870973a5c1814584c1caa6183..9a0464a573005f38008f77f3f11682cdc2b0c167 100644 --- a/src/sql/engine/px/exchange/ob_px_transmit_op.h +++ b/src/sql/engine/px/exchange/ob_px_transmit_op.h @@ -135,7 +135,7 @@ protected: common::ObArenaAllocator px_row_allocator_; ObPxTaskChSet task_ch_set_; bool transmited_; - // const ObChunkDatum::LastStoredRow<> first_row_; + // const ObChunkDatum::LastStoredRow first_row_; bool iter_end_; bool consume_first_row_; dtl::ObDtlUnblockingMsgP dfc_unblock_msg_proc_; diff --git a/src/sql/engine/px/exchange/ob_row_heap.cpp b/src/sql/engine/px/exchange/ob_row_heap.cpp index 7bc34063d5ff7ddffbed629589e6cbfc076490d3..9b008e07cdef1762edc34cbc502205edd02b439c 100644 --- a/src/sql/engine/px/exchange/ob_row_heap.cpp +++ b/src/sql/engine/px/exchange/ob_row_heap.cpp @@ -129,7 +129,7 @@ ObMaxDatumRowCompare::ObMaxDatumRowCompare() int ObMaxDatumRowCompare::init(const ObIArray* sort_collations, const ObIArray* sort_cmp_funs, - const common::ObIArray*>& rows) + const common::ObIArray& rows) { int ret = OB_SUCCESS; bool is_static_cmp = false; diff --git a/src/sql/engine/px/exchange/ob_row_heap.h b/src/sql/engine/px/exchange/ob_row_heap.h index aa158eece73af0e01d030990f01045b4b1e3a324..3cefd1c8bce9670c20b1ba00ab2cef84f1dc49a8 100644 --- a/src/sql/engine/px/exchange/ob_row_heap.h +++ b/src/sql/engine/px/exchange/ob_row_heap.h @@ -84,7 +84,7 @@ class ObMaxDatumRowCompare { public: ObMaxDatumRowCompare(); int init(const ObIArray* sort_collations, const ObIArray* sort_cmp_funs, - const common::ObIArray*>& rows); + const common::ObIArray& rows); // compare function for quick sort. bool operator()(int64_t row_idx1, int64_t row_idx2); @@ -113,7 +113,7 @@ public: int ret_; const ObIArray* sort_collations_; const ObIArray* sort_cmp_funs_; - const common::ObIArray*>* rows_; + const common::ObIArray* rows_; }; /* diff --git a/src/sql/engine/recursive_cte/ob_search_method_op.cpp b/src/sql/engine/recursive_cte/ob_search_method_op.cpp index b6b6f8b554bd79d18d0244a9eb5b3440f05a2456..09a134eb5ab89c27cb0da7a23d6428afe24d3b0b 100644 --- a/src/sql/engine/recursive_cte/ob_search_method_op.cpp +++ b/src/sql/engine/recursive_cte/ob_search_method_op.cpp @@ -70,7 +70,7 @@ bool ObSearchMethodOp::ObCycleHash::operator==(const ObCycleHash& other) const int ObSearchMethodOp::add_row(const ObIArray& exprs, ObEvalCtx& eval_ctx) { int ret = OB_SUCCESS; - ObChunkDatumStore::LastStoredRow<> last_row(allocator_); + ObChunkDatumStore::LastStoredRow last_row(allocator_); if (input_rows_.empty() && 0 == input_rows_.get_capacity() && OB_FAIL(input_rows_.reserve(INIT_ROW_COUNT))) { LOG_WARN("Failed to pre allocate array", K(ret)); } else if (OB_UNLIKELY(exprs.empty())) { diff --git a/src/sql/engine/set/ob_merge_set_op.h b/src/sql/engine/set/ob_merge_set_op.h index 34fbd27b8d23f93d3f08383ed4ff46e78c4f70fd..4cb25f417ca1ccab4ca45679ebeda6ce84d7bba8 100644 --- a/src/sql/engine/set/ob_merge_set_op.h +++ b/src/sql/engine/set/ob_merge_set_op.h @@ -70,7 +70,7 @@ protected: protected: common::ObArenaAllocator alloc_; - ObChunkDatumStore::LastStoredRow<> last_row_; + ObChunkDatumStore::LastStoredRow last_row_; Compare cmp_; bool need_skip_init_row_; bool iter_end_; diff --git a/src/sql/engine/sort/ob_sort_op_impl.cpp b/src/sql/engine/sort/ob_sort_op_impl.cpp index 7ccd2b5320d5cb715585468da003a5c0048e1902..c4bc268d6653b1ed7c84b6ba4adf16a8ec9eb2d2 100644 --- a/src/sql/engine/sort/ob_sort_op_impl.cpp +++ b/src/sql/engine/sort/ob_sort_op_impl.cpp @@ -1425,21 +1425,19 @@ int ObInMemoryTopnSortImpl::add_row(const common::ObIArray& exprs, bool // optimize for hit-rate: enlarge first Limit-Count row's space, // so following rows are more likely to fit in. int64_t row_size = 0; - if (OB_FAIL(ObChunkDatumStore::row_copy_size(exprs, *eval_ctx_, row_size))) { + if (OB_FAIL(ObChunkDatumStore::Block::row_store_size(exprs, *eval_ctx_, row_size, STORE_ROW_EXTRA_SIZE))) { LOG_WARN("failed to calc copy size", K(ret)); } else { - int64_t buffer_len = STORE_ROW_HEADER_SIZE + 2 * row_size + STORE_ROW_EXTRA_SIZE; - if (OB_ISNULL(buf = reinterpret_cast(cur_alloc_.alloc(buffer_len)))) { + int64_t buffer_len = 2 * row_size; + if (OB_ISNULL(buf = reinterpret_cast(cur_alloc_.alloc(buffer_len)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_ERROR("alloc buf failed", K(ret)); - } else if (OB_ISNULL(new_row = new (buf) SortStoredRow())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("failed to new row", K(ret)); } else { - int64_t pos = STORE_ROW_HEADER_SIZE; - if (OB_FAIL(new_row->copy_datums( - exprs, *eval_ctx_, buf + pos, buffer_len - STORE_ROW_HEADER_SIZE, row_size, STORE_ROW_EXTRA_SIZE))) { - LOG_WARN("failed to deep copy row", K(ret), K(buffer_len)); + ObChunkDatumStore::StoredRow *sr = NULL; + if (OB_FAIL( + ObChunkDatumStore::StoredRow::build(sr, exprs, *eval_ctx_, buf, buffer_len, STORE_ROW_EXTRA_SIZE))) { + LOG_WARN("build stored row failed", K(ret)); + } else if (FALSE_IT(new_row = static_cast(sr))) { } else if (OB_FAIL(heap_.push(new_row))) { LOG_WARN("failed to push back row", K(ret), K(buffer_len)); } else { @@ -1469,37 +1467,33 @@ int ObInMemoryTopnSortImpl::adjust_topn_heap(const common::ObIArray& ex char* buf = NULL; int64_t row_size = 0; int64_t buffer_len = 0; - if (OB_FAIL(ObChunkDatumStore::row_copy_size(exprs, *eval_ctx_, row_size))) { + if (OB_FAIL(ObChunkDatumStore::Block::row_store_size(exprs, *eval_ctx_, row_size, STORE_ROW_EXTRA_SIZE))) { LOG_WARN("failed to calc copy size", K(ret)); } else { // check to see whether this old row's space is adequate for new one - if (dt_row->get_max_size() >= row_size + STORE_ROW_HEADER_SIZE + STORE_ROW_EXTRA_SIZE) { + if (dt_row->get_max_size() >= row_size) { buf = reinterpret_cast(dt_row); new_row = dt_row; buffer_len = dt_row->get_max_size(); } else { - buffer_len = row_size * 2 + STORE_ROW_HEADER_SIZE + STORE_ROW_EXTRA_SIZE; + buffer_len = row_size * 2; if (OB_ISNULL(buf = reinterpret_cast(cur_alloc_.alloc(buffer_len)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_ERROR("alloc buf failed", K(ret)); - } else if (OB_ISNULL(new_row = new (buf) SortStoredRow())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("failed to new row", K(ret)); } } } if (OB_SUCC(ret)) { - int64_t pos = STORE_ROW_HEADER_SIZE; - if (OB_FAIL(new_row->copy_datums( - exprs, *eval_ctx_, buf + pos, buffer_len - STORE_ROW_HEADER_SIZE, row_size, STORE_ROW_EXTRA_SIZE))) { - LOG_WARN("failed to deep copy row", K(ret), K(buffer_len), K(row_size)); + ObChunkDatumStore::StoredRow *sr = NULL; + if (OB_FAIL( + ObChunkDatumStore::StoredRow::build(sr, exprs, *eval_ctx_, buf, buffer_len, STORE_ROW_EXTRA_SIZE))) { + LOG_WARN("build stored row failed", K(ret)); + } else if (FALSE_IT(new_row = static_cast(sr))) { } else if (OB_FAIL(heap_.replace_top(new_row))) { LOG_WARN("failed to replace top", K(ret)); } else { new_row->set_max_size(buffer_len); last_row_ = new_row; - // LOG_TRACE("in memory topn sort check replace row", KPC(new_row), - // K(buffer_len), K(row_size), K(new_row->get_max_size())); } } } else { diff --git a/src/sql/engine/sort/ob_sort_op_impl.h b/src/sql/engine/sort/ob_sort_op_impl.h index 9733111f5dacad65565030b1826930d44e5a5aab..1d2a3c70d70b5d6aec149b4b4bb0d6c1dba4e334 100644 --- a/src/sql/engine/sort/ob_sort_op_impl.h +++ b/src/sql/engine/sort/ob_sort_op_impl.h @@ -340,7 +340,7 @@ private: const ObChunkDatumStore::StoredRow* prev_row_; // when got new prefix, save the row to to %next_prefix_row_ - ObChunkDatumStore::ShadowStoredRow<> next_prefix_row_store_; + ObChunkDatumStore::ShadowStoredRow next_prefix_row_store_; ObChunkDatumStore::StoredRow* next_prefix_row_; ObOperator* child_; diff --git a/src/sql/engine/subquery/ob_subplan_filter_op.cpp b/src/sql/engine/subquery/ob_subplan_filter_op.cpp index d28a2e1d8a069c96381bf671a62eab4c64c3dd02..54d1617622b0ea84a695c4273998978479b90bed 100644 --- a/src/sql/engine/subquery/ob_subplan_filter_op.cpp +++ b/src/sql/engine/subquery/ob_subplan_filter_op.cpp @@ -470,7 +470,7 @@ int ObSubPlanFilterOp::handle_update_set() ret = OB_ERR_UNEXPECTED; LOG_WARN("too many subplan unexpected", K(ret), K(subplan_iters_.count())); } else { - ObChunkDatumStore::LastStoredRow<> row_val(update_set_mem_->get_arena_allocator()); + ObChunkDatumStore::LastStoredRow row_val(update_set_mem_->get_arena_allocator()); Iterator* iter = subplan_iters_.at(0); if (OB_ISNULL(iter)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/engine/window_function/ob_window_function_op.h b/src/sql/engine/window_function/ob_window_function_op.h index 916922f0c53a9a064075e33230eb4014e4ea67a0..8bd1d36c83c97e6591050369b82d613ffcedda81 100644 --- a/src/sql/engine/window_function/ob_window_function_op.h +++ b/src/sql/engine/window_function/ob_window_function_op.h @@ -272,7 +272,7 @@ public: int64_t wf_idx_; int64_t part_first_row_idx_; - ObChunkDatumStore::LastStoredRow<> part_values_; + ObChunkDatumStore::LastStoredRow part_values_; RowsStore part_rows_store_; Frame last_valid_frame_; @@ -494,7 +494,7 @@ private: RowsStore rows_store_; WinFuncCellList wf_list_; // shadow copy the next and restore it before get next row from child. - ObChunkDatumStore::ShadowStoredRow<> next_row_; + ObChunkDatumStore::ShadowStoredRow next_row_; bool next_row_valid_; // TODO DatumFixedArray curr_row_collect_values_;