提交 4de3feb7 编写于 作者: O obdev 提交者: wangzelin.wzl

batch cherry pick bugfix codes

上级 9c766360
......@@ -146,9 +146,7 @@ int ObPxRepartTransmit::do_repart_transmit(ObExecContext& exec_ctx, ObRepartSlic
}
if (OB_SUCC(ret)) {
// init the ObRepartSliceIdxCalc cache map
if (OB_FAIL(repart_slice_calc.init_partition_cache_map())) {
LOG_WARN("failed to repart_slice_calc init partiiton cache map", K(ret));
} else if (OB_FAIL(repart_slice_calc.init())) {
if (OB_FAIL(repart_slice_calc.init())) {
LOG_WARN("failed to init repart slice calc", K(ret));
} else if (OB_FAIL(send_rows(exec_ctx, *transmit_ctx, repart_slice_calc))) {
LOG_WARN("failed to send rows", K(ret));
......
......@@ -117,9 +117,7 @@ int ObPxRepartTransmitOp::do_repart_transmit(ObRepartSliceIdxCalc& repart_slice_
{
int ret = OB_SUCCESS;
// init the ObRepartSliceIdxCalc cache map
if (OB_FAIL(repart_slice_calc.init_partition_cache_map())) {
LOG_WARN("failed to repart_slice_calc init partiiton cache map", K(ret));
} else if (OB_FAIL(repart_slice_calc.init())) {
if (OB_FAIL(repart_slice_calc.init())) {
LOG_WARN("failed to init repart slice calc", K(ret));
} else if (OB_FAIL(send_rows(repart_slice_calc))) {
LOG_WARN("failed to send rows", K(ret));
......
......@@ -304,66 +304,5 @@ int ObShuffleService::init_expr_ctx(ObExecContext& exec_ctx)
return ret;
}
int ObShuffleService::get_hash_part_id(
const ObNewRow& row, const int64_t part_num, ObIArray<int64_t>& part_ids, const PartIdx2PartIdMap& part_map)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(row.is_invalid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row is invalid", K(ret), K(row));
} else if (1 != row.get_count() || (!row.get_cell(0).is_int() && !row.get_cell(0).is_null())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row is invalid", K(ret), K(row));
} else {
int64_t value = row.get_cell(0).is_int() ? row.get_cell(0).get_int() : 0;
int64_t part_idx = -1;
int64_t part_id = -1;
if (value < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("value should not be less than 0", K(ret), K(row));
} else if (OB_FAIL(ObPartitionUtils::calc_hash_part_idx(value, part_num, part_idx))) {
LOG_WARN("fail to calc hash part idx", K(ret));
} else if (OB_FAIL(part_map.get_refactored(part_idx, part_id))) {
LOG_WARN("fail to get part id by idx", K(ret), K(part_idx));
} else if (OB_FAIL(part_ids.push_back(part_id))) {
LOG_WARN("fail to add part id", K(ret));
} else {
// do nothing
}
}
return ret;
}
int ObShuffleService::get_hash_subpart_id(const common::ObNewRow& row, const int64_t subpart_num,
common::ObIArray<int64_t>& subpart_ids, const SubPartIdx2SubPartIdMap& subpart_map)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(row.is_invalid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row is invalid", K(ret), K(row));
} else if (1 != row.get_count() || (!row.get_cell(0).is_int() && !row.get_cell(0).is_null())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("row is invalid", K(ret), K(row));
} else {
int64_t value = row.get_cell(0).is_int() ? row.get_cell(0).get_int() : 0;
int64_t subpart_idx = -1;
int64_t subpart_id = -1;
if (value < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("value should not be less than 0", K(ret), K(row));
} else if (OB_FAIL(ObPartitionUtils::calc_hash_part_idx(value, subpart_num, subpart_idx))) {
LOG_WARN("fail to calc hash subpart idx", K(ret), K(value), K(subpart_num));
} else if (OB_FAIL(subpart_map.get_refactored(subpart_idx, subpart_id))) {
LOG_WARN("fail to get subpart id by idx", K(ret), K(subpart_idx));
} else if (OB_FAIL(subpart_ids.push_back(subpart_id))) {
LOG_WARN("fail to push back subpart id to array", K(ret));
} else {
// do nothing
}
}
return ret;
}
} // namespace sql
} // namespace oceanbase
......@@ -33,9 +33,7 @@ public:
ObShuffleService(ObIAllocator& allocator)
: allocator_(allocator),
row_cache_(),
current_cell_count_(-1),
part_idx_to_part_id_(NULL),
subpart_idx_to_subpart_id_(NULL)
current_cell_count_(-1)
{}
~ObShuffleService() = default;
......@@ -46,16 +44,6 @@ public:
const ObIArray<ObTransmitRepartColumn>& repart_sub_columns, int64_t& part_idx, int64_t& subpart_idx,
bool& no_match_partiton);
inline void set_part_idx_to_part_id_map(PartIdx2PartIdMap* map)
{
part_idx_to_part_id_ = map;
}
inline void set_subpart_idx_to_subpart_id_map(SubPartIdx2SubPartIdMap* map)
{
subpart_idx_to_subpart_id_ = map;
}
// This interface is only used under the px framework,
// non-px please use the above interface.
int get_partition_ids(ObExecContext& exec_ctx, const share::schema::ObTableSchema& table_schema,
......@@ -118,11 +106,6 @@ private:
const ObIArray<ObTransmitRepartColumn>& repart_columns, common::ObNewRow& out_row, bool hash_part,
bool is_hash_v2);
int get_hash_part_id(const common::ObNewRow& row, const int64_t part_num, common::ObIArray<int64_t>& part_ids,
const PartIdx2PartIdMap& part_map);
int get_hash_subpart_id(const common::ObNewRow& row, const int64_t subpart_num,
common::ObIArray<int64_t>& subpart_ids, const SubPartIdx2SubPartIdMap& subpart_map);
public:
constexpr static int64_t NO_MATCH_PARTITION = -2;
......@@ -133,8 +116,6 @@ private:
// Expr calculation context
ObExprCtx expr_ctx_;
PartIdx2PartIdMap* part_idx_to_part_id_;
SubPartIdx2SubPartIdMap* subpart_idx_to_subpart_id_;
};
} // namespace sql
......
......@@ -96,112 +96,6 @@ int ObRepartSliceIdxCalc::get_slice_idx(const common::ObNewRow& row, int64_t& sl
return ret;
}
int ObRepartSliceIdxCalc::init_partition_cache_map()
{
int ret = OB_SUCCESS;
ObPartitionLevel level = table_schema_.get_part_level();
if (PARTITION_LEVEL_ONE == level || PARTITION_LEVEL_TWO == level) {
int64_t part_num = table_schema_.get_part_option().get_part_num();
const ObPartition* part = NULL;
bool check_dropped_schema = false;
ObPartIteratorV2 iter(table_schema_, check_dropped_schema);
if (part_num < 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the part num is not correct", K(ret), K(part_num));
} else if (OB_FAIL(init_cache_map(part_id_to_part_array_idx_))) {
LOG_WARN("init the part_id_to_part_array_idx map fail", K(ret), K(table_schema_));
} else {
int i = 0;
while (OB_SUCC(ret) && OB_SUCC(iter.next(part))) {
if (OB_ISNULL(part)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret), K(part));
} else if (OB_FAIL(part_id_to_part_array_idx_.set_refactored(part->get_part_id(), i))) {
LOG_WARN("fail to set part_id_to_part_array_idx", K(ret), K(part_num), K(i), K(*part));
} else {
i++;
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
}
if (OB_SUCC(ret) && table_schema_.is_hash_part()) {
if (OB_FAIL(init_cache_map(part_idx_to_part_id_))) {
LOG_WARN("init the part_idx_to_part_array_id map fail", K(ret), K(table_schema_));
} else {
iter.init(table_schema_, check_dropped_schema);
while (OB_SUCC(ret) && OB_SUCC(iter.next(part))) {
if (OB_ISNULL(part)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret), K(part));
} else if (OB_FAIL(part_idx_to_part_id_.set_refactored(part->get_part_idx(), part->get_part_id()))) {
LOG_WARN("fail to set part_idx_to_part_id", K(ret), K(part_num), K(*part));
}
}
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) {
// set the part_idx_to_part_id map to shuffle_service_
shuffle_service_.set_part_idx_to_part_id_map(&part_idx_to_part_id_);
}
}
}
if (OB_SUCC(ret) && PARTITION_LEVEL_TWO == level) {
int64_t subpart_num = table_schema_.get_sub_part_option().get_part_num();
ObSubPartition** subpart_array = table_schema_.get_def_subpart_array();
if (subpart_num < 1) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the sub part num is not correct", K(ret), K(subpart_num));
} else if (OB_ISNULL(subpart_array)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the sub part array is null", K(ret), K(subpart_array));
} else if (OB_FAIL(init_cache_map(subpart_id_to_subpart_array_idx_))) {
LOG_WARN("init the subpart_id_to_subpart_idx map fail", K(ret), K(table_schema_));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < subpart_num; i++) {
const ObSubPartition* subpart = subpart_array[i];
if (OB_FAIL(subpart_id_to_subpart_array_idx_.set_refactored(subpart->get_sub_part_id(), i))) {
LOG_WARN("fail to set subpart_id_to_subpart_idx", K(ret), K(i), K(subpart_num), K(*subpart));
}
}
}
if (OB_SUCC(ret) && table_schema_.is_hash_subpart()) {
if (OB_FAIL(init_cache_map(subpart_idx_to_subpart_id_))) {
LOG_WARN("init the subpart_idx_to_subpart_id map fail", K(ret), K(table_schema_));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < subpart_num; i++) {
const ObSubPartition* subpart = subpart_array[i];
if (OB_FAIL(subpart_idx_to_subpart_id_.set_refactored(
subpart->get_sub_part_idx(), subpart->get_sub_part_id()))) {
LOG_WARN("fail to set subpart_idx_to_subpart_id", K(i), K(subpart_num), K(ret), K(*subpart));
}
}
// set the subpart_idx_to_subpart_id to shuffle_service_
shuffle_service_.set_subpart_idx_to_subpart_id_map(&subpart_idx_to_subpart_id_);
}
}
}
}
return ret;
}
int ObRepartSliceIdxCalc::init_cache_map(hash::ObHashMap<int64_t, int64_t, hash::NoPthreadDefendMode>& map)
{
int ret = OB_SUCCESS;
if (map.created()) {
if (OB_FAIL(map.reuse())) {
LOG_WARN("reuse the partition info map fail", K(ret));
}
} else if (OB_FAIL(map.create(DEFAULT_CACHE_MAP_BUCKET_NUM, ObModIds::OB_HASH_BUCKET_PX_TRANSIMIT_REPART))) {
LOG_WARN("create the partition info map fail", K(ret));
}
return ret;
}
int ObRepartSliceIdxCalc::get_partition_id(const common::ObNewRow& row, int64_t& partition_id)
{
int ret = OB_SUCCESS;
......
......@@ -135,22 +135,7 @@ public:
typedef common::hash::ObHashMap<int64_t, int64_t, common::hash::NoPthreadDefendMode> PartId2ArrayIdxMap;
typedef common::hash::ObHashMap<int64_t, int64_t, common::hash::NoPthreadDefendMode> SubPartId2ArrayIdxMap;
virtual ~ObRepartSliceIdxCalc()
{
// destory map
if (part_id_to_part_array_idx_.created()) {
part_id_to_part_array_idx_.destroy();
}
if (part_idx_to_part_id_.created()) {
part_idx_to_part_id_.destroy();
}
if (subpart_id_to_subpart_array_idx_.created()) {
subpart_id_to_subpart_array_idx_.destroy();
}
if (subpart_idx_to_subpart_id_.created()) {
subpart_idx_to_subpart_id_.destroy();
}
}
virtual ~ObRepartSliceIdxCalc() {}
virtual int get_slice_idx(const common::ObNewRow& row, int64_t& slice_idx) override;
virtual int get_slice_idx(const ObIArray<ObExpr*>& exprs, ObEvalCtx& eval_ctx, int64_t& slice_idx) override;
......@@ -180,7 +165,6 @@ private:
// get part id from hashmap, implicate that only one level-1 part in the map
virtual int get_part_id_by_one_level_sub_ch_map(int64_t& part_id);
virtual int get_sub_part_id_by_one_level_first_ch_map(const int64_t part_id, int64_t& sub_part_id);
int init_cache_map(hash::ObHashMap<int64_t, int64_t, hash::NoPthreadDefendMode>& map);
protected:
ObExecContext& exec_ctx_;
......@@ -195,12 +179,6 @@ protected:
int64_t round_robin_idx_;
const ObPxPartChInfo& part_ch_info_;
ObPxPartChMap px_repart_ch_map_;
const static int64_t DEFAULT_CACHE_MAP_BUCKET_NUM = 64;
PartId2ArrayIdxMap part_id_to_part_array_idx_;
ObShuffleService::PartIdx2PartIdMap part_idx_to_part_id_;
SubPartId2ArrayIdxMap subpart_id_to_subpart_array_idx_;
ObShuffleService::SubPartIdx2SubPartIdMap subpart_idx_to_subpart_id_;
ObRepartitionType repart_type_;
};
......
......@@ -510,6 +510,10 @@ int ObQueryCtx::generate_stmt_name(ObIAllocator* allocator)
LOG_WARN("Allocator should not be NULL", K(ret));
} else if (stmt_id_name_map_.count() > 0) {
std::sort(stmt_id_name_map_.begin(), stmt_id_name_map_.end());
int64_t start_ids[5];
for (int64_t i = 0; i < 5; ++i) {
start_ids[i] = 1;
}
for (int64_t idx = 0; OB_SUCC(ret) && idx < stmt_id_name_map_.count(); ++idx) {
IdNamePair& id_name_pair = stmt_id_name_map_.at(idx);
if (!id_name_pair.origin_name_.empty()) {
......@@ -518,9 +522,21 @@ int ObQueryCtx::generate_stmt_name(ObIAllocator* allocator)
stmt::T_REPLACE == id_name_pair.stmt_type_ || stmt::T_DELETE == id_name_pair.stmt_type_ ||
stmt::T_UPDATE == id_name_pair.stmt_type_) {
int64_t pos = 0;
int64_t* start_id = NULL;
if (stmt::T_SELECT == id_name_pair.stmt_type_) {
start_id = &start_ids[0];
} else if (stmt::T_INSERT == id_name_pair.stmt_type_) {
start_id = &start_ids[1];
} else if (stmt::T_REPLACE == id_name_pair.stmt_type_) {
start_id = &start_ids[2];
} else if (stmt::T_DELETE == id_name_pair.stmt_type_) {
start_id = &start_ids[3];
} else if (stmt::T_UPDATE == id_name_pair.stmt_type_) {
start_id = &start_ids[4];
}
if (OB_FAIL(get_dml_stmt_name(id_name_pair.stmt_type_, buf, OB_MAX_QB_NAME_LENGTH, pos))) {
LOG_WARN("Get dml stmt name", K(ret));
} else if (OB_FAIL(append_id_to_stmt_name(buf, OB_MAX_QB_NAME_LENGTH, pos))) {
} else if (OB_FAIL(append_id_to_stmt_name(buf, OB_MAX_QB_NAME_LENGTH, pos, *start_id))) {
LOG_WARN("Failed to append id to stmt name", K(ret));
} else {
ObString generate_name(pos, buf);
......@@ -592,7 +608,7 @@ int ObQueryCtx::get_dml_stmt_name(stmt::StmtType stmt_type, char* buf, int64_t b
return ret;
}
int ObQueryCtx::append_id_to_stmt_name(char* buf, int64_t buf_len, int64_t& pos)
int ObQueryCtx::append_id_to_stmt_name(char* buf, int64_t buf_len, int64_t& pos, int64_t& id_start)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(buf)) {
......@@ -603,7 +619,7 @@ int ObQueryCtx::append_id_to_stmt_name(char* buf, int64_t buf_len, int64_t& pos)
LOG_WARN("Buf size not enough", K(ret));
} else {
bool find_unique = false;
int64_t id = 1;
int64_t id = id_start;
int64_t old_pos = pos;
while (!find_unique && OB_SUCC(ret)) {
pos = old_pos;
......@@ -621,6 +637,9 @@ int ObQueryCtx::append_id_to_stmt_name(char* buf, int64_t buf_len, int64_t& pos)
++id;
}
}
if (OB_SUCC(ret) && find_unique) {
id_start = id;
}
}
return ret;
}
......
......@@ -493,7 +493,7 @@ public:
int get_dml_stmt_name(stmt::StmtType stmt_type, char* buf, int64_t buf_len, int64_t& pos);
int append_id_to_stmt_name(char* buf, int64_t buf_len, int64_t& pos);
int append_id_to_stmt_name(char* buf, int64_t buf_len, int64_t& pos, int64_t& id_start);
int get_stmt_name_by_id(const int64_t stmt_id, common::ObString& stmt_name) const;
int get_stmt_org_name_by_id(const int64_t stmt_id, common::ObString& org_name) const;
......
......@@ -249,6 +249,14 @@ int ObLogSubPlanScan::allocate_exchange_post(AllocExchContext* ctx)
} else if (OB_FAIL(update_weak_part_exprs(ctx))) {
LOG_WARN("failed to update weak part exprs", K(ret));
} else {
ObShardingInfo &child_sharding = child->get_sharding_info();
if (child_sharding.get_partition_keys().count() != sharding_info_.get_partition_keys().count() ||
child_sharding.get_sub_partition_keys().count() != sharding_info_.get_sub_partition_keys().count() ||
child_sharding.get_partition_func().count() != sharding_info_.get_partition_func().count()) {
sharding_info_.get_partition_keys().reset();
sharding_info_.get_sub_partition_keys().reset();
sharding_info_.get_partition_func().reset();
}
LOG_TRACE("subplan scan sharding info", K(sharding_info_));
}
return ret;
......
......@@ -720,7 +720,7 @@ int ObQueryRange::get_column_key_part(const ObRawExpr* l_expr, const ObRawExpr*
if (escape_val.is_unknown() && OB_ISNULL(query_range_ctx_->params_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (escape_val.is_unknown() && OB_FAIL(get_param_value(val, *query_range_ctx_->params_))) {
} else if (escape_val.is_unknown() && OB_FAIL(get_param_value(escape_val, *query_range_ctx_->params_))) {
LOG_WARN("failed to get param value", K(ret));
} else if (OB_FAIL(get_like_range(val, escape_val, *out_key_part, dtc_params))) {
LOG_WARN("get like range failed", K(ret));
......
......@@ -258,7 +258,12 @@ int ObTransformAggrSubquery::check_subquery_validity(
LOG_WARN("failed to check subquery on conditions", K(ret));
} else if (!is_valid) {
// do nothing
// 5. check correlated join contiditons
// 5. check correlated semi contiditons
} else if (OB_FAIL(check_subquery_semi_conditions(*subquery, is_valid))) {
LOG_WARN("failed to check subquery semi conditions", K(ret));
} else if (!is_valid) {
// do nothing
// 6. check correlated join contiditons
} else if (OB_FAIL(check_subquery_conditions(*subquery, nested_conditions, is_valid))) {
LOG_WARN("failed to check subquery conditions", K(ret));
}
......@@ -964,6 +969,11 @@ int ObTransformAggrSubquery::check_subquery_validity(
// 4. check correlated join on contiditons
} else if (OB_FAIL(check_subquery_on_conditions(*subquery, is_valid))) {
LOG_WARN("failed to check subquery on conditions", K(ret));
} else if (!is_valid) {
// do nothing
// 5. check correlated semi contiditons
} else if (OB_FAIL(check_subquery_semi_conditions(*subquery, is_valid))) {
LOG_WARN("failed to check subquery semi conditions", K(ret));
} else {
stmt_level = subquery->get_current_level();
}
......@@ -1612,3 +1622,32 @@ int ObTransformAggrSubquery::extract_no_rewrite_expr(ObRawExpr* expr)
}
return ret;
}
int ObTransformAggrSubquery::check_subquery_semi_conditions(ObSelectStmt &subquery,
bool &is_valid)
{
int ret = OB_SUCCESS;
is_valid = true;
for (int64_t i = 0; OB_SUCC(ret) && is_valid && i < subquery.get_semi_info_size(); ++i) {
bool is_correlated = false;
SemiInfo *semi_info = subquery.get_semi_infos().at(i);
if (OB_ISNULL(semi_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else {
for (int64_t j = 0; OB_SUCC(ret) && is_valid && j < semi_info->semi_conditions_.count(); ++j) {
ObRawExpr *cond = NULL;
if (OB_ISNULL(cond = semi_info->semi_conditions_.at(j))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("condition expr is null", K(ret));
} else if (OB_FAIL(ObTransformUtils::is_correlated_expr(
cond, subquery.get_current_level() - 1, is_correlated))) {
LOG_WARN("failed to check is correlated condition", K(ret));
} else if (is_correlated) {
is_valid = false;
}
}
}
}
return ret;
}
\ No newline at end of file
......@@ -159,6 +159,7 @@ private:
int is_valid_group_by(const ObSelectStmt& subquery, bool& is_valid);
int extract_no_rewrite_select_exprs(ObDMLStmt*& stmt);
int extract_no_rewrite_expr(ObRawExpr* expr);
int check_subquery_semi_conditions(ObSelectStmt &subquery, bool &is_valid);
private:
common::ObSEArray<ObRawExpr*, 8, common::ModulePageAllocator, true> no_rewrite_exprs_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册