From f0e34e03f6b56829cb249b1c74eaafec2e4b6c69 Mon Sep 17 00:00:00 2001 From: leslieyuchen Date: Mon, 29 Nov 2021 14:57:40 +0800 Subject: [PATCH] add dml data strict defensive check --- .../src/common/object/ob_obj_compare.cpp | 4 +- src/share/config/ob_server_config.h | 5 + src/share/ob_errno.cpp | 20 + src/share/ob_errno.def | 1 + src/share/ob_errno.h | 6 + src/share/parameter/ob_parameter_seed.ipp | 4 + src/share/schema/ob_table_dml_param.cpp | 128 ++- src/share/schema/ob_table_dml_param.h | 60 +- src/share/schema/ob_table_param.cpp | 89 +-- src/share/schema/ob_table_param.h | 65 +- src/sql/engine/dml/ob_multi_part_update.cpp | 15 +- .../engine/dml/ob_multi_part_update_op.cpp | 2 +- .../dml/ob_table_conflict_row_fetcher_op.cpp | 39 +- .../dml/ob_table_conflict_row_fetcher_op.h | 14 +- .../dml/ob_table_delete_returning_op.cpp | 2 +- src/sql/engine/dml/ob_table_modify.cpp | 47 ++ src/sql/engine/dml/ob_table_modify.h | 13 +- src/sql/engine/dml/ob_table_modify_op.cpp | 54 +- src/sql/engine/dml/ob_table_modify_op.h | 13 +- src/sql/engine/dml/ob_table_update.cpp | 6 +- src/sql/engine/dml/ob_table_update.h | 2 + src/sql/engine/dml/ob_table_update_op.cpp | 5 +- .../memtable/ob_memtable_row_reader.cpp | 4 +- src/storage/memtable/ob_memtable_row_reader.h | 8 +- src/storage/ob_dml_param.cpp | 26 +- src/storage/ob_dml_param.h | 35 +- src/storage/ob_handle_mgr.h | 2 +- src/storage/ob_i_store.cpp | 71 +- src/storage/ob_i_store.h | 20 +- src/storage/ob_partition_service.cpp | 24 + src/storage/ob_partition_storage.cpp | 739 +++++++++++------- src/storage/ob_partition_storage.h | 63 +- src/storage/ob_relative_table.cpp | 174 ++++- src/storage/ob_relative_table.h | 39 +- src/storage/ob_single_merge.cpp | 19 +- src/storage/ob_single_merge.h | 2 - .../ob_sstable_multi_version_row_iterator.cpp | 2 +- .../ob_sstable_multi_version_row_iterator.h | 4 +- src/storage/ob_sstable_row_iterator.h | 2 +- src/storage/ob_value_row_iterator.cpp | 158 +++- src/storage/ob_value_row_iterator.h | 45 ++ src/storage/transaction/ob_trans_define.h | 6 +- 42 files changed, 1468 insertions(+), 569 deletions(-) diff --git a/deps/oblib/src/common/object/ob_obj_compare.cpp b/deps/oblib/src/common/object/ob_obj_compare.cpp index a73be5bd5..fc37bd792 100644 --- a/deps/oblib/src/common/object/ob_obj_compare.cpp +++ b/deps/oblib/src/common/object/ob_obj_compare.cpp @@ -2956,13 +2956,13 @@ int ObObjCmpFuncs::compare(const ObObj& obj1, const ObObj& obj2, ObCollationType obj_cmp_func cmp_func = NULL; cmp = CR_EQ; if (OB_UNLIKELY(false == can_cmp_without_cast(obj1.get_meta(), obj2.get_meta(), CO_CMP, cmp_func))) { - LOG_ERROR("obj1 and obj2 can't compare", K(obj1), K(obj2), K(obj1.get_meta()), K(obj2.get_meta())); ret = OB_ERR_UNEXPECTED; + LOG_WARN("obj1 and obj2 can't compare", K(obj1), K(obj2), K(obj1.get_meta()), K(obj2.get_meta())); } else { ObCompareCtx cmp_ctx(ObMaxType, cs_type, true, INVALID_TZ_OFF, lib::is_oracle_mode() ? NULL_LAST : NULL_FIRST); if (OB_UNLIKELY(CR_OB_ERROR == (cmp = cmp_func(obj1, obj2, cmp_ctx)))) { - LOG_ERROR("failed to compare obj1 and obj2", K(obj1), K(obj2), K(obj1.get_meta()), K(obj2.get_meta())); ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to compare obj1 and obj2", K(obj1), K(obj2), K(obj1.get_meta()), K(obj2.get_meta())); } } return ret; diff --git a/src/share/config/ob_server_config.h b/src/share/config/ob_server_config.h index a2a4458e9..272785d19 100644 --- a/src/share/config/ob_server_config.h +++ b/src/share/config/ob_server_config.h @@ -135,6 +135,11 @@ public: } bool enable_static_engine_for_query() const; + bool enable_defensive_check() const + { + return _enable_defensive_check && lib::is_diagnose_info_enabled(); + } + bool is_major_version_upgrade() const { return false; diff --git a/src/share/ob_errno.cpp b/src/share/ob_errno.cpp index 6f9af31ba..59b82318c 100644 --- a/src/share/ob_errno.cpp +++ b/src/share/ob_errno.cpp @@ -3298,6 +3298,26 @@ static struct ObStrErrorInit ORACLE_ERRNO[-OB_UNIMPLEMENTED_FEATURE] = 3001; ORACLE_STR_ERROR[-OB_UNIMPLEMENTED_FEATURE] = "ORA-03001: unimplemented feature"; ORACLE_STR_USER_ERROR[-OB_UNIMPLEMENTED_FEATURE] = "ORA-03001: unimplemented feature"; + ERROR_NAME[-OB_ERR_DEFENSIVE_CHECK] = "OB_ERR_DEFENSIVE_CHECK"; + ERROR_CAUSE[-OB_ERR_DEFENSIVE_CHECK] = "Internal Error"; + ERROR_SOLUTION[-OB_ERR_DEFENSIVE_CHECK] = "Contact OceanBase Support"; + MYSQL_ERRNO[-OB_ERR_DEFENSIVE_CHECK] = -1; + SQLSTATE[-OB_ERR_DEFENSIVE_CHECK] = "HY000"; + STR_ERROR[-OB_ERR_DEFENSIVE_CHECK] = "fatal internal error"; + STR_USER_ERROR[-OB_ERR_DEFENSIVE_CHECK] = "fatal internal error in [%.*s]"; + ORACLE_ERRNO[-OB_ERR_DEFENSIVE_CHECK] = 600; + ORACLE_STR_ERROR[-OB_ERR_DEFENSIVE_CHECK] = "ORA-00600: internal error code, arguments: -4377, fatal internal error"; + ORACLE_STR_USER_ERROR[-OB_ERR_DEFENSIVE_CHECK] = "ORA-00600: internal error code, arguments: -4377, fatal internal error in [%.*s]"; + ERROR_NAME[-OB_CLUSTER_NAME_HASH_CONFLICT] = "OB_CLUSTER_NAME_HASH_CONFLICT"; + ERROR_CAUSE[-OB_CLUSTER_NAME_HASH_CONFLICT] = "Internal Error"; + ERROR_SOLUTION[-OB_CLUSTER_NAME_HASH_CONFLICT] = "Contact OceanBase Support"; + MYSQL_ERRNO[-OB_CLUSTER_NAME_HASH_CONFLICT] = -1; + SQLSTATE[-OB_CLUSTER_NAME_HASH_CONFLICT] = "HY000"; + STR_ERROR[-OB_CLUSTER_NAME_HASH_CONFLICT] = "cluster name conflict"; + STR_USER_ERROR[-OB_CLUSTER_NAME_HASH_CONFLICT] = "cluster name conflict"; + ORACLE_ERRNO[-OB_CLUSTER_NAME_HASH_CONFLICT] = 600; + ORACLE_STR_ERROR[-OB_CLUSTER_NAME_HASH_CONFLICT] = "ORA-00600: internal error code, arguments: -4378, cluster name conflict"; + ORACLE_STR_USER_ERROR[-OB_CLUSTER_NAME_HASH_CONFLICT] = "ORA-00600: internal error code, arguments: -4378, cluster name conflict"; ERROR_NAME[-OB_IMPORT_NOT_IN_SERVER] = "OB_IMPORT_NOT_IN_SERVER"; ERROR_CAUSE[-OB_IMPORT_NOT_IN_SERVER] = "Internal Error"; ERROR_SOLUTION[-OB_IMPORT_NOT_IN_SERVER] = "Contact OceanBase Support"; diff --git a/src/share/ob_errno.def b/src/share/ob_errno.def index e9792d125..e3d7044fb 100644 --- a/src/share/ob_errno.def +++ b/src/share/ob_errno.def @@ -421,6 +421,7 @@ DEFINE_ERROR(OB_OBCONFIG_CLUSTER_NOT_EXIST, -4368, -1, "HY000", "cluster not exi DEFINE_ORACLE_ERROR(OB_ERR_VALUE_LARGER_THAN_ALLOWED, -4374, -1, "HY000", "value larger than specified precision allowed for this column", 1438, "value larger than specified precision allowed for this column"); DEFINE_ERROR(OB_DISK_ERROR, -4375, -1, "HY000", "observer has disk error"); DEFINE_ORACLE_ERROR(OB_UNIMPLEMENTED_FEATURE, -4376, -1, "HY000", "unimplemented feature", 3001, "unimplemented feature"); +DEFINE_ERROR_EXT(OB_ERR_DEFENSIVE_CHECK, -4377, -1, "HY000", "fatal internal error", "fatal internal error in [%.*s]"); DEFINE_ERROR(OB_CLUSTER_NAME_HASH_CONFLICT, -4378, -1, "HY000", "cluster name conflict"); //////////////////////////////////////////////////////////////// diff --git a/src/share/ob_errno.h b/src/share/ob_errno.h index ad928925e..650e6336c 100644 --- a/src/share/ob_errno.h +++ b/src/share/ob_errno.h @@ -235,6 +235,8 @@ constexpr int OB_OBCONFIG_CLUSTER_NOT_EXIST = -4368; constexpr int OB_ERR_VALUE_LARGER_THAN_ALLOWED = -4374; constexpr int OB_DISK_ERROR = -4375; constexpr int OB_UNIMPLEMENTED_FEATURE = -4376; +constexpr int OB_ERR_DEFENSIVE_CHECK = -4377; +constexpr int OB_CLUSTER_NAME_HASH_CONFLICT = -4378; constexpr int OB_IMPORT_NOT_IN_SERVER = -4505; constexpr int OB_CONVERT_ERROR = -4507; constexpr int OB_BYPASS_TIMEOUT = -4510; @@ -1563,6 +1565,8 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219; #define OB_ERR_VALUE_LARGER_THAN_ALLOWED__USER_ERROR_MSG "value larger than specified precision allowed for this column" #define OB_DISK_ERROR__USER_ERROR_MSG "observer has disk error" #define OB_UNIMPLEMENTED_FEATURE__USER_ERROR_MSG "unimplemented feature" +#define OB_ERR_DEFENSIVE_CHECK__USER_ERROR_MSG "fatal internal error in [%.*s]" +#define OB_CLUSTER_NAME_HASH_CONFLICT__USER_ERROR_MSG "cluster name conflict" #define OB_IMPORT_NOT_IN_SERVER__USER_ERROR_MSG "Import not in service" #define OB_CONVERT_ERROR__USER_ERROR_MSG "Convert error" #define OB_BYPASS_TIMEOUT__USER_ERROR_MSG "Bypass timeout" @@ -2991,6 +2995,8 @@ constexpr int OB_ERR_INVALID_DATE_MSG_FMT_V2 = -4219; #define OB_ERR_VALUE_LARGER_THAN_ALLOWED__ORA_USER_ERROR_MSG "ORA-01438: value larger than specified precision allowed for this column" #define OB_DISK_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4375, observer has disk error" #define OB_UNIMPLEMENTED_FEATURE__ORA_USER_ERROR_MSG "ORA-03001: unimplemented feature" +#define OB_ERR_DEFENSIVE_CHECK__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4377, fatal internal error in [%.*s]" +#define OB_CLUSTER_NAME_HASH_CONFLICT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4378, cluster name conflict" #define OB_IMPORT_NOT_IN_SERVER__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4505, Import not in service" #define OB_CONVERT_ERROR__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4507, Convert error" #define OB_BYPASS_TIMEOUT__ORA_USER_ERROR_MSG "ORA-00600: internal error code, arguments: -4510, Bypass timeout" diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 6fc77047d..499d87c83 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -275,6 +275,10 @@ DEF_BOOL(_enable_static_typing_engine, OB_CLUSTER_PARAMETER, "True", "specifies whether static typing sql execution engine is activated", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +// https://yuque.antfin-inc.com/ob/product_functionality_review/zlp56c +DEF_BOOL(_enable_defensive_check, OB_CLUSTER_PARAMETER, "True", + "specifies whether allow to do some defensive checks when the query is executed", + ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); //// tenant config DEF_TIME_WITH_CHECKER(max_stale_time_for_weak_consistency, OB_TENANT_PARAMETER, "5s", common::ObConfigStaleTimeChecker, "[5s,)", diff --git a/src/share/schema/ob_table_dml_param.cpp b/src/share/schema/ob_table_dml_param.cpp index 80c8e72c0..bd205d10a 100644 --- a/src/share/schema/ob_table_dml_param.cpp +++ b/src/share/schema/ob_table_dml_param.cpp @@ -31,9 +31,14 @@ ObTableSchemaParam::ObTableSchemaParam(ObIAllocator& allocator) fulltext_col_id_(OB_INVALID_ID), index_name_(), columns_(allocator), + col_descs_(allocator), col_map_(allocator), + projector_(allocator), is_dropped_schema_(false), - pk_name_() + pk_name_(), + full_col_descs_(allocator), + full_col_map_(allocator), + full_projector_(allocator) {} ObTableSchemaParam::~ObTableSchemaParam() @@ -54,8 +59,12 @@ void ObTableSchemaParam::reset() index_name_.reset(); columns_.reset(); col_map_.clear(); + projector_.reset(); is_dropped_schema_ = false; pk_name_.reset(); + full_col_descs_.reset(); + full_col_map_.clear(); + full_projector_.reset(); } int ObTableSchemaParam::convert(const ObTableSchema* schema, const ObIArray& col_ids) @@ -63,7 +72,8 @@ int ObTableSchemaParam::convert(const ObTableSchema* schema, const ObIArray tmp_cols; - ObSEArray column_ids_no_virtual; + ObSEArray all_column_ids; + ObSEArray tmp_col_descs; if (OB_ISNULL(schema)) { ret = OB_INVALID_ARGUMENT; @@ -100,19 +110,19 @@ int ObTableSchemaParam::convert(const ObTableSchema* schema, const ObIArrayget_column_ids(column_ids_no_virtual, false))) { + if (OB_FAIL(schema->get_column_ids(all_column_ids, false))) { LOG_WARN("fail to get column ids", K(ret)); } - for (int32_t i = 0; OB_SUCC(ret) && i < column_ids_no_virtual.count(); ++i) { - const uint64_t column_id = column_ids_no_virtual.at(i).col_id_; - const ObColumnSchemaV2* column_schema = NULL; - ObColumnParam* column = NULL; + for (int32_t i = 0; OB_SUCC(ret) && i < all_column_ids.count(); ++i) { + const uint64_t column_id = all_column_ids.at(i).col_id_; + const ObColumnSchemaV2 *column_schema = NULL; + ObColumnParam *column = NULL; if (OB_ISNULL(column_schema = schema->get_column_schema(column_id))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("The column is NULL", K(schema->get_table_id()), K(column_id), K(i)); } else if (column_schema->is_rowid_pseudo_column() && !has_exist_in_array(col_ids, column_id)) { - // ignore rowid, because storage donot store rowid column + // ignore rowid, because storage do not store rowid column // eg: create table t1(c1 int); // delete from t1; // table dml param only need c1 @@ -123,8 +133,10 @@ int ObTableSchemaParam::convert(const ObTableSchema* schema, const ObIArray& column_ids) c return ret; } -int ObTableSchemaParam::get_index_name(common::ObString& index_name) const +int ObTableSchemaParam::get_rowkey_column_ids(ObIArray &column_ids) const +{ + int ret = OB_SUCCESS; + if (!is_valid()) { + ret = OB_NOT_INIT; + LOG_WARN("param not inited", K(ret), K(*this)); + } else { + const ObColumnParam *param = NULL; + for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_column_num_; ++i) { + if (OB_ISNULL(param = columns_.at(i))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("column param is NULL", K(ret), K(i)); + } else if (OB_FAIL(column_ids.push_back(param->get_column_id()))) { + LOG_WARN("Fail to add rowkey column id to column_ids", K(ret)); + } + } + } + return ret; +} + +int ObTableSchemaParam::get_index_name(common::ObString &index_name) const { int ret = OB_SUCCESS; if (!is_index_table()) { @@ -258,7 +294,40 @@ bool ObTableSchemaParam::is_depend_column(uint64_t column_id) const return is_depend; } -int64_t ObTableSchemaParam::to_string(char* buf, const int64_t buf_len) const +int ObTableSchemaParam::prepare_full_column_param(const ObTableSchema &table_schema) +{ + int ret = OB_SUCCESS; + + static const int64_t COMMON_COLUMN_NUM = 16; + ObSEArray all_column_ids; + if (OB_FAIL(table_schema.get_column_ids(all_column_ids, !table_schema.is_index_table()))) { + LOG_WARN("get column ids from table schema failed", K(ret)); + } else if (OB_FAIL(full_col_descs_.assign(all_column_ids))) { + LOG_WARN("assign full column descs failed", K(ret), K(all_column_ids)); + } else if (OB_FAIL(full_col_map_.init(full_col_descs_))) { + LOG_WARN("init full column map failed", K(ret)); + } else { + full_projector_.set_capacity(full_col_descs_.count()); + projector_.set_capacity(columns_.count()); + } + for (int64_t i = 0; OB_SUCC(ret) && i < full_col_descs_.count(); ++i) { + if (OB_FAIL(full_projector_.push_back(i))) { + LOG_WARN("init full projector failed", K(ret)); + } + } + for (int64_t i = 0; OB_SUCC(ret) && i < columns_.count(); ++i) { + int idx = OB_INVALID_INDEX; + const ObColumnParam *col = columns_.at(i); + if (OB_FAIL(full_col_map_.get(col->get_column_id(), idx))) { + LOG_WARN("get column index from full column map failed", K(ret)); + } else if (OB_FAIL(projector_.push_back(idx))) { + LOG_WARN("store projector failed", K(ret)); + } + } + return ret; +} + +int64_t ObTableSchemaParam::to_string(char *buf, const int64_t buf_len) const { int64_t pos = 0; J_OBJ_START(); @@ -272,8 +341,10 @@ int64_t ObTableSchemaParam::to_string(char* buf, const int64_t buf_len) const K_(fulltext_col_id), K_(index_name), K_(pk_name), - "columns", - ObArrayWrap(0 == columns_.count() ? NULL : &columns_.at(0), columns_.count())); + K_(columns), + K_(projector), + K_(full_col_descs), + K_(full_projector)); J_OBJ_END(); return pos; } @@ -304,6 +375,10 @@ OB_DEF_SERIALIZE(ObTableSchemaParam) LOG_WARN("failed to serialize pk name", K(ret)); } } + OB_UNIS_ENCODE(col_descs_); + OB_UNIS_ENCODE(projector_); + OB_UNIS_ENCODE(full_col_descs_); + OB_UNIS_ENCODE(full_projector_); return ret; } @@ -343,6 +418,15 @@ OB_DEF_DESERIALIZE(ObTableSchemaParam) LOG_WARN("failed to copy pk name", K(ret), K(tmp_name)); } } + OB_UNIS_DECODE(col_descs_); + OB_UNIS_DECODE(projector_); + OB_UNIS_DECODE(full_col_descs_); + OB_UNIS_DECODE(full_projector_); + if (OB_SUCC(ret) && !full_col_descs_.empty()) { + if (OB_FAIL(full_col_map_.init(full_col_descs_))) { + LOG_WARN("init full col map failed", K(ret)); + } + } return ret; } @@ -372,6 +456,10 @@ OB_DEF_SERIALIZE_SIZE(ObTableSchemaParam) } LST_DO_CODE(OB_UNIS_ADD_LEN, is_dropped_schema_); len += pk_name_.get_serialize_size(); + OB_UNIS_ADD_LEN(col_descs_); + OB_UNIS_ADD_LEN(projector_); + OB_UNIS_ADD_LEN(full_col_descs_); + OB_UNIS_ADD_LEN(full_projector_); return len; } @@ -395,8 +483,6 @@ void ObTableDMLParam::reset() tenant_schema_version_ = OB_INVALID_VERSION; data_table_.reset(); index_tables_.reset(); - col_descs_.reset(); - col_map_.clear(); } int ObTableDMLParam::convert(const ObTableSchema* table_schema, @@ -608,10 +694,10 @@ int ObTableDMLParam::prepare_storage_param(const ObIArray& column_ids) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid column_ids", K(ret), K(col_cnt)); } else { - ObSEArray tmp_col_descs; - const ObColumnParam* col_param = nullptr; + const ObColumnParam *col_param = nullptr; uint64_t column_id = OB_INVALID_ID; ObColDesc col_desc; + col_descs_.set_capacity(col_cnt); for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { column_id = column_ids.at(i); if (nullptr == (col_param = data_table_.get_column(column_id))) { @@ -621,7 +707,7 @@ int ObTableDMLParam::prepare_storage_param(const ObIArray& column_ids) col_desc.col_id_ = column_id; col_desc.col_type_ = col_param->get_meta_type(); col_desc.col_order_ = col_param->get_column_order(); - if (OB_FAIL(tmp_col_descs.push_back(col_desc))) { + if (OB_FAIL(col_descs_.push_back(col_desc))) { LOG_WARN("fail to push back column description", K(ret), K(col_desc)); } } @@ -629,9 +715,7 @@ int ObTableDMLParam::prepare_storage_param(const ObIArray& column_ids) // assign if (OB_SUCC(ret)) { - if (OB_FAIL(col_descs_.assign(tmp_col_descs))) { - LOG_WARN("fail to assign column description array", K(ret)); - } else if (OB_FAIL(col_map_.init(col_descs_))) { + if (OB_FAIL(col_map_.init(col_descs_))) { LOG_WARN("fail to init column map", K(ret)); } } diff --git a/src/share/schema/ob_table_dml_param.h b/src/share/schema/ob_table_dml_param.h index 3c129b76d..efa1ba31f 100644 --- a/src/share/schema/ob_table_dml_param.h +++ b/src/share/schema/ob_table_dml_param.h @@ -25,13 +25,16 @@ class ObTableSchemaParam { OB_UNIS_VERSION_V(1); public: - typedef common::ObFixedArray RowKeys; - typedef common::ObFixedArray Columns; + typedef common::ObFixedArray RowKeys; + typedef common::ObFixedArray Columns; + typedef common::ObFixedArray Projector; + typedef common::ObFixedArray ObColDescArray; explicit ObTableSchemaParam(common::ObIAllocator& allocator); virtual ~ObTableSchemaParam(); void reset(); - int convert(const ObTableSchema* schema, const common::ObIArray& col_ids); + int convert(const ObTableSchema *schema, const common::ObIArray &col_ids); + int prepare_full_column_param(const ObTableSchema &table_schema); OB_INLINE bool is_valid() const { return common::OB_INVALID_ID != table_id_; @@ -80,6 +83,18 @@ public: { return columns_.count(); } + OB_INLINE const Columns &get_columns() const + { + return columns_; + } + OB_INLINE const ColumnMap &get_col_map() const + { + return col_map_; + } + OB_INLINE const ObColDescArray &get_col_descs() const + { + return col_descs_; + } OB_INLINE bool is_index_table() const { return ObTableSchema::is_index_table(table_type_); @@ -107,13 +122,31 @@ public: int is_rowkey_column(const uint64_t column_id, bool& is_rowkey) const; int is_column_nullable(const uint64_t column_id, bool& is_nullable) const; - const ObColumnParam* get_column(const uint64_t column_id) const; - const ObColumnParam* get_column_by_idx(const int64_t idx) const; - const ObColumnParam* get_rowkey_column_by_idx(const int64_t idx) const; - int get_rowkey_column_ids(common::ObIArray& column_ids) const; - int get_index_name(common::ObString& index_name) const; - const common::ObString& get_pk_name() const; + const ObColumnParam *get_column(const uint64_t column_id) const; + const ObColumnParam *get_column_by_idx(const int64_t idx) const; + const ObColumnParam *get_rowkey_column_by_idx(const int64_t idx) const; + int get_rowkey_column_ids(common::ObIArray &column_ids) const; + int get_rowkey_column_ids(common::ObIArray &column_ids) const; + int get_index_name(common::ObString &index_name) const; + const common::ObString &get_pk_name() const; bool is_depend_column(uint64_t column_id) const; + OB_INLINE const Projector &get_projector() const + { + return projector_; + } + OB_INLINE const ObColDescArray &get_full_col_descs() const + { + return full_col_descs_; + } + OB_INLINE const ColumnMap &get_full_col_map() const + { + return full_col_map_; + } + OB_INLINE const Projector &get_full_projector() const + { + return full_projector_; + } + DECLARE_TO_STRING; private: @@ -132,9 +165,18 @@ private: uint64_t fulltext_col_id_; common::ObString index_name_; Columns columns_; + // generated storage param from columns_ids_ in ObTableModify, for performance improvement + ObColDescArray col_descs_; ColumnMap col_map_; + // generated storage column projector from full column map, for performance improvement + Projector projector_; // all column projector without virtual column bool is_dropped_schema_; common::ObString pk_name_; // use for printing error msg in oracle mode + + // full column info, the purpose is that read operations in DML can use fuse row cache + ObColDescArray full_col_descs_; // all column descs without virtual column + ColumnMap full_col_map_; // all column map without virtual column + Projector full_projector_; // all column projector without virtual column }; class ObTableDMLParam { diff --git a/src/share/schema/ob_table_param.cpp b/src/share/schema/ob_table_param.cpp index 29f3cf8ad..d17da702f 100644 --- a/src/share/schema/ob_table_param.cpp +++ b/src/share/schema/ob_table_param.cpp @@ -376,6 +376,9 @@ void ObColumnParam::reset() orig_default_value_.reset(); cur_default_value_.reset(); is_nullable_ = false; + is_gen_col_ = false; + is_virtual_gen_col_ = false; + is_hidden_ = false; } int ObColumnParam::deep_copy_obj(const ObObj& src, ObObj& dest) @@ -408,8 +411,17 @@ OB_DEF_SERIALIZE(ObColumnParam) { int ret = OB_SUCCESS; - LST_DO_CODE( - OB_UNIS_ENCODE, column_id_, meta_type_, accuracy_, orig_default_value_, cur_default_value_, order_, is_nullable_); + LST_DO_CODE(OB_UNIS_ENCODE, + column_id_, + meta_type_, + accuracy_, + orig_default_value_, + cur_default_value_, + order_, + is_nullable_, + is_gen_col_, + is_virtual_gen_col_, + is_hidden_); return ret; } @@ -431,6 +443,9 @@ OB_DEF_DESERIALIZE(ObColumnParam) is_nullable_ = false; } } + OB_UNIS_DECODE(is_gen_col_); + OB_UNIS_DECODE(is_virtual_gen_col_); + OB_UNIS_DECODE(is_hidden_); if (OB_SUCC(ret)) { if (OB_FAIL(deep_copy_obj(orig_default_value, orig_default_value_))) { @@ -454,7 +469,10 @@ OB_DEF_SERIALIZE_SIZE(ObColumnParam) orig_default_value_, cur_default_value_, order_, - is_nullable_); + is_nullable_, + is_gen_col_, + is_virtual_gen_col_, + is_hidden_); return len; } @@ -467,6 +485,9 @@ int ObColumnParam::assign(const ObColumnParam& other) order_ = other.order_; accuracy_ = other.accuracy_; is_nullable_ = other.is_nullable_; + is_gen_col_ = other.is_gen_col_; + is_virtual_gen_col_ = other.is_virtual_gen_col_; + is_hidden_ = other.is_hidden_; if (OB_FAIL(deep_copy_obj(other.cur_default_value_, cur_default_value_))) { LOG_WARN("Fail to deep copy cur_default_value, ", K(ret), K(cur_default_value_)); } else if (OB_FAIL(deep_copy_obj(other.orig_default_value_, orig_default_value_))) { @@ -1659,64 +1680,7 @@ int ObTableParam::convert_join_mv_rparam( return ret; } -int ObTableParam::convert_schema_param( - const share::schema::ObTableSchemaParam& schema_param, const common::ObIArray& output_column_ids) -{ - int ret = OB_SUCCESS; - if (!schema_param.is_valid() || 0 == output_column_ids.count()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(schema_param), K(output_column_ids)); - } else if (schema_param.get_rowkey_column_num() != output_column_ids.count()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("output column ids should be row key columns", K(ret), K(schema_param), K(output_column_ids)); - } else { - const int64_t COMMON_COLUMN_NUM = 16; - const ObColumnParam* src_col = NULL; - ObSEArray tmp_cols; - ObSEArray tmp_projector; - ObSEArray tmp_output_projector; - - table_id_ = schema_param.get_table_id(); - schema_version_ = schema_param.get_schema_version(); - main_table_rowkey_cnt_ = schema_param.get_rowkey_column_num(); - for (int32_t i = 0; OB_SUCC(ret) && i < schema_param.get_rowkey_column_num(); ++i) { - ObColumnParam* dst_col = nullptr; - if (NULL == (src_col = schema_param.get_rowkey_column_by_idx(i))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("The column param is NULL", K(ret), K(i)); - } else if (src_col->get_column_id() != output_column_ids.at(i)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("row key column id not match", K(ret), K(i), K(*src_col), K(output_column_ids)); - } else if (OB_FAIL(alloc_column(allocator_, dst_col))) { - LOG_WARN("alloc column failed", K(ret), K(i)); - } else if (OB_FAIL(dst_col->assign(*src_col))) { - LOG_WARN("assign column failed", K(ret), K(i)); - } else if (OB_FAIL(tmp_cols.push_back(dst_col))) { - LOG_WARN("push back column failed", K(ret), K(i)); - } else if (OB_FAIL(tmp_projector.push_back(i))) { - LOG_WARN("push back projector failed", K(ret), K(i)); - } else if (OB_FAIL(tmp_output_projector.push_back(i))) { - LOG_WARN("push back output projector failed", K(ret), K(i)); - } - } - - // assign - if (OB_SUCC(ret)) { - if (OB_FAIL(cols_.assign(tmp_cols))) { - LOG_WARN("assign failed", K(ret)); - } else if (OB_FAIL(projector_.assign(tmp_projector))) { - LOG_WARN("assign failed", K(ret)); - } else if (OB_FAIL(output_projector_.assign(tmp_output_projector))) { - LOG_WARN("assign failed", K(ret)); - } else if (OB_FAIL(create_column_map(cols_, col_map_))) { - LOG_WARN("failed to create column map", K(ret)); - } - } - } - return ret; -} - -int ObTableParam::alloc_column(ObIAllocator& allocator, ObColumnParam*& col_ptr) +int ObTableParam::alloc_column(ObIAllocator &allocator, ObColumnParam *&col_ptr) { int ret = OB_SUCCESS; void* tmp_ptr = nullptr; @@ -1751,6 +1715,9 @@ int ObTableParam::convert_column_schema_to_param(const ObColumnSchemaV2& column_ column_param.set_column_order(column_schema.get_order_in_rowkey()); column_param.set_accuracy(column_schema.get_accuracy()); column_param.set_nullable(column_schema.is_nullable()); + column_param.set_gen_col_flag(column_schema.is_generated_column(), column_schema.is_virtual_generated_column()); + column_param.set_is_hidden(column_schema.is_hidden()); + LOG_DEBUG("convert_column_schema_to_param", K(column_schema), K(column_param), K(lbt())); if (column_schema.is_generated_column() || OB_HIDDEN_LOGICAL_ROWID_COLUMN_ID == column_schema.get_column_id()) { ObObj nop_obj; nop_obj.set_nop_value(); diff --git a/src/share/schema/ob_table_param.h b/src/share/schema/ob_table_param.h index 777b61c80..f5043bef6 100644 --- a/src/share/schema/ob_table_param.h +++ b/src/share/schema/ob_table_param.h @@ -234,10 +234,31 @@ public: { is_nullable_ = nullable; } - int assign(const ObColumnParam& other); + inline void set_gen_col_flag(const bool is_gen_col, const bool is_virtual) + { + is_gen_col_ = is_gen_col; + is_virtual_gen_col_ = is_virtual; + } + inline bool is_gen_col() const + { + return is_gen_col_; + } + inline bool is_virtual_gen_col() const + { + return is_virtual_gen_col_; + } + inline void set_is_hidden(const bool is_hidden) + { + is_hidden_ = is_hidden; + } + inline bool is_hidden() const + { + return is_hidden_; + } + int assign(const ObColumnParam &other); TO_STRING_KV(K_(column_id), K_(meta_type), K_(order), K_(accuracy), K_(orig_default_value), K_(cur_default_value), - K_(is_nullable)); + K_(is_nullable), K_(is_gen_col), K_(is_virtual_gen_col), K_(is_hidden)); private: int deep_copy_obj(const common::ObObj& src, common::ObObj& dest); @@ -251,6 +272,41 @@ private: common::ObObj orig_default_value_; common::ObObj cur_default_value_; bool is_nullable_; + bool is_gen_col_; + bool is_virtual_gen_col_; + bool is_hidden_; +}; + +class ObVerticalPartitionParam { + OB_UNIS_VERSION_V(1); + +public: + ObVerticalPartitionParam(); + virtual ~ObVerticalPartitionParam(); + void reset(); + +public: + DECLARE_TO_STRING; + inline uint64_t get_table_id() const + { + return table_id_; + } + inline void set_table_id(uint64_t table_id) + { + table_id_ = table_id; + } + inline int64_t get_schema_version() const + { + return schema_version_; + } + inline void set_schema_version(int64_t version) + { + schema_version_ = version; + } + +private: + uint64_t table_id_; + int64_t schema_version_; }; class ObTableSchema; @@ -282,11 +338,6 @@ public: int convert_join_mv_rparam(const ObTableSchema& mv_schema, const ObTableSchema& right_schema, const common::ObIArray& mv_column_ids); - // convert from table schema param which is used in ObTableModify operators - // used to get conflict row only by row keys - int convert_schema_param( - const share::schema::ObTableSchemaParam& schema_param, const common::ObIArray& output_column_ids); - inline uint64_t get_table_id() const { return table_id_; diff --git a/src/sql/engine/dml/ob_multi_part_update.cpp b/src/sql/engine/dml/ob_multi_part_update.cpp index 938318666..e420bd1d1 100644 --- a/src/sql/engine/dml/ob_multi_part_update.cpp +++ b/src/sql/engine/dml/ob_multi_part_update.cpp @@ -170,11 +170,12 @@ int ObMultiPartUpdate::shuffle_update_row(ObExecContext& ctx, bool& got_row) con } while (OB_SUCC(ret) && OB_SUCC(inner_get_next_row(ctx, full_row))) { for (int64_t k = 0; OB_SUCC(ret) && k < table_dml_infos_.count(); ++k) { - const ObTableDMLInfo& table_dml_info = table_dml_infos_.at(k); - ObTableDMLCtx& table_dml_ctx = update_ctx->table_dml_ctxs_.at(k); - const ObArrayWrap& global_index_dml_infos = table_dml_info.index_infos_; - ObArrayWrap& global_index_dml_ctxs = table_dml_ctx.index_ctxs_; - const ObTableModify* sub_update = global_index_dml_infos.at(0).dml_subplans_.at(UPDATE_OP).subplan_root_; + const ObTableDMLInfo &table_dml_info = table_dml_infos_.at(k); + ObTableDMLCtx &table_dml_ctx = update_ctx->table_dml_ctxs_.at(k); + const ObArrayWrap &global_index_dml_infos = table_dml_info.index_infos_; + ObArrayWrap &global_index_dml_ctxs = table_dml_ctx.index_ctxs_; + const ObTableUpdate *sub_update = + static_cast(global_index_dml_infos.at(0).dml_subplans_.at(UPDATE_OP).subplan_root_); bool is_updated = false; bool is_filtered = false; common::ObNewRow old_row; @@ -220,9 +221,7 @@ int ObMultiPartUpdate::shuffle_update_row(ObExecContext& ctx, bool& got_row) con continue; } } - if (OB_SUCC(ret)) { - OZ(check_row_null(ctx, new_row, sub_update->get_column_infos()), new_row); - } + OZ(check_row_null(ctx, new_row, sub_update->get_column_infos(), sub_update->get_updated_column_infos()), new_row); OZ(check_updated_value(*update_ctx, table_dml_info.assign_columns_, old_row, new_row, is_updated)); if (OB_SUCC(ret) && OB_INVALID_INDEX != stmt_id_idx_) { OZ(merge_implicit_cursor( diff --git a/src/sql/engine/dml/ob_multi_part_update_op.cpp b/src/sql/engine/dml/ob_multi_part_update_op.cpp index 431528570..5f39daf75 100644 --- a/src/sql/engine/dml/ob_multi_part_update_op.cpp +++ b/src/sql/engine/dml/ob_multi_part_update_op.cpp @@ -156,7 +156,7 @@ int ObMultiPartUpdateOp::shuffle_update_row(bool& got_row) continue; } } - OZ(check_row_null(table_dml_info.assign_columns_.new_row_, sub_update->column_infos_)); + OZ(check_row_null(table_dml_info.assign_columns_.new_row_, sub_update->column_infos_, sub_update->updated_column_infos_)); OZ(check_updated_value(*this, table_dml_info.assign_columns_.get_assign_columns(), table_dml_info.assign_columns_.old_row_, diff --git a/src/sql/engine/dml/ob_table_conflict_row_fetcher_op.cpp b/src/sql/engine/dml/ob_table_conflict_row_fetcher_op.cpp index 31e7ff531..44353e7a4 100644 --- a/src/sql/engine/dml/ob_table_conflict_row_fetcher_op.cpp +++ b/src/sql/engine/dml/ob_table_conflict_row_fetcher_op.cpp @@ -163,12 +163,15 @@ int ObTableConflictRowFetcherOp::inner_open() { int ret = OB_SUCCESS; ObDMLBaseParam dml_param; - dml_param.output_exprs_ = &MY_SPEC.access_exprs_; - dml_param.op_ = this; - dml_param.op_filters_ = NULL; - dml_param.row2exprs_projector_ = &row2exprs_projector_; OZ(ObTableModify::init_dml_param_se(ctx_, MY_SPEC.index_tid_, MY_SPEC.only_data_table_, NULL, dml_param)); OZ(fetch_conflict_rows(dml_param)); + OZ(gen_cols_.init(MY_SPEC.access_exprs_.count())); + for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.access_exprs_.count(); i++) { + ObExpr *expr = MY_SPEC.access_exprs_.at(i); + if (expr->eval_func_ != nullptr) { + OZ(gen_cols_.push_back(expr)); + } + } // for end LOG_DEBUG("open conflict row fetcher"); return ret; } @@ -218,22 +221,34 @@ int ObTableConflictRowFetcherOp::inner_get_next_row() LOG_WARN("invalid argument", K(ret), KP(dup_row)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.access_exprs_.count(); i++) { - const ObObj& cell = dup_row->cells_[i]; - ObDatum& datum = MY_SPEC.access_exprs_.at(i)->locate_datum_for_write(eval_ctx_); - ObExpr* expr = MY_SPEC.access_exprs_.at(i); - if (cell.is_null()) { + const ObObj &cell = dup_row->cells_[i]; + ObDatum &datum = MY_SPEC.access_exprs_.at(i)->locate_datum_for_write(eval_ctx_); + ObExpr *expr = MY_SPEC.access_exprs_.at(i); + if (OB_UNLIKELY(expr->eval_func_ != nullptr)) { + //generated column, do nothing + } else if (OB_UNLIKELY(cell.is_null())) { datum.set_null(); - } else if (cell.get_type() != expr->datum_meta_.type_) { + expr->get_eval_info(eval_ctx_).evaluated_ = true; + } else if (OB_UNLIKELY(cell.get_type() != expr->datum_meta_.type_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("type mismatch", K(ret), K(i), K(cell.get_type()), K(*expr)); } else if (OB_FAIL(datum.from_obj(cell, expr->obj_datum_map_))) { LOG_WARN("convert obj to datum failed", K(ret)); } else { - MY_SPEC.access_exprs_.at(i)->get_eval_info(eval_ctx_).evaluated_ = true; + expr->get_eval_info(eval_ctx_).evaluated_ = true; + } + } // for end + //calc generated column expr + for (int64_t i = 0; OB_SUCC(ret) && i < gen_cols_.count(); ++i) { + ObExpr *expr = gen_cols_.at(i); + ObDatum *datum = nullptr; + expr->get_eval_info(eval_ctx_).clear_evaluated_flag(); + if (OB_FAIL(expr->eval(eval_ctx_, datum))) { + LOG_WARN("eval generated column failed", K(ret)); } - } // for end + } if (OB_SUCC(ret)) { - LOG_DEBUG("fetch dup row", "row", ROWEXPR2STR(eval_ctx_, MY_SPEC.access_exprs_), K(*dup_row)); + LOG_DEBUG("fetch dup row", "row", ROWEXPR2STR(eval_ctx_, MY_SPEC.access_exprs_), KPC(dup_row)); } } } diff --git a/src/sql/engine/dml/ob_table_conflict_row_fetcher_op.h b/src/sql/engine/dml/ob_table_conflict_row_fetcher_op.h index 4e0f6b3e1..cf4fafb40 100644 --- a/src/sql/engine/dml/ob_table_conflict_row_fetcher_op.h +++ b/src/sql/engine/dml/ob_table_conflict_row_fetcher_op.h @@ -116,12 +116,13 @@ private: class ObTableConflictRowFetcherOp : public ObOperator { public: - ObTableConflictRowFetcherOp(ObExecContext& ctx, const ObOpSpec& spec, ObOpInput* input) - : ObOperator(ctx, spec, input), - dup_row_iter_arr_(), - row2exprs_projector_(ctx.get_allocator()), - cur_row_idx_(0), - cur_rowkey_id_(0) + ObTableConflictRowFetcherOp(ObExecContext &ctx, const ObOpSpec &spec, ObOpInput *input) + : ObOperator(ctx, spec, input), + dup_row_iter_arr_(), + row2exprs_projector_(ctx.get_allocator()), + gen_cols_(ctx.get_allocator()), + cur_row_idx_(0), + cur_rowkey_id_(0) {} virtual int inner_open() override; @@ -140,6 +141,7 @@ private: common::ObSEArray dup_row_iter_arr_; storage::ObRow2ExprsProjector row2exprs_projector_; + ObFixedArray gen_cols_; //mark to save generated column exprs int64_t cur_row_idx_; int64_t cur_rowkey_id_; }; diff --git a/src/sql/engine/dml/ob_table_delete_returning_op.cpp b/src/sql/engine/dml/ob_table_delete_returning_op.cpp index c8e1da095..2eff3a072 100644 --- a/src/sql/engine/dml/ob_table_delete_returning_op.cpp +++ b/src/sql/engine/dml/ob_table_delete_returning_op.cpp @@ -74,7 +74,7 @@ int ObTableDeleteReturningOp::get_next_row() } } if (OB_SUCC(ret)) { - real_delete_row.cells_ = delete_row_ceils_; + real_delete_row.count_ = MY_SPEC.column_ids_.count(); real_delete_row.count_ = child_row_count_; if (OB_FAIL(partition_service_->delete_row(my_session->get_trans_desc(), dml_param_, diff --git a/src/sql/engine/dml/ob_table_modify.cpp b/src/sql/engine/dml/ob_table_modify.cpp index e85e4c96a..69e5b1813 100644 --- a/src/sql/engine/dml/ob_table_modify.cpp +++ b/src/sql/engine/dml/ob_table_modify.cpp @@ -1088,6 +1088,53 @@ int ObTableModify::validate_row(ObExprCtx& expr_ctx, ObCastCtx& column_conv_ctx, return ret; } +int ObTableModify::check_row_null(ObExecContext &ctx, const ObNewRow &calc_row, + const ObIArray &column_infos, const ObIArray &update_col_infos) const +{ + int ret = OB_SUCCESS; + if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2220) { + //兼容oracle, 如果有instead of trigger,不检查NOT NULL约束 + OV(calc_row.get_count() == column_infos.count(), OB_ERR_UNEXPECTED, calc_row, column_infos); + for (int i = 0; OB_SUCC(ret) && i < update_col_infos.count(); i++) { + int64_t col_idx = update_col_infos.at(i).projector_index_; + bool is_nullable = column_infos.at(col_idx).is_nullable_; + bool is_cell_null = calc_row.get_cell(col_idx).is_null() || + (lib::is_oracle_mode() && calc_row.get_cell(col_idx).is_null_oracle()); + if (!is_nullable && is_cell_null) { + if (is_ignore_) { + if (is_oracle_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dml with ignore not supported in oracle mode"); + } else if (OB_FAIL(ObObjCaster::get_zero_value(column_infos.at(col_idx).column_type_, + column_infos.at(col_idx).coll_type_, + const_cast(calc_row.get_cell(col_idx))))) { + LOG_WARN("get column default zero value failed", K(ret), K(column_infos.at(col_idx))); + } else { + // output warning msg + ObString column_name = column_infos.at(col_idx).column_name_; + ObSQLUtils::copy_and_convert_string_charset(ctx.get_allocator(), + column_name, + column_name, + CS_TYPE_UTF8MB4_BIN, + ctx.get_my_session()->get_local_collation_connection()); + LOG_USER_WARN(OB_BAD_NULL_ERROR, column_name.length(), column_name.ptr()); + } + } else { + ObString column_name = column_infos.at(col_idx).column_name_; + ObSQLUtils::copy_and_convert_string_charset(ctx.get_allocator(), + column_name, + column_name, + CS_TYPE_UTF8MB4_BIN, + ctx.get_my_session()->get_local_collation_connection()); + ret = OB_BAD_NULL_ERROR; + LOG_USER_ERROR(OB_BAD_NULL_ERROR, column_name.length(), column_name.ptr()); + } + } + } + } + return ret; +} + int ObTableModify::check_row_null( ObExecContext& ctx, const ObNewRow& calc_row, const ObIArray& column_infos) const { diff --git a/src/sql/engine/dml/ob_table_modify.h b/src/sql/engine/dml/ob_table_modify.h index b04f8d43c..72d5c9ad4 100644 --- a/src/sql/engine/dml/ob_table_modify.h +++ b/src/sql/engine/dml/ob_table_modify.h @@ -760,11 +760,14 @@ protected: int validate_row(common::ObExprCtx& expr_ctx, common::ObCastCtx& column_conv_ctx, common::ObNewRow& calc_row, bool check_normal_column, bool check_virtual_column) const; int check_row_null( - ObExecContext& ctx, const common::ObNewRow& calc_row, const common::ObIArray& column_infos) const; - int set_autoinc_param_pkey(ObExecContext& ctx, const common::ObPartitionKey& pkey) const; - int get_part_location(ObExecContext& ctx, const ObPhyTableLocation& table_location, - const share::ObPartitionReplicaLocation*& out) const; - int get_part_location(ObExecContext& ctx, common::ObIArray& part_keys) const; + ObExecContext &ctx, const common::ObNewRow &calc_row, const common::ObIArray &column_infos) const; + int check_row_null(ObExecContext &ctx, const common::ObNewRow &calc_row, + const common::ObIArray &column_infos, + const common::ObIArray &update_col_infos) const; + int set_autoinc_param_pkey(ObExecContext &ctx, const common::ObPartitionKey &pkey) const; + int get_part_location(ObExecContext &ctx, const ObPhyTableLocation &table_location, + const share::ObPartitionReplicaLocation *&out) const; + int get_part_location(ObExecContext &ctx, common::ObIArray &part_keys) const; // for checking the rowkey whether null, the head of the row must be rowkey int check_rowkey_is_null(const ObNewRow& row, int64_t rowkey_cnt, bool& is_null) const; int get_gi_task(ObExecContext& ctx) const; diff --git a/src/sql/engine/dml/ob_table_modify_op.cpp b/src/sql/engine/dml/ob_table_modify_op.cpp index 85e395c9b..1c9fbc986 100644 --- a/src/sql/engine/dml/ob_table_modify_op.cpp +++ b/src/sql/engine/dml/ob_table_modify_op.cpp @@ -658,7 +658,59 @@ int ObTableModifyOp::calc_part_id(const ObExpr* calc_part_id_expr, ObIArray& column_infos) const +int ObTableModifyOp::check_row_null(const ObExprPtrIArray &row, + const ObIArray &column_infos, + const ObIArray &update_col_infos) const +{ + int ret = OB_SUCCESS; + if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2220) { + //兼容oracle, 如果有instead of trigger,不检查NOT NULL约束 + OV (row.count() == column_infos.count(), OB_ERR_UNEXPECTED, row, column_infos); + for (int i = 0; OB_SUCC(ret) && i < update_col_infos.count(); i++) { + int64_t col_idx = update_col_infos.at(i).projector_index_; + ObDatum *datum = NULL; + const bool is_nullable = column_infos.at(col_idx).is_nullable_; + if (OB_FAIL(row.at(col_idx)->eval(eval_ctx_, datum))) { + const ObTableInsertOp *insert_op = dynamic_cast(this); + const ObTableUpdateOp *update_op = dynamic_cast(this); + if (nullptr != insert_op) { + //compatible with old code + log_user_error_inner(ret, col_idx, insert_op->curr_row_num_ + 1, MY_SPEC.column_infos_); + } else if (nullptr != update_op) { + //compatible with old code + log_user_error_inner(ret, col_idx, update_op->get_found_rows() + 1, MY_SPEC.column_infos_); + } + } else if (!is_nullable && datum->is_null()) { + if (MY_SPEC.is_ignore_) { + ObObj zero_obj; + if (is_oracle_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("dml with ignore not supported in oracle mode"); + } else if (OB_FAIL(ObObjCaster::get_zero_value( + column_infos.at(col_idx).column_type_, + column_infos.at(col_idx).coll_type_, + zero_obj))) { + LOG_WARN("get column default zero value failed", K(ret), K(column_infos.at(col_idx))); + } else if (OB_FAIL(datum->from_obj(zero_obj))) { + LOG_WARN("assign zero obj to datum failed", K(ret), K(zero_obj)); + } else { + //output warning msg + const ObString &column_name = column_infos.at(col_idx).column_name_; + LOG_USER_WARN(OB_BAD_NULL_ERROR, column_name.length(), column_name.ptr()); + } + } else { + const ObString &column_name = column_infos.at(col_idx).column_name_; + ret = OB_BAD_NULL_ERROR; + LOG_USER_ERROR(OB_BAD_NULL_ERROR, column_name.length(), column_name.ptr()); + } + } + } + } + return ret; +} + +int ObTableModifyOp::check_row_null(const ObExprPtrIArray &row, + const ObIArray &column_infos) const { int ret = OB_SUCCESS; if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2220) { diff --git a/src/sql/engine/dml/ob_table_modify_op.h b/src/sql/engine/dml/ob_table_modify_op.h index 630027ca5..b92992e45 100644 --- a/src/sql/engine/dml/ob_table_modify_op.h +++ b/src/sql/engine/dml/ob_table_modify_op.h @@ -375,10 +375,15 @@ protected: int mark_lock_row_flag(int64_t flag); - int check_row_null(const ObExprPtrIArray& row, const common::ObIArray& column_infos) const; - int set_autoinc_param_pkey(const common::ObPartitionKey& pkey) const; - int get_part_location(const ObPhyTableLocation& table_location, const share::ObPartitionReplicaLocation*& out); - int get_part_location(common::ObIArray& part_keys); + int check_row_null(const ObExprPtrIArray &row, + const common::ObIArray &column_infos, + const common::ObIArray &update_col_infos) const; + int check_row_null(const ObExprPtrIArray &row, + const common::ObIArray &column_infos) const; + int set_autoinc_param_pkey(const common::ObPartitionKey &pkey) const; + int get_part_location(const ObPhyTableLocation &table_location, + const share::ObPartitionReplicaLocation *&out); + int get_part_location(common::ObIArray &part_keys); int get_gi_task(); // filtered if filter return false value. (move from ObPhyOperator). diff --git a/src/sql/engine/dml/ob_table_update.cpp b/src/sql/engine/dml/ob_table_update.cpp index d1911cca8..20034d61c 100644 --- a/src/sql/engine/dml/ob_table_update.cpp +++ b/src/sql/engine/dml/ob_table_update.cpp @@ -263,9 +263,9 @@ int ObTableUpdate::get_next_row(ObExecContext& ctx, const ObNewRow*& row) const } } if (OB_SUCC(ret) && !from_multi_table_dml()) { - ObNewRow& old_row = update_ctx->old_row_; - ObNewRow& new_row = update_ctx->new_row_; - OZ(check_row_null(ctx, new_row, column_infos_), new_row); + ObNewRow &old_row = update_ctx->old_row_; + ObNewRow &new_row = update_ctx->new_row_; + OZ(check_row_null(ctx, new_row, column_infos_, updated_column_infos_), new_row); } } if (OB_SUCC(ret)) { diff --git a/src/sql/engine/dml/ob_table_update.h b/src/sql/engine/dml/ob_table_update.h index 58ad37a5e..7cf3149d5 100644 --- a/src/sql/engine/dml/ob_table_update.h +++ b/src/sql/engine/dml/ob_table_update.h @@ -147,6 +147,8 @@ public: { return updated_column_infos_; } + const common::ObIArray &get_updated_column_infos() const + { return updated_column_infos_; } const common::ObIArray* get_updated_column_ids() const { diff --git a/src/sql/engine/dml/ob_table_update_op.cpp b/src/sql/engine/dml/ob_table_update_op.cpp index 2b6d0c5e4..9cf7fa402 100644 --- a/src/sql/engine/dml/ob_table_update_op.cpp +++ b/src/sql/engine/dml/ob_table_update_op.cpp @@ -200,10 +200,7 @@ int ObTableUpdateOp::prepare_next_storage_row(const ObExprPtrIArray*& output) while (OB_SUCC(ret) && !need_update_ && OB_SUCC(inner_get_next_row())) { if (OB_SUCC(ret)) { if (!MY_SPEC.from_multi_table_dml()) { - // TODO : process trigger - // OZ (TriggerHandle::init_param_rows(*this, *update_ctx, old_row, new_row), old_row, new_row); - // OZ (TriggerHandle::do_handle_before_row(*this, *update_ctx, &new_row), old_row, new_row); - OZ(check_row_null(MY_SPEC.new_row_, MY_SPEC.column_infos_)); + OZ(check_row_null(MY_SPEC.new_row_, MY_SPEC.column_infos_, MY_SPEC.updated_column_infos_)); if (MY_SPEC.need_filter_null_row_) { bool is_null = false; if (OB_FAIL(check_rowkey_is_null(MY_SPEC.old_row_, MY_SPEC.primary_key_ids_.count(), is_null))) { diff --git a/src/storage/memtable/ob_memtable_row_reader.cpp b/src/storage/memtable/ob_memtable_row_reader.cpp index ecc4f1fc2..eb6c4d51f 100644 --- a/src/storage/memtable/ob_memtable_row_reader.cpp +++ b/src/storage/memtable/ob_memtable_row_reader.cpp @@ -765,8 +765,8 @@ void ObMemtableIterRowReader::reset() } } -int ObMemtableIterRowReader::init(common::ObArenaAllocator* allocator, const share::schema::ColumnMap* cols_map, - ObNopBitMap* bitmap, const storage::ObColDescArray& columns) +int ObMemtableIterRowReader::init(common::ObIAllocator *allocator, const share::schema::ColumnMap *cols_map, + ObNopBitMap *bitmap, const storage::ObColDescArray &columns) { int ret = OB_SUCCESS; if (IS_INIT) { diff --git a/src/storage/memtable/ob_memtable_row_reader.h b/src/storage/memtable/ob_memtable_row_reader.h index ff9281507..42c57c51d 100644 --- a/src/storage/memtable/ob_memtable_row_reader.h +++ b/src/storage/memtable/ob_memtable_row_reader.h @@ -60,10 +60,10 @@ class ObMemtableIterRowReader { public: ObMemtableIterRowReader(); ~ObMemtableIterRowReader(); - int init(common::ObArenaAllocator* allocator, const share::schema::ColumnMap* cols_map, ObNopBitMap* bitmap, - const storage::ObColDescArray& columns); - int get_memtable_row(storage::ObStoreRow& row); - int set_buf(const char* buf, int64_t buf_size); + int init(common::ObIAllocator *allocator, const share::schema::ColumnMap *cols_map, ObNopBitMap *bitmap, + const storage::ObColDescArray &columns); + int get_memtable_row(storage::ObStoreRow &row); + int set_buf(const char *buf, int64_t buf_size); void reset(); void destory(); bool is_iter_end(); diff --git a/src/storage/ob_dml_param.cpp b/src/storage/ob_dml_param.cpp index 683ce6185..3bbb9bca7 100644 --- a/src/storage/ob_dml_param.cpp +++ b/src/storage/ob_dml_param.cpp @@ -15,8 +15,7 @@ #include "lib/container/ob_iarray.h" #include "share/ob_errno.h" #include "share/schema/ob_schema_struct.h" -#include "share/schema/ob_table_param.h" -#include "sql/engine/expr/ob_expr.h" +#include "share/schema/ob_table_dml_param.h" namespace oceanbase { namespace storage { @@ -187,5 +186,28 @@ DEF_TO_STRING(ObRow2ExprsProjector::Item) return pos; } +DEF_TO_STRING(ObDMLBaseParam) +{ + int64_t pos = 0; + J_OBJ_START(); + J_KV(N_TIMEOUT, + timeout_, + N_SCHEMA_VERSION, + schema_version_, + N_SCAN_FLAG, + query_flag_, + N_SQL_MODE, + sql_mode_, + N_IS_TOTAL_QUANTITY_LOG, + is_total_quantity_log_, + K_(only_data_table), + KPC_(table_param), + K_(tenant_schema_version), + K_(is_ignore), + K_(duplicated_rows), + K_(prelock)); + J_OBJ_END(); + return pos; +} } // namespace storage } // namespace oceanbase diff --git a/src/storage/ob_dml_param.h b/src/storage/ob_dml_param.h index 5020bd849..2173600f4 100644 --- a/src/storage/ob_dml_param.h +++ b/src/storage/ob_dml_param.h @@ -193,48 +193,41 @@ struct ObDMLBaseParam { schema_version_(-1), query_flag_(), sql_mode_(DEFAULT_OCEANBASE_MODE), - is_total_quantity_log_(false), tz_info_(NULL), - only_data_table_(false), table_param_(NULL), tenant_schema_version_(OB_INVALID_VERSION), + is_total_quantity_log_(false), + only_data_table_(false), is_ignore_(false), - duplicated_rows_(0), prelock_(false), - output_exprs_(NULL), - op_(NULL), - op_filters_(NULL), - row2exprs_projector_(NULL) + duplicated_rows_(0), + dml_allocator_(nullptr) + { + query_flag_.read_latest_ = common::ObQueryFlag::OBSF_MASK_READ_LATEST; + } + ~ObDMLBaseParam() {} int64_t timeout_; int64_t schema_version_; common::ObQueryFlag query_flag_; ObSQLMode sql_mode_; - bool is_total_quantity_log_; - const common::ObTimeZoneInfo* tz_info_; - bool only_data_table_; + const common::ObTimeZoneInfo *tz_info_; common::ObColumnExprArray virtual_columns_; common::ObExprCtx expr_ctx_; const share::schema::ObTableDMLParam* table_param_; int64_t tenant_schema_version_; + bool is_total_quantity_log_; + bool only_data_table_; bool is_ignore_; - mutable int64_t duplicated_rows_; bool prelock_; - - // output for sql static typing engine, NULL for old sql engine scan. - const sql::ObExprPtrIArray* output_exprs_; - sql::ObOperator* op_; - const sql::ObExprPtrIArray* op_filters_; - ObRow2ExprsProjector* row2exprs_projector_; - + mutable int64_t duplicated_rows_; + common::ObIAllocator *dml_allocator_; bool is_valid() const { return (timeout_ > 0 && schema_version_ >= 0); } - TO_STRING_KV(N_TIMEOUT, timeout_, N_SCHEMA_VERSION, schema_version_, N_SCAN_FLAG, query_flag_, N_SQL_MODE, sql_mode_, - N_IS_TOTAL_QUANTITY_LOG, is_total_quantity_log_, K_(only_data_table), KP_(table_param), K_(tenant_schema_version), - K_(is_ignore), K_(duplicated_rows), K_(prelock)); + DECLARE_TO_STRING; }; } // end namespace storage diff --git a/src/storage/ob_handle_mgr.h b/src/storage/ob_handle_mgr.h index 45fdfcbde..51ac3e783 100644 --- a/src/storage/ob_handle_mgr.h +++ b/src/storage/ob_handle_mgr.h @@ -41,7 +41,7 @@ public: } is_inited_ = false; } - int init(const bool is_multi, const bool is_ordered, common::ObArenaAllocator& allocator) + int init(const bool is_multi, const bool is_ordered, common::ObIAllocator &allocator) { int ret = common::OB_SUCCESS; void* buf = NULL; diff --git a/src/storage/ob_i_store.cpp b/src/storage/ob_i_store.cpp index aa62304f7..56beb1b50 100644 --- a/src/storage/ob_i_store.cpp +++ b/src/storage/ob_i_store.cpp @@ -560,12 +560,14 @@ int ObTableAccessParam::init(const uint64_t table_id, const int64_t schema_versi iter_param_.full_out_cols_ = nullptr != full_out_cols_ && full_out_cols_->count() > 0 ? full_out_cols_ : nullptr; if (NULL != table_param) { - iter_param_.cols_id_map_ = &table_param->get_column_map(); iter_param_.full_cols_id_map_ = &table_param->get_full_column_map(); - iter_param_.out_cols_project_ = &table_param->get_output_projector(); + iter_param_.full_out_cols_ = &table_param->get_full_col_descs(); iter_param_.full_projector_ = &table_param->get_full_projector(); iter_param_.out_cols_param_ = &table_param->get_columns(); iter_param_.full_out_cols_param_ = &table_param->get_full_columns(); + iter_param_.cols_id_map_ = &table_param->get_column_map(); + iter_param_.out_cols_project_ = &table_param->get_output_projector(); + iter_param_.projector_ = &table_param->get_projector(); out_cols_param_ = &table_param->get_columns(); enable_fast_skip_ = false; if (is_mv_right_table) { @@ -577,25 +579,46 @@ int ObTableAccessParam::init(const uint64_t table_id, const int64_t schema_versi return ret; } -int ObTableAccessParam::init_basic_param(const uint64_t table_id, const int64_t schema_version, - const int64_t rowkey_column_num, const ObIArray& column_ids, - const ObIArray* out_cols_index) +int ObTableAccessParam::init_dml_access_param(const uint64_t table_id, const int64_t schema_version, + const int64_t rowkey_column_num, share::schema::ObTableParam &table_param) { int ret = OB_SUCCESS; - if (OB_FAIL(out_col_desc_param_.init(nullptr))) { - LOG_WARN("init out cols fail", K(ret)); - } else if (OB_FAIL(out_col_desc_param_.assign(column_ids))) { - LOG_WARN("assign out cols fail", K(ret), K(column_ids)); - } else { - enable_fast_skip_ = false; - iter_param_.reset(); - iter_param_.table_id_ = table_id; - iter_param_.schema_version_ = schema_version; - iter_param_.rowkey_cnt_ = rowkey_column_num; - iter_param_.out_cols_project_ = out_cols_index; - iter_param_.out_cols_ = &out_col_desc_param_.get_col_descs(); - iter_param_.full_out_cols_ = nullptr; - } + iter_param_.table_id_ = table_id; + iter_param_.schema_version_ = schema_version; + iter_param_.rowkey_cnt_ = rowkey_column_num; + iter_param_.full_cols_id_map_ = &table_param.get_full_column_map(); + iter_param_.full_out_cols_ = &table_param.get_full_col_descs(); + iter_param_.full_projector_ = &table_param.get_full_projector(); + iter_param_.cols_id_map_ = &table_param.get_column_map(); + iter_param_.out_cols_param_ = &table_param.get_columns(); + iter_param_.out_cols_ = &table_param.get_col_descs(); + iter_param_.out_cols_project_ = &table_param.get_output_projector(); + iter_param_.projector_ = &table_param.get_projector(); + out_cols_param_ = &table_param.get_columns(); + full_out_cols_ = &table_param.get_full_col_descs(); + OZ(out_col_desc_param_.init(&table_param.get_col_descs())); + return ret; +} + +int ObTableAccessParam::init_dml_access_param(const uint64_t table_id, const int64_t schema_version, + const int64_t rowkey_column_num, const share::schema::ObTableSchemaParam &schema_param, + const ObIArray *out_cols_project) +{ + int ret = OB_SUCCESS; + iter_param_.table_id_ = table_id; + iter_param_.schema_version_ = schema_version; + iter_param_.rowkey_cnt_ = rowkey_column_num; + iter_param_.full_cols_id_map_ = &schema_param.get_full_col_map(); + iter_param_.full_out_cols_ = &schema_param.get_full_col_descs(); + iter_param_.full_projector_ = &schema_param.get_full_projector(); + iter_param_.cols_id_map_ = &schema_param.get_col_map(); + iter_param_.out_cols_param_ = &schema_param.get_columns(); + iter_param_.out_cols_ = &schema_param.get_col_descs(); + iter_param_.out_cols_project_ = out_cols_project; + iter_param_.projector_ = &schema_param.get_projector(); + out_cols_param_ = &schema_param.get_columns(); + full_out_cols_ = &schema_param.get_full_col_descs(); + OZ(out_col_desc_param_.init(&schema_param.get_col_descs())); return ret; } @@ -1027,9 +1050,9 @@ int ObTableAccessContext::init(ObTableScanParam& scan_param, const ObStoreCtx& c return ret; } -int ObTableAccessContext::init(const common::ObQueryFlag& query_flag, const ObStoreCtx& ctx, - ObArenaAllocator& allocator, ObArenaAllocator& stmt_allocator, blocksstable::ObBlockCacheWorkingSet& block_cache_ws, - const ObVersionRange& trans_version_range) +int ObTableAccessContext::init(const common::ObQueryFlag &query_flag, const ObStoreCtx &ctx, ObIAllocator &allocator, + ObIAllocator &stmt_allocator, blocksstable::ObBlockCacheWorkingSet &block_cache_ws, + const ObVersionRange &trans_version_range) { int ret = OB_SUCCESS; if (is_inited_) { @@ -1048,8 +1071,8 @@ int ObTableAccessContext::init(const common::ObQueryFlag& query_flag, const ObSt } return ret; } -int ObTableAccessContext::init(const common::ObQueryFlag& query_flag, const ObStoreCtx& ctx, - common::ObArenaAllocator& allocator, const common::ObVersionRange& trans_version_range) +int ObTableAccessContext::init(const common::ObQueryFlag &query_flag, const ObStoreCtx &ctx, + common::ObIAllocator &allocator, const common::ObVersionRange &trans_version_range) { int ret = OB_SUCCESS; diff --git a/src/storage/ob_i_store.h b/src/storage/ob_i_store.h index e13283f74..2904a87f9 100644 --- a/src/storage/ob_i_store.h +++ b/src/storage/ob_i_store.h @@ -1216,8 +1216,10 @@ public: const common::ObIArray& column_ids, const bool is_multi_version_merge = false, share::schema::ObTableParam* table_param = NULL, const bool is_mv_right_table = false); // used for get unique index conflict row - int init_basic_param(const uint64_t table_id, const int64_t schema_version, const int64_t rowkey_column_num, - const common::ObIArray& column_ids, const common::ObIArray* out_cols_index); + int init_dml_access_param(const uint64_t table_id, const int64_t schema_version, const int64_t rowkey_column_num, + share::schema::ObTableParam &table_param); + int init_dml_access_param(const uint64_t table_id, const int64_t schema_version, const int64_t rowkey_column_num, + const share::schema::ObTableSchemaParam &schema_param, const common::ObIArray *out_cols_project); // used for index back when query int init_index_back(ObTableScanParam& scan_param); // init need_fill_scale_ and search column which need fill scale @@ -1355,12 +1357,12 @@ struct ObTableAccessContext { const common::ObVersionRange& trans_version_range, const ObIStoreRowFilter* row_filter, const bool is_index_back = false); // used for merge - int init(const common::ObQueryFlag& query_flag, const ObStoreCtx& ctx, common::ObArenaAllocator& allocator, - common::ObArenaAllocator& stmt_allocator, blocksstable::ObBlockCacheWorkingSet& block_cache_ws, - const common::ObVersionRange& trans_version_range); + int init(const common::ObQueryFlag &query_flag, const ObStoreCtx &ctx, common::ObIAllocator &allocator, + common::ObIAllocator &stmt_allocator, blocksstable::ObBlockCacheWorkingSet &block_cache_ws, + const common::ObVersionRange &trans_version_range); // used for exist or simple scan - int init(const common::ObQueryFlag& query_flag, const ObStoreCtx& ctx, common::ObArenaAllocator& allocator, - const common::ObVersionRange& trans_version_range); + int init(const common::ObQueryFlag &query_flag, const ObStoreCtx &ctx, common::ObIAllocator &allocator, + const common::ObVersionRange &trans_version_range); TO_STRING_KV(K_(is_inited), K_(timeout), K_(pkey), K_(query_flag), K_(sql_mode), KP_(store_ctx), KP_(expr_ctx), KP_(limit_param), KP_(stmt_allocator), KP_(allocator), KP_(table_scan_stat), KP_(block_cache_ws), K_(out_cnt), K_(is_end), K_(trans_version_range), KP_(row_filter), K_(merge_log_ts), @@ -1379,9 +1381,9 @@ public: common::ObExprCtx* expr_ctx_; common::ObLimitParam* limit_param_; // sql statement level allocator, available before sql execute finish - common::ObArenaAllocator* stmt_allocator_; + common::ObIAllocator *stmt_allocator_; // storage scan/rescan interface level allocator, will be reclaimed in every scan/rescan call - common::ObArenaAllocator* allocator_; + common::ObIAllocator *allocator_; lib::MemoryContext stmt_mem_; // sql statement level memory entity, only for query lib::MemoryContext scan_mem_; // scan/rescan level memory entity, only for query common::ObTableScanStatistic* table_scan_stat_; diff --git a/src/storage/ob_partition_service.cpp b/src/storage/ob_partition_service.cpp index 71a2034ab..57b00ac1c 100644 --- a/src/storage/ob_partition_service.cpp +++ b/src/storage/ob_partition_service.cpp @@ -3784,6 +3784,18 @@ int ObPartitionService::delete_rows(const transaction::ObTransDesc& trans_desc, } else if (OB_FAIL(check_query_allowed(pkey, trans_desc, ctx_guard, guard))) { STORAGE_LOG(WARN, "fail to check query allowed", K(ret)); } else { + //@NOTICE:(yuchen.wyc)为了规避外键自引用带来的防御检查不过的问题: + //由于目前delete语句的TableScan操作使用的快照点是语句级,无法看到本语句的最新修改, + //因此,对于外键自引用的级联删除时,同一行可能会被自己和外键级联操作多次删除, + //这个问题目前没有出现语义上的问题,但会导致delete的防御检查报错,详见: + // https://aone.alibaba-inc.com/issue/36022956 + // DML的防御检查读使用的默认快照点是可以读到本语最新修改,因此对于该场景会出现两次读到的数据不一致的问题 + //正确的做法是从外键的删除操作上规避掉多次删除同一行的情况,这要求delete的TableScan能够看到自己的最新修改 + //由于担心这个修改的影响较大,3.2暂时从防御检查上规避掉这个问题, + // 4.0上delete改为读本语句的最新修改来避免同一行的多次删除 + if (trans_desc.get_cur_stmt_desc().is_delete_stmt()) { + const_cast(dml_param).query_flag_.read_latest_ = 0; + } ctx_guard.get_store_ctx().trans_id_ = trans_desc.get_trans_id(); ret = guard.get_partition_group()->delete_rows( ctx_guard.get_store_ctx(), dml_param, column_ids, row_iter, affected_rows); @@ -3805,6 +3817,18 @@ int ObPartitionService::delete_row(const ObTransDesc& trans_desc, const ObDMLBas } else if (OB_FAIL(check_query_allowed(pkey, trans_desc, ctx_guard, guard))) { STORAGE_LOG(WARN, "fail to check query allowed", K(ret)); } else { + //@NOTICE:(yuchen.wyc)为了规避外键自引用带来的防御检查不过的问题: + //由于目前delete语句的TableScan操作使用的快照点是语句级,无法看到本语句的最新修改, + //因此,对于外键自引用的级联删除时,同一行可能会被自己和外键级联操作多次删除, + //这个问题目前没有出现语义上的问题,但会导致delete的防御检查报错,详见: + // https://aone.alibaba-inc.com/issue/36022956 + // DML的防御检查读使用的默认快照点是可以读到本语最新修改,因此对于该场景会出现两次读到的数据不一致的问题 + //正确的做法是从外键的删除操作上规避掉多次删除同一行的情况,这要求delete的TableScan能够看到自己的最新修改 + //由于担心这个修改的影响较大,3.2暂时从防御检查上规避掉这个问题, + // 4.0上delete改为读本语句的最新修改来避免同一行的多次删除 + if (trans_desc.get_cur_stmt_desc().is_delete_stmt()) { + const_cast(dml_param).query_flag_.read_latest_ = 0; + } ctx_guard.get_store_ctx().trans_id_ = trans_desc.get_trans_id(); ret = guard.get_partition_group()->delete_row(ctx_guard.get_store_ctx(), dml_param, column_ids, row); AUDIT_PARTITION_V2(ctx_guard.get_store_ctx().mem_ctx_, PART_AUDIT_DELETE_ROW, 1); diff --git a/src/storage/ob_partition_storage.cpp b/src/storage/ob_partition_storage.cpp index f72bdb77c..943a3d6e2 100644 --- a/src/storage/ob_partition_storage.cpp +++ b/src/storage/ob_partition_storage.cpp @@ -647,6 +647,8 @@ int ObPartitionStorage::delete_row(ObDMLRunningCtx& run_ctx, RowReshape*& row_re if (OB_FAIL(reshape_delete_row(run_ctx, row_reshape, tbl_row, tbl_row))) { LOG_WARN("failed to reshape row", K(ret), K(tbl_row)); + } else if (GCONF.enable_defensive_check() && OB_FAIL(check_old_row_legitimacy(run_ctx, row))) { + LOG_WARN("check old row legitimacy failed", K(row)); } else if (!dml_param.is_total_quantity_log_) { if (OB_FAIL(write_row(run_ctx.relative_tables_.data_table_, ctx, rowkey_size, *run_ctx.col_descs_, tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { @@ -698,6 +700,9 @@ int ObPartitionStorage::delete_row(ObDMLRunningCtx& run_ctx, RowReshape*& row_re null_idx_val, &run_ctx.idx_col_descs_))) { STORAGE_LOG(WARN, "failed to generate index row", K(ret)); + } else if (GCONF.enable_defensive_check() && + OB_FAIL(check_delete_index_legitimacy(run_ctx, relative_table, run_ctx.idx_row_->row_val_))) { + LOG_WARN("check delete index legitimacy failed", K(ret)); } else if (OB_FAIL(write_index_row(relative_table, ctx, run_ctx.idx_col_descs_, *run_ctx.idx_row_))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { STORAGE_LOG(WARN, @@ -1746,167 +1751,107 @@ int ObPartitionStorage::fetch_conflict_rows(const ObStoreCtx& ctx, const ObDMLBa return ret; } -int ObPartitionStorage::multi_get_rows(const ObStoreCtx& store_ctx, const ObTableAccessParam& access_param, - ObTableAccessContext& access_ctx, ObRelativeTable& relative_table, const GetRowkeyArray& rowkeys, - ObNewRowIterator*& duplicated_rows, int64_t data_table_rowkey_cnt) +int ObPartitionStorage::single_get_row(ObSingleRowGetter &row_getter, + const ObStoreRowkey &rowkey, + ObNewRowIterator *&duplicated_rows, + int64_t data_table_rowkey_cnt) { int ret = OB_SUCCESS; - const ObTablesHandle& tables_handle = relative_table.tables_handle_; - ObMultipleGetMerge* get_merge = NULL; - void* buf = NULL; - - // look up the row - if (OB_UNLIKELY(!is_inited_) || OB_ISNULL(access_ctx.allocator_)) { - ret = OB_NOT_INIT; - STORAGE_LOG(WARN, "partition storage is not initialized", K(ret), KP(access_ctx.allocator_)); - } else if (NULL == (buf = access_ctx.allocator_->alloc(sizeof(ObMultipleGetMerge)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "Fail to allocate memory for multi get merge ", K(ret)); - } else { - { - ObStorageWriterGuard guard(store_, store_ctx, false); - if (OB_FAIL(guard.refresh_and_protect_table(relative_table))) { - STORAGE_LOG(WARN, "fail to protect table", K(ret), K(pkey_)); - } - } - get_merge = new (buf) ObMultipleGetMerge(); - - ObGetTableParam get_table_param; - ObStoreRow* row = NULL; - get_table_param.tables_handle_ = &tables_handle; - if (OB_FAIL(ret)) { - // do nothing - } else if (OB_FAIL(get_merge->init(access_param, access_ctx, get_table_param))) { - STORAGE_LOG(WARN, "Fail to init ObSingleMerge, ", K(ret)); - } else if (OB_FAIL(get_merge->open(rowkeys))) { - STORAGE_LOG(WARN, "Fail to open iter, ", K(ret)); + ObNewRow *row = nullptr; + if (OB_FAIL(row_getter.open(rowkey))) { + LOG_WARN("init single row getter failed", K(ret)); + } else if (OB_FAIL(row_getter.get_next_row(row))) { + if (OB_ITER_END != ret) { + LOG_WARN("get next row from single row getter failed", K(ret)); + } else { + ret = OB_SUCCESS; } - while (OB_SUCC(ret)) { - if (OB_FAIL(get_merge->get_next_row(row))) { - if (OB_ITER_END != ret) { - STORAGE_LOG(WARN, "failed to get next row", K(ret)); - } - } else if (ObActionFlag::OP_ROW_EXIST == row->flag_) { - // store the conflict rowkey - if (NULL == duplicated_rows) { - ObValueRowIterator* dup_iter = NULL; - if (NULL == (dup_iter = ObQueryIteratorFactory::get_insert_dup_iter())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(ERROR, "no memory to alloc ObValueRowIterator", K(ret)); - } else { - duplicated_rows = dup_iter; - if (OB_FAIL(dup_iter->init(true, data_table_rowkey_cnt))) { - STORAGE_LOG(WARN, "failed to initialize ObValueRowIterator", K(ret)); - } - } - } - if (OB_SUCC(ret)) { - ObValueRowIterator* dup_iter = static_cast(duplicated_rows); - if (OB_NOT_NULL(access_param.output_exprs_)) { - // static engine need project datum to obj row - if (OB_ISNULL(access_param.op_)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(access_param.op_)); - } else { - access_param.op_->clear_evaluated_flag(); - } - for (int64_t i = 0; OB_SUCC(ret) && i < access_param.output_exprs_->count(); i++) { - ObDatum* datum = NULL; - const sql::ObExpr* e = access_param.output_exprs_->at(i); - if (OB_ISNULL(e)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("expr is NULL", K(ret)); - } else if (OB_FAIL(e->eval(access_param.op_->get_eval_ctx(), datum))) { - LOG_WARN("evaluate expression failed", K(ret)); - } else if (OB_FAIL(datum->to_obj(row->row_val_.cells_[i], e->obj_meta_, e->obj_datum_map_))) { - LOG_WARN("convert datum to obj failed", K(ret)); - } - } - row->row_val_.count_ = access_param.output_exprs_->count(); - LOG_DEBUG("get conflict row", K_(row->row_val)); - } - if (OB_FAIL(ret)) { - - } else if (OB_FAIL(dup_iter->add_row(row->row_val_))) { - STORAGE_LOG(WARN, "failed to store conflict row", K(*row)); - } else { - LOG_DEBUG("get conflict row", K_(row->row_val)); - } - } + } else if (NULL == duplicated_rows) { + ObValueRowIterator *dup_iter = NULL; + if (OB_ISNULL(dup_iter = ObQueryIteratorFactory::get_insert_dup_iter())) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "no memory to alloc ObValueRowIterator", K(ret)); + } else { + duplicated_rows = dup_iter; + if (OB_FAIL(dup_iter->init(true, data_table_rowkey_cnt))) { + STORAGE_LOG(WARN, "failed to initialize ObValueRowIterator", K(ret)); } } - if (OB_ITER_END == ret) { - ret = OB_SUCCESS; - } - } - // revert iterator - if (NULL != get_merge) { - get_merge->~ObMultipleGetMerge(); - get_merge = NULL; } - if (OB_FAIL(ret) && duplicated_rows != NULL) { - ObQueryIteratorFactory::free_insert_dup_iter(duplicated_rows); - duplicated_rows = NULL; + if (OB_SUCC(ret) && row != nullptr) { + ObValueRowIterator *dup_iter = static_cast(duplicated_rows); + if (OB_FAIL(dup_iter->add_row(*row))) { + STORAGE_LOG(WARN, "failed to store conflict row", K(*row)); + } else { + LOG_DEBUG("get conflict row", KPC(row)); + } } return ret; } -int ObPartitionStorage::get_index_conflict_row(ObDMLRunningCtx& run_ctx, const ObTableAccessParam& table_access_param, - ObTableAccessContext& table_access_ctx, ObRelativeTable& relative_table, bool need_index_back, const ObNewRow& row, - ObNewRowIterator*& duplicated_rows) +int ObPartitionStorage::get_index_conflict_row(ObDMLRunningCtx &run_ctx, const ObIArray &out_col_ids, + ObRelativeTable &relative_table, bool need_index_back, const ObNewRow &row, ObNewRowIterator *&duplicated_rows) { int ret = OB_SUCCESS; - ObStoreRow* idx_row = run_ctx.idx_row_; - ObNewRowIterator* dup_rowkey_iter = nullptr; - const ColumnMap* col_map = run_ctx.col_map_; + ObStoreRow *idx_row = run_ctx.idx_row_; + ObNewRowIterator *dup_rowkey_iter = nullptr; bool null_idx_val = false; - ObColDescArray uk_out_descs; - ObColDescArray pk_out_descs; - ObSEArray uk_out_idxs; - ObStoreRowkey index_rowkey; - ObNewRowIterator*& tmp_rowkey_iter = need_index_back ? dup_rowkey_iter : duplicated_rows; - if (OB_FAIL(run_ctx.relative_tables_.data_table_.get_rowkey_column_ids(pk_out_descs))) { + ObSEArray pk_out_ids; + ObArenaAllocator scan_allocator(ObModIds::OB_TABLE_SCAN_ITER); + ObIAllocator *allocator = + run_ctx.dml_param_.dml_allocator_ != nullptr ? run_ctx.dml_param_.dml_allocator_ : &scan_allocator; + ObSingleRowGetter index_row_getter(*allocator, store_); + if (OB_FAIL(run_ctx.relative_tables_.data_table_.get_rowkey_column_ids(pk_out_ids))) { LOG_WARN("get rowkey column ids failed", K(ret)); } else if (OB_FAIL(relative_table.build_index_row( - row, *col_map, false, idx_row->row_val_, null_idx_val, &uk_out_descs))) { + row, *run_ctx.col_map_, false, idx_row->row_val_, null_idx_val, nullptr))) { STORAGE_LOG(WARN, "failed to generate index row", K(ret)); - } else if (OB_FAIL(get_column_index(pk_out_descs, uk_out_descs, uk_out_idxs))) { - STORAGE_LOG(WARN, - "failed to get output column index", - K(ret), - K(*run_ctx.relative_tables_.data_table_.get_schema_param()), - K(*relative_table.get_schema_param())); + } else if (OB_FAIL(index_row_getter.init_dml_access_ctx(run_ctx.store_ctx_, run_ctx.dml_param_))) { + LOG_WARN("init dml access ctx failed", K(ret)); + } else if (OB_FAIL(index_row_getter.init_dml_access_param(relative_table, run_ctx.dml_param_, pk_out_ids))) { + LOG_WARN("init basic index param failed", K(ret)); } else { - GetRowkeyArray rowkeys; - ObTableAccessParam index_access_param; + ObTableAccessContext &index_access_ctx = index_row_getter.get_access_ctx(); + int64_t dt_rowkey_cnt = run_ctx.relative_tables_.data_table_.get_rowkey_column_num(); if (relative_table.is_storage_index_table()) { - table_access_ctx.query_flag_.index_invalid_ = !relative_table.can_read_index(); + index_access_ctx.query_flag_.index_invalid_ = !relative_table.can_read_index(); } else { - table_access_ctx.query_flag_.index_invalid_ = false; + index_access_ctx.query_flag_.index_invalid_ = false; } + ObStoreRowkey index_rowkey; index_rowkey.assign(idx_row->row_val_.cells_, relative_table.get_rowkey_column_num()); - if (OB_FAIL(index_access_param.init_basic_param(relative_table.get_table_id(), - relative_table.get_schema_version(), - relative_table.get_rowkey_column_num(), - uk_out_descs, - &uk_out_idxs))) { - LOG_WARN("init basic param failed", K(ret)); - } else if (OB_FAIL(get_conflict_row( - run_ctx, index_access_param, table_access_ctx, relative_table, index_rowkey, tmp_rowkey_iter))) { - LOG_WARN("get conflict row failed", K(relative_table), K(index_rowkey), K(pk_out_descs)); - } - if (OB_SUCC(ret) && OB_UNLIKELY(need_index_back) && OB_UNLIKELY(tmp_rowkey_iter != NULL)) { - int64_t data_table_rowkey_cnt = run_ctx.relative_tables_.data_table_.get_rowkey_column_num(); - OZ(convert_row_to_rowkey(*dup_rowkey_iter, rowkeys)); - OZ(multi_get_rows(run_ctx.store_ctx_, - table_access_param, - table_access_ctx, - run_ctx.relative_tables_.data_table_, - rowkeys, - duplicated_rows, - data_table_rowkey_cnt)); + if (OB_LIKELY(!need_index_back)) { + if (OB_FAIL(single_get_row(index_row_getter, index_rowkey, duplicated_rows, dt_rowkey_cnt))) { + LOG_WARN("single get index row failed", K(ret)); + } + } else { + ObStoreRowkey table_rowkey; + ObSingleRowGetter data_row_getter(*allocator, store_); + if (OB_FAIL(index_row_getter.open(index_rowkey))) { + LOG_WARN("get index row failed", K(ret), K(index_rowkey)); + } else if (OB_FAIL(convert_row_to_rowkey(index_row_getter, table_rowkey))) { + if (OB_ITER_END != ret) { + LOG_WARN("convert row to rowkey failed", K(ret)); + } else { + ret = OB_SUCCESS; + } + } else if (OB_FAIL(data_row_getter.init_dml_access_ctx(run_ctx.store_ctx_, run_ctx.dml_param_))) { + LOG_WARN("init dml access ctx failed", K(ret)); + } else if (OB_FAIL(data_row_getter.init_dml_access_param( + run_ctx.relative_tables_.data_table_, run_ctx.dml_param_, out_col_ids))) { + LOG_WARN("init data table access param failed", K(ret)); + } else if (OB_FAIL(single_get_row(data_row_getter, table_rowkey, duplicated_rows, dt_rowkey_cnt))) { + LOG_WARN("single get index row failed", K(ret), K(index_rowkey)); + } } } + LOG_DEBUG("get index conflict row", + K(ret), + K(need_index_back), + K(out_col_ids), + K(relative_table), + K(row), + KPC(duplicated_rows)); if (dup_rowkey_iter != NULL) { ObQueryIteratorFactory::free_insert_dup_iter(dup_rowkey_iter); dup_rowkey_iter = NULL; @@ -1914,98 +1859,378 @@ int ObPartitionStorage::get_index_conflict_row(ObDMLRunningCtx& run_ctx, const O return ret; } -int ObPartitionStorage::get_conflict_row(ObDMLRunningCtx& run_ctx, const ObTableAccessParam& access_param, - ObTableAccessContext& access_ctx, ObRelativeTable& relative_table, const ObStoreRowkey& rowkey, - ObNewRowIterator*& duplicated_rows) +int ObPartitionStorage::get_conflict_row(ObDMLRunningCtx &run_ctx, const ObIArray &out_col_ids, + ObRelativeTable &relative_table, const ObStoreRowkey &rowkey, ObNewRowIterator *&duplicated_rows) { int ret = OB_SUCCESS; - ObExtStoreRowkey ext_rowkey(rowkey); - GetRowkeyArray rowkeys; + ObArenaAllocator scan_allocator(ObModIds::OB_TABLE_SCAN_ITER); + ObIAllocator *allocator = + run_ctx.dml_param_.dml_allocator_ != nullptr ? run_ctx.dml_param_.dml_allocator_ : &scan_allocator; int64_t data_table_rowkey_cnt = run_ctx.relative_tables_.data_table_.get_rowkey_column_num(); - OZ(rowkeys.push_back(ext_rowkey)); - OZ(multi_get_rows(run_ctx.store_ctx_, access_param, access_ctx, - relative_table, rowkeys, duplicated_rows, data_table_rowkey_cnt)); - LOG_DEBUG("get conflict row", K(ret), K(rowkey), K(relative_table), K(access_param), K(data_table_rowkey_cnt)); + ObSingleRowGetter single_row_getter(*allocator, store_); + if (OB_FAIL(single_row_getter.init_dml_access_ctx(run_ctx.store_ctx_, run_ctx.dml_param_))) { + LOG_WARN("init dml access ctx failed", K(ret)); + } else if (OB_FAIL(single_row_getter.init_dml_access_param(relative_table, run_ctx.dml_param_, out_col_ids))) { + LOG_WARN("init dml access param failed", K(ret)); + } else if (OB_FAIL(single_get_row(single_row_getter, rowkey, duplicated_rows, data_table_rowkey_cnt))) { + LOG_WARN("single get row failed", K(ret)); + } return ret; } -int ObPartitionStorage::convert_row_to_rowkey(ObNewRowIterator& rowkey_iter, GetRowkeyArray& rowkeys) +int ObPartitionStorage::convert_row_to_rowkey(ObSingleRowGetter &index_row_getter, ObStoreRowkey &rowkey) { int ret = OB_SUCCESS; - ObNewRow* rows = NULL; - int64_t row_count = 0; - do { - if (OB_FAIL(rowkey_iter.get_next_rows(rows, row_count))) { - if (OB_ITER_END != ret) { - STORAGE_LOG(WARN, "get next row from row iterator failed", K(ret)); - } - } - for (int64_t i = 0; OB_SUCC(ret) && i < row_count; ++i) { - ObExtStoreRowkey ext_rowkey(ObStoreRowkey(rows[i].cells_, rows[i].count_)); - LOG_DEBUG("convert row to rowkey", K(ext_rowkey), K(i), K(row_count)); - if (OB_FAIL(rowkeys.push_back(ext_rowkey))) { - LOG_WARN("store rowkey failed", K(ret), K(ext_rowkey), K(i), K(row_count)); - } + ObNewRow *row = nullptr; + if (OB_FAIL(index_row_getter.get_next_row(row))) { + if (OB_ITER_END != ret) { + LOG_WARN("get next row from index row getter failed", K(ret)); } - } while (OB_SUCC(ret)); - if (OB_ITER_END == ret) { - ret = OB_SUCCESS; + } else { + rowkey.assign(row->cells_, row->count_); } return ret; } -// primary key of index is short, so get primary key of main table by iteration -// if memtable supports output of any column(not including complete primary key is ok) -// then not necessary to provide index array for merge -int ObPartitionStorage::get_column_index( - const ObColDescIArray& tbl_col_desc, const ObColDescIArray& idx_col_desc, common::ObIArray& col_idx_array) +int ObPartitionStorage::check_old_row_legitimacy(ObDMLRunningCtx &run_ctx, const ObNewRow &old_row) { int ret = OB_SUCCESS; - if (&tbl_col_desc == &idx_col_desc) { - for (int32_t i = 0; OB_SUCC(ret) && i < tbl_col_desc.count(); ++i) { - if (OB_FAIL(col_idx_array.push_back(i))) { - STORAGE_LOG(WARN, "failed to choose output column idx", K(i), K(ret)); + + ObRelativeTable &data_table = run_ctx.relative_tables_.data_table_; + ObStoreRowkey rowkey; + rowkey.assign(old_row.cells_, data_table.get_rowkey_column_num()); + if (OB_UNLIKELY(rowkey.get_obj_cnt() > old_row.count_) || OB_ISNULL(run_ctx.column_ids_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("old row is invalid", K(ret), K(old_row), K(rowkey.get_obj_cnt()), KP(run_ctx.column_ids_)); + } else if (data_table.is_index_table() && !data_table.can_read_index()) { + // index can not be read during building index, so does not check old index row + } else { + // the vertical partition is no longer maintained, + // and the defense check skips the vertical partition function + ObDMLBaseParam &dml_param = const_cast(run_ctx.dml_param_); + ObArenaAllocator scan_allocator(ObModIds::OB_TABLE_SCAN_ITER); + ObIAllocator *allocator = + run_ctx.dml_param_.dml_allocator_ != nullptr ? run_ctx.dml_param_.dml_allocator_ : &scan_allocator; + ObSingleRowGetter old_row_getter(*allocator, store_); + ObNewRow *storage_old_row = nullptr; + const ObIArray &column_ids = *run_ctx.column_ids_; + if (OB_FAIL(old_row_getter.init_dml_access_ctx(run_ctx.store_ctx_, dml_param))) { + LOG_WARN("init dml access ctx failed", K(ret)); + } else if (OB_FAIL(old_row_getter.init_dml_access_param(data_table, dml_param, column_ids))) { + LOG_WARN("init dml access param failed", K(ret)); + } else if (OB_FAIL(old_row_getter.open(rowkey, true))) { + LOG_WARN("open old row getter failed", K(ret), K(rowkey)); + } else if (OB_FAIL(old_row_getter.get_next_row(storage_old_row))) { + if (OB_ITER_END == ret) { + ret = OB_ERR_DEFENSIVE_CHECK; + LOG_WARN("old row in storage is not exists", K(ret)); + } else { + LOG_WARN("get next row from old_row_iter failed", K(ret), KPC(run_ctx.column_ids_), K(old_row)); + } + } else if (storage_old_row->get_count() != old_row.get_count()) { + ret = OB_ERR_DEFENSIVE_CHECK; + LOG_WARN("storage old row is not matched with sql old row", K(ret)); + } + for (int64_t i = 0; OB_SUCC(ret) && i < old_row.get_count(); ++i) { + const ObObj &storage_val = storage_old_row->get_cell(i); + const ObObj &sql_val = old_row.get_cell(i); + int cmp = 0; + if (OB_UNLIKELY(OB_HIDDEN_LOGICAL_ROWID_COLUMN_ID == column_ids.at(i))) { + // skip check logical rowid, + } else if (OB_UNLIKELY(ObLongTextType == storage_val.get_type() && sql_val.is_lob_locator())) { + // skip check lob column type, + } else if (OB_UNLIKELY(storage_val.is_nop_value())) { + bool is_nop = false; + if (OB_FAIL(data_table.is_nop_default_value(column_ids.at(i), is_nop))) { + LOG_WARN("check column whether has nop default value failed", K(ret), K(column_ids.at(i))); + } else if (!is_nop) { + ret = OB_ERR_DEFENSIVE_CHECK; + LOG_WARN("storage old row is not matched with sql old row", + K(ret), + K(i), + K(column_ids.at(i)), + K(storage_val), + K(sql_val)); + } + } else if (OB_FAIL(storage_val.compare(sql_val, cmp)) || 0 != cmp) { + LOG_WARN("storage_val is not equal with sql_val, maybe catch a bug", + K(ret), + K(storage_val), + K(sql_val), + K(cmp), + K(column_ids.at(i))); + ret = OB_ERR_DEFENSIVE_CHECK; } } + if (OB_ERR_DEFENSIVE_CHECK == ret) { + ObString func_name = ObString::make_string("check_old_row_legitimacy"); + LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); + LOG_ERROR("Fatal Error!!! Catch a defensive error!", + K(ret), + "column_id", + column_ids, + KPC(storage_old_row), + "sql_old_row", + old_row, + "dml_param", + run_ctx.dml_param_, + "dml_type", + run_ctx.dml_type_); + } + } + return ret; +} + +int ObPartitionStorage::check_delete_index_legitimacy( + ObDMLRunningCtx &run_ctx, ObRelativeTable &index_table, const ObNewRow &old_row) +{ + int ret = OB_SUCCESS; + + ObStoreRowkey rowkey(old_row.cells_, index_table.get_rowkey_column_num()); + if (OB_UNLIKELY(rowkey.get_obj_cnt() > old_row.count_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("old row is invalid", K(ret), K(old_row), K(rowkey.get_obj_cnt())); + } else if (index_table.is_index_table() && !index_table.can_read_index()) { + // index can not be read during building index, so does not check old index row } else { - for (int64_t i = 0; OB_SUCC(ret) && i < tbl_col_desc.count(); ++i) { - uint64_t id = tbl_col_desc.at(i).col_id_; - bool found = false; - for (int32_t j = 0; OB_SUCC(ret) && j < idx_col_desc.count(); ++j) { - if (id == idx_col_desc.at(j).col_id_) { - found = true; - if (OB_FAIL(col_idx_array.push_back(static_cast(j)))) { - STORAGE_LOG(WARN, "failed to choose output column idx", K(id), K(ret)); - } - break; - } + // the vertical partition is no longer maintained, + // and the defense check skips the vertical partition function + ObDMLBaseParam &dml_param = const_cast(run_ctx.dml_param_); + ObArenaAllocator scan_allocator(ObModIds::OB_TABLE_SCAN_ITER); + ObIAllocator *allocator = run_ctx.dml_param_.dml_allocator_ != nullptr ? + run_ctx.dml_param_.dml_allocator_ : &scan_allocator; + ObFixedArray column_ids; + ObSingleRowGetter old_row_getter(*allocator, store_); + ObNewRow *storage_old_row = nullptr; + column_ids.set_allocator(allocator); + column_ids.set_capacity(index_table.get_rowkey_column_num()); + if (OB_FAIL(old_row_getter.init_dml_access_ctx(run_ctx.store_ctx_, dml_param))) { + LOG_WARN("init dml access ctx failed", K(ret)); + } else if (OB_FAIL(index_table.get_rowkey_column_ids(column_ids))) { + LOG_WARN("get rowkey column ids failed", K(ret)); + } else if (OB_FAIL(old_row_getter.init_dml_access_param(index_table, dml_param, column_ids))) { + LOG_WARN("init dml access param failed", K(ret)); + } else if (OB_FAIL(old_row_getter.open(rowkey, true))) { + LOG_WARN("open old row getter failed", K(ret), K(rowkey)); + } else if (OB_FAIL(old_row_getter.get_next_row(storage_old_row))) { + if (OB_ITER_END == ret) { + ret = OB_ERR_DEFENSIVE_CHECK; + LOG_WARN("old row in storage is not exists", K(ret)); + } else { + LOG_WARN("get next row from old_row_iter failed", K(ret), KPC(run_ctx.column_ids_), K(old_row)); } - if (!found) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "failed to choose output column idx", K(id), K(ret)); + } + if (OB_ERR_DEFENSIVE_CHECK == ret) { + ObString func_name = ObString::make_string("check_delete_index_legitimacy"); + LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); + LOG_ERROR("Fatal Error!!! Catch a defensive error!", + K(ret), + "column_id", + column_ids, + KPC(storage_old_row), + "sql_old_row", + old_row, + "dml_param", + run_ctx.dml_param_, + "dml_type", + run_ctx.dml_type_); + } + } + return ret; +} + +int ObPartitionStorage::check_new_row_legitimacy(ObDMLRunningCtx &run_ctx, const ObNewRow &new_row) +{ + int ret = OB_SUCCESS; + ObRelativeTable &data_table = run_ctx.relative_tables_.data_table_; + if (OB_ISNULL(run_ctx.column_ids_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("column ids is nullptr", K(ret)); + } else if (OB_FAIL(check_new_row_nullable_value(*run_ctx.column_ids_, data_table, new_row))) { + LOG_WARN( + "check new row nullable value failed", K(ret), "dml_param", run_ctx.dml_param_, "dml_type", run_ctx.dml_type_); + } else if (OB_FAIL(check_new_row_shadow_pk(*run_ctx.column_ids_, data_table, new_row))) { + LOG_WARN( + "check new row nullable value failed", K(ret), "dml_param", run_ctx.dml_param_, "dml_type", run_ctx.dml_type_); + } + return ret; +} + +int ObPartitionStorage::check_new_row_nullable_value( + const ObIArray &column_ids, ObRelativeTable &data_table, const ObNewRow &new_row) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(column_ids.count() > new_row.get_count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("new row is invalid", K(ret), K(new_row.get_count()), K(column_ids.count())); + } + for (int64_t i = 0; OB_SUCC(ret) && i < column_ids.count(); ++i) { + uint64_t column_id = column_ids.at(i); + bool is_nullable = false; + if (OB_UNLIKELY(is_shadow_column(column_id))) { + // the shadow pk is generated internally, + // and the nullable attribute check for it is skipped + } else if (OB_FAIL(data_table.is_column_nullable_for_write(column_id, is_nullable))) { + LOG_WARN("check is_column_nullable_for_write failed", K(ret), K(column_id)); + } else if (new_row.get_cell(i).is_null() && !is_nullable) { + bool is_hidden = false; + bool is_gen_col = false; + bool is_nullable_for_read = false; + if (OB_FAIL(data_table.is_column_nullable_for_read(column_id, is_nullable_for_read))) { + LOG_WARN("check is nullable for read failed", K(ret)); + } else if (is_nullable_for_read) { + LOG_TRACE("Catch a defensive nullable error, but this column is not null novalidate", + K(column_id), + K(column_ids), + K(new_row), + K(data_table)); + } else if (OB_FAIL(data_table.is_hidden_column(column_id, is_hidden))) { + LOG_WARN("get is hidden column failed", K(ret), K(column_id)); + } else if (OB_FAIL(data_table.is_gen_column(column_id, is_gen_col))) { + LOG_WARN("get is gen column failed", K(ret), K(column_id)); + } else if (is_hidden && !is_gen_col) { + ret = OB_BAD_NULL_ERROR; + LOG_WARN("Catch a defensive nullable error, " + "maybe cause by add column not null default null ONLINE", + K(ret), + K(column_id), + K(column_ids), + K(new_row), + K(data_table)); + } else { + ret = OB_ERR_DEFENSIVE_CHECK; + ObString func_name = ObString::make_string("check_new_row_nullable_value"); + LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); + LOG_ERROR( + "Fatal Error!!! Catch a defensive error!", K(ret), K(column_id), K(column_ids), K(new_row), K(data_table)); } } } return ret; } -int ObPartitionStorage::init_dml_access_ctx(ObDMLRunningCtx& run_ctx, ObArenaAllocator& allocator, - ObBlockCacheWorkingSet& block_cache_ws, ObTableAccessContext& table_access_ctx) +int ObPartitionStorage::check_new_row_nullable_value( + const ObIArray &col_descs, ObRelativeTable &relative_table, const ObNewRow &new_row) { int ret = OB_SUCCESS; - common::ObQueryFlag query_flag; - common::ObVersionRange trans_version_range; - query_flag.read_latest_ = ObQueryFlag::OBSF_MASK_READ_LATEST; - // TODO trans_version_range will be passed as a parameter - trans_version_range.snapshot_version_ = run_ctx.store_ctx_.mem_ctx_->get_read_snapshot(); - trans_version_range.base_version_ = 0; - trans_version_range.multi_version_start_ = 0; + if (OB_UNLIKELY(col_descs.count() > new_row.get_count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("new row is invalid", K(ret), K(new_row.get_count()), K(col_descs.count())); + } + for (int64_t i = 0; OB_SUCC(ret) && i < col_descs.count(); ++i) { + uint64_t column_id = col_descs.at(i).col_id_; + bool is_nullable = false; + if (OB_UNLIKELY(is_shadow_column(column_id))) { + // the shadow pk is generated internally, + // and the nullable attribute check for it is skipped + } else if (OB_FAIL(relative_table.is_column_nullable_for_write(column_id, is_nullable))) { + LOG_WARN("check is_column_nullable_for_write failed", K(ret), K(column_id)); + } else if (new_row.get_cell(i).is_null() && !is_nullable) { + bool is_hidden = false; + bool is_gen_col = false; + bool is_nullable_for_read = false; + if (OB_FAIL(relative_table.is_column_nullable_for_read(column_id, is_nullable_for_read))) { + LOG_WARN("check is nullable for read failed", K(ret)); + } else if (is_nullable_for_read) { + // this column is not null novalidate, maybe the null column come from the old data + // so output trace log and ignore it + LOG_TRACE("Catch a defensive nullable error, but this column is not null novalidate", + K(column_id), + K(col_descs), + K(new_row), + K(relative_table)); + } else if (OB_FAIL(relative_table.is_hidden_column(column_id, is_hidden))) { + LOG_WARN("get is hidden column failed", K(ret), K(column_id)); + } else if (OB_FAIL(relative_table.is_gen_column(column_id, is_gen_col))) { + LOG_WARN("get is gen column failed", K(ret), K(column_id)); + } else if (is_hidden && !is_gen_col) { + ret = OB_BAD_NULL_ERROR; + LOG_WARN("Catch a defensive nullable error, " + "maybe cause by add column not null default null ONLINE", + K(ret), + K(column_id), + K(col_descs), + K(new_row), + K(relative_table)); + } else { + ret = OB_ERR_DEFENSIVE_CHECK; + ObString func_name = ObString::make_string("check_new_row_nullable_value"); + LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); + LOG_ERROR("Fatal Error!!! Catch a defensive error!", + K(ret), + K(column_id), + K(col_descs), + K(new_row), + K(relative_table)); + } + } + } + return ret; +} - if (OB_FAIL(table_access_ctx.init( - query_flag, run_ctx.store_ctx_, allocator, allocator, block_cache_ws, trans_version_range))) { - LOG_WARN("failed to init table access ctx", K(ret)); - } else { - table_access_ctx.expr_ctx_ = const_cast(&run_ctx.dml_param_.expr_ctx_); +int ObPartitionStorage::check_new_row_shadow_pk( + const ObIArray &column_ids, ObRelativeTable &data_table, const ObNewRow &new_row) +{ + int ret = OB_SUCCESS; + if (data_table.get_shadow_rowkey_column_num() > 0) { + // check shadow pk + int64_t rowkey_cnt = data_table.get_rowkey_column_num(); + int64_t spk_cnt = data_table.get_shadow_rowkey_column_num(); + int64_t index_col_cnt = rowkey_cnt - spk_cnt; + bool need_spk = false; + if (OB_UNLIKELY(index_col_cnt <= 0) || OB_UNLIKELY(column_ids.count() < rowkey_cnt)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN( + "index column count is invalid", K(ret), K(index_col_cnt), K(rowkey_cnt), K(spk_cnt), K(column_ids.count())); + } else if (lib::is_mysql_mode()) { + // mysql兼容:只要unique index key中有null列,则需要填充shadow列 + bool rowkey_has_null = false; + for (int64_t i = 0; !rowkey_has_null && i < index_col_cnt; i++) { + rowkey_has_null = new_row.get_cell(i).is_null(); + } + need_spk = rowkey_has_null; + } else { + // oracle兼容:只有unique index key全为null列时,才需要填充shadow列 + bool is_rowkey_all_null = true; + for (int64_t i = 0; is_rowkey_all_null && i < index_col_cnt; i++) { + is_rowkey_all_null = new_row.get_cell(i).is_null(); + } + need_spk = is_rowkey_all_null; + } + for (int64_t i = index_col_cnt; OB_SUCC(ret) && i < rowkey_cnt; ++i) { + uint64_t spk_column_id = column_ids.at(i); + uint64_t real_pk_id = spk_column_id - OB_MIN_SHADOW_COLUMN_ID; + const ObObj &spk_value = new_row.get_cell(i); + int64_t pk_idx = OB_INVALID_INDEX; + int cmp = 0; + if (OB_LIKELY(!need_spk)) { + if (!spk_value.is_null()) { + ret = OB_ERR_DEFENSIVE_CHECK; + ObString func_name = ObString::make_string("check_new_row_shadow_pk"); + LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); + LOG_ERROR("Fatal Error!!! Catch a defensive error!", + K(ret), + "column_id", + column_ids, + K(new_row), + K(data_table), + K(spk_value), + K(i), + K(spk_column_id), + K(real_pk_id)); + } + } else if (OB_UNLIKELY(!has_exist_in_array(column_ids, real_pk_id, &pk_idx))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("real pk column not exists in column_ids", K(ret), K(column_ids), K(real_pk_id)); + } else if (OB_FAIL(new_row.get_cell(pk_idx).compare(spk_value, cmp)) || 0 != cmp) { + ret = OB_ERR_DEFENSIVE_CHECK; + ObString func_name = ObString::make_string("check_new_row_shadow_pk"); + LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); + LOG_ERROR("Fatal Error!!! Catch a defensive error!", + K(ret), "column_id", column_ids, K(new_row), K(data_table), K(spk_value), + "pk_value", new_row.get_cell(pk_idx), K(pk_idx), K(i), K(spk_column_id), K(real_pk_id)); + } + } } return ret; } @@ -2018,38 +2243,15 @@ int ObPartitionStorage::get_conflict_rows(ObDMLRunningCtx& run_ctx, const ObInse common::ObNewRowIterator*& duplicated_rows) { int ret = OB_SUCCESS; - ObRelativeTables& relative_tables = run_ctx.relative_tables_; - ObRelativeTable& data_table = relative_tables.data_table_; - ObColDescArray tbl_out_descs; - ObArenaAllocator allocator(ObModIds::OB_TABLE_SCAN_ITER); - share::schema::ObTableParam table_param(allocator); - ObTableAccessParam table_access_param; - ObTableAccessContext table_access_ctx; - ObBlockCacheWorkingSet block_cache_ws; + ObRelativeTables &relative_tables = run_ctx.relative_tables_; + ObRelativeTable &data_table = relative_tables.data_table_; + ObStoreRowkey rowkey; + rowkey.assign(row.cells_, data_table.get_rowkey_column_num()); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "partition storage is not initialized", K(ret)); - } - CK(data_table.is_valid()); - OZ(block_cache_ws.init(extract_tenant_id(data_table.get_table_id()))); - OZ(ObDMLRunningCtx::prepare_column_desc(out_col_ids, data_table, tbl_out_descs)); - OZ(data_table.build_table_param(out_col_ids, table_param)); - OZ(table_access_param.init(data_table.get_table_id(), - data_table.get_schema_version(), - data_table.get_rowkey_column_num(), - tbl_out_descs, - false, - &table_param, - false)); - OZ(init_dml_access_ctx(run_ctx, allocator, block_cache_ws, table_access_ctx)); - if (OB_SUCC(ret)) { - ObStoreRowkey rowkey(row.cells_, data_table.get_rowkey_column_num()); - table_access_param.virtual_column_exprs_ = &(run_ctx.dml_param_.virtual_columns_); - table_access_param.output_exprs_ = run_ctx.dml_param_.output_exprs_; - table_access_param.op_ = run_ctx.dml_param_.op_; - table_access_param.op_filters_ = run_ctx.dml_param_.op_filters_; - table_access_param.row2exprs_projector_ = run_ctx.dml_param_.row2exprs_projector_; - OZ(get_conflict_row(run_ctx, table_access_param, table_access_ctx, data_table, rowkey, duplicated_rows)); + } else if (OB_FAIL(get_conflict_row(run_ctx, out_col_ids, data_table, rowkey, duplicated_rows))) { + LOG_WARN("get conflict row failed", K(ret), K(rowkey)); } // check conflict row(s) of index table if (OB_SUCC(ret) && !run_ctx.dml_param_.only_data_table_) { @@ -2059,13 +2261,8 @@ int ObPartitionStorage::get_conflict_rows(ObDMLRunningCtx& run_ctx, const ObInse OB_SUCC(ret) && (INSERT_RETURN_ALL_DUP == flag || NULL == duplicated_rows) && i < relative_tables.idx_cnt_; ++i) { if (relative_tables.index_tables_[i].is_unique_index()) { - OZ(get_index_conflict_row(run_ctx, - table_access_param, - table_access_ctx, - relative_tables.index_tables_[i], - need_index_back, - row, - duplicated_rows)); + OZ(get_index_conflict_row( + run_ctx, out_col_ids, relative_tables.index_tables_[i], need_index_back, row, duplicated_rows)); } } } @@ -2173,23 +2370,20 @@ int ObPartitionStorage::insert_table_rows( STORAGE_LOG(WARN, "rowkey already exists", K(relative_table.get_table_id()), K(ctx), K(ret)); } - if (OB_UNLIKELY(relative_table.is_storage_index_table())) { - for (int64_t i = 0; OB_SUCC(ret) && i < rows_info.row_count_; i++) { - ObStoreRow& row = rows_info.rows_[i]; - if (OB_FAIL(write_index_row(relative_table, ctx, col_descs, row))) { - if (OB_TRY_LOCK_ROW_CONFLICT != ret) { - STORAGE_LOG(WARN, "failed to set row", K(row), K(ret)); - } - } - } - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < rows_info.row_count_; i++) { - ObStoreRow& row = rows_info.rows_[i]; + for (int64_t i = 0; OB_SUCC(ret) && i < rows_info.row_count_; i++) { + ObStoreRow &row = rows_info.rows_[i]; + if (GCONF.enable_defensive_check() && OB_FAIL(check_new_row_legitimacy(run_ctx, row.row_val_))) { + LOG_WARN("check new row legitimacy failed", K(ret), K(row.row_val_)); + } else if (OB_LIKELY(!relative_table.is_storage_index_table())) { if (OB_FAIL(write_row(relative_table, ctx, relative_table.get_rowkey_column_num(), col_descs, row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { STORAGE_LOG(WARN, "failed to set row", K(row), K(ret)); } } + } else if (OB_FAIL(write_index_row(relative_table, ctx, col_descs, row))) { + if (OB_TRY_LOCK_ROW_CONFLICT != ret) { + STORAGE_LOG(WARN, "failed to set row", K(row), K(ret)); + } } } } @@ -2278,6 +2472,8 @@ int ObPartitionStorage::direct_insert_row_and_index(ObDMLRunningCtx& run_ctx, co KP(col_map), KP(idx_row), K(ret)); + } else if (GCONF.enable_defensive_check() && OB_FAIL(check_new_row_legitimacy(run_ctx, tbl_row.row_val_))) { + LOG_WARN("check new row legitimacy failed", K(ret), K(tbl_row.row_val_)); } else { const ObColDescIArray& col_descs = *run_ctx.col_descs_; if (OB_FAIL(write_row(relative_tables.data_table_, @@ -2759,7 +2955,7 @@ int ObPartitionStorage::get_change_type( } } else { bool is_nullable = false; - if (OB_FAIL(table.is_column_nullable(cid, is_nullable))) { + if (OB_FAIL(table.is_column_nullable_for_write(cid, is_nullable))) { LOG_WARN("check nullable fail", K(ret), K(cid), K(table)); } else if (is_nullable) { innullable = false; @@ -2853,6 +3049,8 @@ int ObPartitionStorage::process_old_row(ObDMLRunningCtx& run_ctx, const bool dat KP(idx_row), K(is_delete_total_quantity_log), K(ret)); + } else if (GCONF.enable_defensive_check() && OB_FAIL(check_old_row_legitimacy(run_ctx, tbl_row.row_val_))) { + LOG_WARN("check old row legitimacy failed", K(tbl_row.row_val_)); } else { const ObColDescIArray& col_descs = *run_ctx.col_descs_; uint64_t table_id = relative_tables.data_table_.get_table_id(); @@ -2916,33 +3114,22 @@ int ObPartitionStorage::process_old_row(ObDMLRunningCtx& run_ctx, const bool dat if (OB_FAIL(relative_tables.index_tables_[i].build_index_row( tbl_row.row_val_, *col_map, true, idx_row->row_val_, null_idx_val, &idx_col_descs))) { STORAGE_LOG(WARN, "failed to generate index row", K(ret)); - } else if (relative_tables.index_tables_[i].can_read_index() && + } else if (GCONF.enable_defensive_check() && relative_tables.index_tables_[i].can_read_index() && relative_tables.index_tables_[i].is_storage_index_table() && OB_FAIL(rowkey_exists( relative_tables.index_tables_[i], store_ctx, idx_col_descs, idx_row->row_val_, exists))) { STORAGE_LOG(WARN, "failed to check rowkey existing", K(*idx_row), K(ret)); } else if (!exists) { - if (run_ctx.store_ctx_.mem_ctx_ != NULL) { - ObMemtableCtx *curr_mt_ctx = static_cast(run_ctx.store_ctx_.mem_ctx_); - transaction::ObTransCtx *trans_ctx = curr_mt_ctx->get_trans_ctx(); - if (NULL != trans_ctx) { - if (!trans_ctx->is_bounded_staleness_read() && curr_mt_ctx->is_for_replay()) { - TRANS_LOG(WARN, "strong consistent read follower when sql check", - K(trans_ctx->get_trans_id())); - ret = OB_NOT_MASTER; - } - } - } - if (OB_NOT_MASTER != ret) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(ERROR, - "DEBUG ATTENTION!!!! update or delete a non exist index row", - K(ret), - KPC(idx_row), - K(relative_tables.data_table_.tables_handle_), - K(relative_tables.index_tables_[i]), - K(relative_tables.index_tables_[i].tables_handle_)); - } + ret = OB_ERR_DEFENSIVE_CHECK; + ObString func_name = ObString::make_string("process_old_row"); + LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); + STORAGE_LOG(ERROR, + "Unexpected old row, update or delete a non exist index row", + K(ret), + KPC(idx_row), + K(relative_tables.data_table_.tables_handle_), + K(relative_tables.index_tables_[i]), + K(relative_tables.index_tables_[i].tables_handle_)); } else { // STORAGE_LOG(INFO, "build index row", K(*idx_row), K(null_idx_val), K(type)); if (ND_ROWKEY_CHANGE == type && !null_idx_val) { @@ -3120,6 +3307,8 @@ int ObPartitionStorage::process_new_row(ObDMLRunningCtx& run_ctx, const ObIArray K(new_tbl_row), K(rowkey_change), K(ret)); + } else if (GCONF.enable_defensive_check() && OB_FAIL(check_new_row_legitimacy(run_ctx, new_tbl_row.row_val_))) { + LOG_WARN("check new row legitimacy failed", K(ret), K(new_tbl_row.row_val_)); } else { // write full column clog needs to construct update_idx and pass to memtable if (OB_FAIL(process_row_of_data_table(run_ctx, update_idx, old_tbl_row, new_tbl_row, rowkey_change))) { @@ -6406,6 +6595,7 @@ int ObPartitionStorage::ObDMLRunningCtx::init(const ObIArray* column_i store_ctx_.mem_ctx_->set_table_version(dml_param_.schema_version_); store_ctx_.mem_ctx_->set_abs_expired_time(dml_param_.timeout_); store_ctx_.mem_ctx_->set_abs_lock_wait_timeout(dml_param_.timeout_); + column_ids_ = column_ids; is_inited_ = true; } @@ -6731,6 +6921,9 @@ int ObPartitionStorage::lock_row(ObRelativeTable& relative_table, const storage: if (OB_SUCC(ret)) { if (OB_FAIL(reshape_row(row, col_descs.count(), row_reshape_ins, lock_row, need_reshape_row, sql_mode))) { LOG_WARN("failed to reshape row", K(ret), K(row), K(need_reshape_row), K(sql_mode)); + } else if (GCONF.enable_defensive_check() && + OB_FAIL(check_new_row_nullable_value(col_descs, relative_table, lock_row.row_val_))) { + LOG_WARN("check lock row nullable failed", K(ret)); } else { memtable::ObMemtable* write_memtable = NULL; const uint64_t table_id = relative_table.get_table_id(); diff --git a/src/storage/ob_partition_storage.h b/src/storage/ob_partition_storage.h index 58cc02770..744561259 100644 --- a/src/storage/ob_partition_storage.h +++ b/src/storage/ob_partition_storage.h @@ -174,10 +174,12 @@ struct ObPartitionPrefixAccessStat { AccessStat rowkey_prefix_[MAX_ROWKEY_PREFIX_NUM + 1]; }; +class ObSingleRowGetter; class ObPartitionStorage : public ObIPartitionStorage { template friend class oceanbase::storageperf::ObMultiBlockBench; friend class ObPGStorage; + friend class ObSingleRowGetter; public: ObPartitionStorage(); @@ -488,6 +490,7 @@ private: relative_tables_(allocator), col_map_(nullptr), col_descs_(nullptr), + column_ids_(nullptr), idx_col_descs_(), tbl_row_(), idx_row_(NULL), @@ -515,8 +518,9 @@ private: common::ObIAllocator& allocator_; const ObRowDml dml_type_; ObRelativeTables relative_tables_; - const share::schema::ColumnMap* col_map_; - const ObColDescIArray* col_descs_; + const share::schema::ColumnMap *col_map_; + const ObColDescIArray *col_descs_; + const common::ObIArray *column_ids_; ObColDescArray idx_col_descs_; ObStoreRow tbl_row_; ObStoreRow* idx_row_; // not a must, allocate dynamically @@ -540,28 +544,24 @@ private: const share::schema::ColumnMap* col_map, const common::ObIArray& column_ids, const ObRowDml dml_type, const common::ObIArray& upd_column_ids, const ChangeType change_type, const bool is_total_quantity_log); int insert_table_row( - ObDMLRunningCtx& run_ctx, ObRelativeTable& relative_table, const ObColDescIArray& col_descs, ObStoreRow& row); - int insert_table_rows(ObDMLRunningCtx& run_ctx, ObRelativeTable& relative_table, const ObColDescIArray& col_descs, - ObRowsInfo& rows_info); - int insert_index_rows(ObDMLRunningCtx& run_ctx, ObStoreRow* rows, int64_t row_count); - int direct_insert_row_and_index(ObDMLRunningCtx& run_ctx, const ObStoreRow& tbl_row); - int get_column_index(const ObColDescIArray& tbl_col_desc, const ObColDescIArray& idx_col_desc, - common::ObIArray& col_idx_array); - int convert_row_to_rowkey(common::ObNewRowIterator& iter, GetRowkeyArray& rowkeys); - int get_conflict_row(ObDMLRunningCtx& run_ctx, const ObTableAccessParam& access_param, - ObTableAccessContext& access_ctx, ObRelativeTable& relative_table, const common::ObStoreRowkey& rowkey, - common::ObNewRowIterator*& duplicated_rows); - int get_index_conflict_row(ObDMLRunningCtx& run_ctx, const ObTableAccessParam& table_access_param, - ObTableAccessContext& table_access_ctx, ObRelativeTable& relative_table, bool need_index_back, - const common::ObNewRow& row, common::ObNewRowIterator*& duplicated_rows); - int multi_get_rows(const ObStoreCtx& store_ctx, const ObTableAccessParam& access_param, - ObTableAccessContext& access_ctx, ObRelativeTable& relative_table, const GetRowkeyArray& rowkeys, - common::ObNewRowIterator*& duplicated_rows, int64_t data_table_rowkey_cnt); - int get_conflict_rows(ObDMLRunningCtx& run_ctx, const ObInsertFlag flag, - const common::ObIArray& dup_col_ids, const common::ObNewRow& row, - common::ObNewRowIterator*& duplicated_rows); - int init_dml_access_ctx(ObDMLRunningCtx& run_ctx, common::ObArenaAllocator& allocator, - blocksstable::ObBlockCacheWorkingSet& block_cache_ws, ObTableAccessContext& table_access_ctx); + ObDMLRunningCtx &run_ctx, ObRelativeTable &relative_table, const ObColDescIArray &col_descs, ObStoreRow &row); + int insert_table_rows(ObDMLRunningCtx &run_ctx, ObRelativeTable &relative_table, const ObColDescIArray &col_descs, + ObRowsInfo &rows_info); + int insert_index_rows(ObDMLRunningCtx &run_ctx, ObStoreRow *rows, int64_t row_count); + int direct_insert_row_and_index(ObDMLRunningCtx &run_ctx, const ObStoreRow &tbl_row); + int convert_row_to_rowkey(ObSingleRowGetter &index_row_getter, ObStoreRowkey &rowkey); + int get_conflict_row(ObDMLRunningCtx &run_ctx, const common::ObIArray &out_col_ids, + ObRelativeTable &relative_table, const ObStoreRowkey &rowkey, common::ObNewRowIterator *&duplicated_rows); + int get_index_conflict_row(ObDMLRunningCtx &run_ctx, const common::ObIArray &out_col_ids, + ObRelativeTable &relative_table, bool need_index_back, const common::ObNewRow &row, + common::ObNewRowIterator *&duplicated_rows); + int single_get_row(ObSingleRowGetter &row_getter, + const ObStoreRowkey &rowkey, + common::ObNewRowIterator *&duplicated_rows, + int64_t data_table_rowkey_cnt); + int get_conflict_rows(ObDMLRunningCtx &run_ctx, const ObInsertFlag flag, + const common::ObIArray &dup_col_ids, const common::ObNewRow &row, + common::ObNewRowIterator *&duplicated_rows); int get_change_type( const common::ObIArray& update_ids, const ObRelativeTable& table, ChangeType& change_type); int check_rowkey_change(const common::ObIArray& update_ids, const ObRelativeTables& relative_tables, @@ -691,8 +691,21 @@ private: int lock_rows_( const ObStoreCtx& ctx, const ObTableScanParam& scan_param, const common::ObNewRow& row, RowReshape*& row_reshape); int check_useless_index_mini_merge(const storage::ObSSTableMergeCtx &ctx); - int dump_error_info(ObSSTable& main_sstable, ObSSTable& index_sstable); void check_leader_changed_for_sql_recheck_(ObDMLRunningCtx &run_ctx, int &ret); + + int dump_error_info(ObSSTable &main_sstable, ObSSTable &index_sstable); + ////////////////// + /// do write row strict check + int check_old_row_legitimacy(ObDMLRunningCtx &run_ctx, const common::ObNewRow &row); + int check_delete_index_legitimacy( + ObDMLRunningCtx &run_ctx, ObRelativeTable &index_table, const common::ObNewRow &row); + int check_new_row_legitimacy(ObDMLRunningCtx &run_ctx, const common::ObNewRow &row); + int check_new_row_nullable_value( + const common::ObIArray &column_ids, ObRelativeTable &data_table, const common::ObNewRow &new_row); + int check_new_row_nullable_value(const common::ObIArray &col_descs, + ObRelativeTable &relative_table, const common::ObNewRow &new_row); + int check_new_row_shadow_pk( + const common::ObIArray &column_ids, ObRelativeTable &data_table, const common::ObNewRow &new_row); // disallow copy; DISALLOW_COPY_AND_ASSIGN(ObPartitionStorage); diff --git a/src/storage/ob_relative_table.cpp b/src/storage/ob_relative_table.cpp index f9e25dca0..2f72d9c98 100644 --- a/src/storage/ob_relative_table.cpp +++ b/src/storage/ob_relative_table.cpp @@ -197,7 +197,27 @@ int ObRelativeTable::get_rowkey_column_ids(ObIArray& column_ids) cons return ret; } -int ObRelativeTable::get_column_data_length(const uint64_t column_id, int32_t& len) const +int ObRelativeTable::get_rowkey_column_ids(ObIArray &column_ids) const +{ + int ret = OB_SUCCESS; + if (!is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("relative table is invalid", K(ret), KPC(this)); + } else { + if (!use_schema_param_) { + if (OB_FAIL(schema_->get_rowkey_column_ids(column_ids))) { + LOG_WARN("get rowkey column ids from schema fail", K(ret)); + } + } else { + if (OB_FAIL(schema_param_->get_rowkey_column_ids(column_ids))) { + LOG_WARN("get rowkey column ids from param fail", K(ret)); + } + } + } + return ret; +} + +int ObRelativeTable::get_column_data_length(const uint64_t column_id, int32_t &len) const { int ret = OB_SUCCESS; if (!is_valid()) { @@ -250,7 +270,7 @@ int ObRelativeTable::is_rowkey_column_id(const uint64_t column_id, bool& is_rowk return ret; } -int ObRelativeTable::is_column_nullable(const uint64_t column_id, bool& is_nullable) const +int ObRelativeTable::is_column_nullable_for_write(const uint64_t column_id, bool& is_nullable) const { int ret = OB_SUCCESS; is_nullable = false; @@ -276,6 +296,121 @@ int ObRelativeTable::is_column_nullable(const uint64_t column_id, bool& is_nulla return ret; } +int ObRelativeTable::is_column_nullable_for_read(const uint64_t column_id, bool &is_nullable_for_read) const +{ + int ret = OB_SUCCESS; + is_nullable_for_read = false; + if (!is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("relative table is invalid", K(ret), K(*this)); + } else if (OB_INVALID_ID == column_id) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid column id", K(ret), K(column_id)); + } else if (!use_schema_param_) { + const ObColumnSchemaV2 *col = NULL; + if (nullptr == (col = schema_->get_column_schema(column_id))) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("column schema is null", K(ret), K(column_id), K(*schema_)); + } else { + is_nullable_for_read = col->is_nullable(); + } + } else { + const ObColumnParam *col = schema_param_->get_column(column_id); + if (OB_ISNULL(col)) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("column schema is null", K(ret), K(column_id), K(*schema_)); + } else { + is_nullable_for_read = col->is_nullable(); + } + } + return ret; +} + +int ObRelativeTable::is_nop_default_value(const uint64_t column_id, bool &is_nop) const +{ + int ret = OB_SUCCESS; + is_nop = false; + if (!is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("relative table is invalid", K(ret), K(*this)); + } else if (OB_HIDDEN_ROWID_COLUMN_ID == column_id) { + // rowid column need to compute on fly + is_nop = true; + } else if (!use_schema_param_) { + const ObColumnSchemaV2 *column = NULL; + if (OB_ISNULL(column = schema_->get_column_schema(column_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("wrong column id", K(ret), K(column_id)); + } else if (column->is_generated_column()) { + // generated column need to compute on fly + is_nop = true; + } + } else { + const ObColumnParam *param = NULL; + if (OB_ISNULL(param = schema_param_->get_column(column_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("wrong column id", K(ret), K(column_id), K(*schema_param_)); + } else if (param->get_cur_default_value().is_nop_value()) { + is_nop = true; + } + } + return ret; +} + +int ObRelativeTable::is_hidden_column(const uint64_t column_id, bool &is_hidden) const +{ + int ret = OB_SUCCESS; + is_hidden = false; + if (!is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("relative table is invalid", K(ret), K(*this)); + } else if (!use_schema_param_) { + const ObColumnSchemaV2 *col = NULL; + if (nullptr == (col = schema_->get_column_schema(column_id))) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("column schema is null", K(ret), K(column_id), K(*schema_)); + } else { + is_hidden = col->is_hidden(); + } + } else { + const ObColumnParam *col = schema_param_->get_column(column_id); + if (OB_ISNULL(col)) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("column schema is null", K(ret), K(column_id), K(*schema_)); + } else { + is_hidden = col->is_hidden(); + } + } + return ret; +} + +int ObRelativeTable::is_gen_column(const uint64_t column_id, bool &is_gen_col) const +{ + int ret = OB_SUCCESS; + is_gen_col = false; + if (!is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("relative table is invalid", K(ret), K(*this)); + } else if (!use_schema_param_) { + const ObColumnSchemaV2 *col = NULL; + if (nullptr == (col = schema_->get_column_schema(column_id))) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("column schema is null", K(ret), K(column_id), K(*schema_)); + } else { + is_gen_col = col->is_generated_column(); + } + } else { + const ObColumnParam *col = schema_param_->get_column(column_id); + if (OB_ISNULL(col)) { + ret = OB_SCHEMA_ERROR; + LOG_WARN("column schema is null", K(ret), K(column_id), K(*schema_)); + } else { + is_gen_col = col->is_gen_col(); + } + } + return ret; +} + int64_t ObRelativeTable::get_rowkey_column_num() const { return use_schema_param_ ? schema_param_->get_rowkey_column_num() : schema_->get_rowkey_column_num(); @@ -513,33 +648,8 @@ int ObRelativeTable::check_index_column_in_map(const ColumnMap& col_map, const i return ret; } -int ObRelativeTable::build_table_param( - const common::ObIArray& out_col_ids, share::schema::ObTableParam& table_param) const -{ - int ret = OB_SUCCESS; - if (!is_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("relative table is invalid", K(ret), K(*this)); - } else if (0 == out_col_ids.count()) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("output column ids count is 0", K(ret)); - } else { - table_param.reset(); - if (!use_schema_param_) { - if (OB_FAIL(table_param.convert(*schema_, *schema_, out_col_ids, false))) { - LOG_WARN("build table param from schema fail", K(ret), K(*schema_)); - } - } else { - if (OB_FAIL(table_param.convert_schema_param(*schema_param_, out_col_ids))) { - LOG_WARN("build table param from param fail", K(ret), K(*schema_param_)); - } - } - } - return ret; -} - -int ObRelativeTable::build_index_row(const ObNewRow& table_row, const ColumnMap& col_map, const bool only_rowkey, - ObNewRow& index_row, bool& null_idx_val, ObIArray* idx_columns) +int ObRelativeTable::build_index_row(const ObNewRow &table_row, const ColumnMap &col_map, const bool only_rowkey, + ObNewRow &index_row, bool &null_idx_val, ObIArray *idx_columns) { int ret = OB_SUCCESS; if (!is_valid()) { @@ -669,7 +779,11 @@ int ObRelativeTable::set_index_value(const ObNewRow& table_row, const ColumnMap& // ------ ObRelativeTables ------ // bool ObRelativeTables::set_table_param(const ObTableDMLParam* param) { - if (OB_ISNULL(param) || !param->is_valid()) { + if (OB_ISNULL(param) || !param->is_valid() || param->get_data_table().get_full_col_descs().empty()) { + // only when the full column descs in table_dml_param is not empty, + // we will use table_dml_param to generate dml running context + // otherwise we will generate dml running context from ObTableSchema + // because the full column descs maybe empty during the observer upgrade use_table_param_ = false; table_param_ = NULL; } else { diff --git a/src/storage/ob_relative_table.h b/src/storage/ob_relative_table.h index edbae1093..fe9541bfb 100644 --- a/src/storage/ob_relative_table.h +++ b/src/storage/ob_relative_table.h @@ -33,13 +33,18 @@ public: bool set_schema_param(const share::schema::ObTableSchemaParam* param); uint64_t get_table_id() const; int64_t get_schema_version() const; - int get_col_desc(const uint64_t column_id, share::schema::ObColDesc& col_desc) const; - int get_col_desc_by_idx(const int64_t idx, share::schema::ObColDesc& col_desc) const; - int get_rowkey_col_id_by_idx(const int64_t idx, uint64_t& col_id) const; - int get_rowkey_column_ids(common::ObIArray& column_ids) const; - int get_column_data_length(const uint64_t column_id, int32_t& len) const; - int is_rowkey_column_id(const uint64_t column_id, bool& is_rowkey) const; - int is_column_nullable(const uint64_t column_id, bool& is_nullable) const; + int get_col_desc(const uint64_t column_id, share::schema::ObColDesc &col_desc) const; + int get_col_desc_by_idx(const int64_t idx, share::schema::ObColDesc &col_desc) const; + int get_rowkey_col_id_by_idx(const int64_t idx, uint64_t &col_id) const; + int get_rowkey_column_ids(common::ObIArray &column_ids) const; + int get_rowkey_column_ids(common::ObIArray &column_ids) const; + int get_column_data_length(const uint64_t column_id, int32_t &len) const; + int is_rowkey_column_id(const uint64_t column_id, bool &is_rowkey) const; + int is_column_nullable_for_write(const uint64_t column_id, bool &is_nullable_for_write) const; + int is_column_nullable_for_read(const uint64_t column_id, bool &is_nullable_for_read) const; + int is_nop_default_value(const uint64_t column_id, bool &is_nop) const; + int is_hidden_column(const uint64_t column_id, bool &is_hidden) const; + int is_gen_column(const uint64_t column_id, bool &is_gen_col) const; OB_INLINE bool allow_not_ready() const { return allow_not_ready_; @@ -59,13 +64,12 @@ public: bool can_read_index() const; bool is_unique_index() const; bool is_domain_index() const; - int check_rowkey_in_column_ids(const common::ObIArray& column_ids, const bool has_other_column) const; - int check_column_in_map(const share::schema::ColumnMap& col_map) const; - int check_index_column_in_map(const share::schema::ColumnMap& col_map, const int64_t data_table_rowkey_cnt) const; - int build_table_param(const common::ObIArray& out_col_ids, share::schema::ObTableParam& table_param) const; - int build_index_row(const common::ObNewRow& table_row, const share::schema::ColumnMap& col_map, - const bool only_rowkey, common::ObNewRow& index_row, bool& null_idx_val, - common::ObIArray* idx_columns); + int check_rowkey_in_column_ids(const common::ObIArray &column_ids, const bool has_other_column) const; + int check_column_in_map(const share::schema::ColumnMap &col_map) const; + int check_index_column_in_map(const share::schema::ColumnMap &col_map, const int64_t data_table_rowkey_cnt) const; + int build_index_row(const common::ObNewRow &table_row, const share::schema::ColumnMap &col_map, + const bool only_rowkey, common::ObNewRow &index_row, bool &null_idx_val, + common::ObIArray *idx_columns); OB_INLINE bool use_schema_param() const { return use_schema_param_; @@ -78,8 +82,11 @@ public: { return schema_param_; } - TO_STRING_KV("index_id", NULL == schema_ ? 0 : schema_->get_table_id(), KPC(schema_), K_(allow_not_ready), - K_(use_schema_param), KPC(schema_param_)); + OB_INLINE const share::schema::ObTableSchema *get_schema() const + { + return schema_; + } + TO_STRING_KV(K_(allow_not_ready), K_(use_schema_param), KPC_(schema), KPC_(schema_param)); private: int get_rowkey_col_desc_by_idx(const int64_t idx, share::schema::ObColDesc& col_desc) const; diff --git a/src/storage/ob_single_merge.cpp b/src/storage/ob_single_merge.cpp index 42a344255..3a82f8c58 100644 --- a/src/storage/ob_single_merge.cpp +++ b/src/storage/ob_single_merge.cpp @@ -23,7 +23,9 @@ ObSingleMerge::ObSingleMerge() : rowkey_(NULL), fuse_row_cache_fetcher_() {} ObSingleMerge::~ObSingleMerge() -{} +{ + reset(); +} int ObSingleMerge::open(const ObExtStoreRowkey& rowkey) { @@ -151,7 +153,10 @@ int ObSingleMerge::inner_get_next_row(ObStoreRow& row) bool final_result = false; bool is_fuse_row_empty = false; int64_t sstable_end_log_ts = 0; - ObStoreRow& fuse_row = full_row_; + ObStoreRow &fuse_row = full_row_; + int64_t column_cnt = access_param_->iter_param_.projector_ != nullptr + ? access_param_->iter_param_.projector_->count() + : access_param_->iter_param_.out_cols_->count(); nop_pos_.reset(); fuse_row.row_val_.count_ = 0; fuse_row.flag_ = ObActionFlag::OP_ROW_DOES_NOT_EXIST; @@ -221,8 +226,7 @@ int ObSingleMerge::inner_get_next_row(ObStoreRow& row) cache_row.flag_ = handle_.value_->get_flag(); // cache_row.snapshot_version_ = handle_.value_->get_snapshot_version(); if (is_fuse_row_empty) { - row.row_val_.count_ = - ObActionFlag::OP_ROW_EXIST == cache_row.flag_ ? access_param_->iter_param_.projector_->count() : 0; + row.row_val_.count_ = ObActionFlag::OP_ROW_EXIST == cache_row.flag_ ? column_cnt : 0; row.flag_ = ObActionFlag::OP_ROW_DOES_NOT_EXIST; row.from_base_ = false; row.snapshot_version_ = 0L; @@ -258,20 +262,19 @@ int ObSingleMerge::inner_get_next_row(ObStoreRow& row) if (OB_SUCC(ret)) { STORAGE_LOG(DEBUG, "row before project", K(fuse_row)); if (!is_fuse_row_empty) { - row.row_val_.count_ = - ObActionFlag::OP_ROW_EXIST == fuse_row.flag_ ? access_param_->iter_param_.projector_->count() : 0; + row.row_val_.count_ = ObActionFlag::OP_ROW_EXIST == fuse_row.flag_ ? column_cnt : 0; } if (enable_fuse_row_cache) { if (!is_fuse_row_empty) { if (ObActionFlag::OP_ROW_EXIST == fuse_row.flag_) { if (OB_FAIL(project_row(fuse_row, access_param_->iter_param_.projector_, 0 /*range idx delta*/, row))) { - STORAGE_LOG(WARN, "fail to project row", K(ret), K(fuse_row), K(*access_param_->iter_param_.projector_)); + STORAGE_LOG(WARN, "fail to project row", K(ret), K(fuse_row), KPC(access_param_->iter_param_.projector_)); } else { STORAGE_LOG(DEBUG, "after project row", K(fuse_row), K(row), - K(*access_param_->iter_param_.projector_), + KPC(access_param_->iter_param_.projector_), K(access_param_->iter_param_.table_id_)); } } else { diff --git a/src/storage/ob_single_merge.h b/src/storage/ob_single_merge.h index c4bc216a3..a71b4c214 100644 --- a/src/storage/ob_single_merge.h +++ b/src/storage/ob_single_merge.h @@ -18,7 +18,6 @@ namespace oceanbase { namespace storage { - class ObSingleMerge : public ObMultipleMerge { public: ObSingleMerge(); @@ -50,7 +49,6 @@ private: // disallow copy DISALLOW_COPY_AND_ASSIGN(ObSingleMerge); }; - } /* namespace storage */ } /* namespace oceanbase */ diff --git a/src/storage/ob_sstable_multi_version_row_iterator.cpp b/src/storage/ob_sstable_multi_version_row_iterator.cpp index 95b94b113..113f507bf 100644 --- a/src/storage/ob_sstable_multi_version_row_iterator.cpp +++ b/src/storage/ob_sstable_multi_version_row_iterator.cpp @@ -68,7 +68,7 @@ void ObSSTableMultiVersionRowIterator::reuse() } template -int ObSSTableMultiVersionRowIterator::new_iterator(ObArenaAllocator& allocator) +int ObSSTableMultiVersionRowIterator::new_iterator(ObIAllocator &allocator) { int ret = OB_SUCCESS; if (NULL == iter_) { diff --git a/src/storage/ob_sstable_multi_version_row_iterator.h b/src/storage/ob_sstable_multi_version_row_iterator.h index aa9f70d23..4704ba534 100644 --- a/src/storage/ob_sstable_multi_version_row_iterator.h +++ b/src/storage/ob_sstable_multi_version_row_iterator.h @@ -33,8 +33,8 @@ protected: const void* query_range) = 0; virtual int inner_get_next_row(const ObStoreRow*& row) = 0; template - int new_iterator(common::ObArenaAllocator& allocator); - int get_not_exist_row(const common::ObStoreRowkey& rowkey, const ObStoreRow*& row); + int new_iterator(common::ObIAllocator &allocator); + int get_not_exist_row(const common::ObStoreRowkey &rowkey, const ObStoreRow *&row); protected: const ObTableIterParam* iter_param_; diff --git a/src/storage/ob_sstable_row_iterator.h b/src/storage/ob_sstable_row_iterator.h index dadb33b82..a0aa1949e 100644 --- a/src/storage/ob_sstable_row_iterator.h +++ b/src/storage/ob_sstable_row_iterator.h @@ -250,7 +250,7 @@ public: array_ = nullptr; capacity_ = 0; } - int reserve(common::ObArenaAllocator& allocator, const int64_t count) + int reserve(common::ObIAllocator &allocator, const int64_t count) { int ret = common::OB_SUCCESS; if (capacity_ < count) { diff --git a/src/storage/ob_value_row_iterator.cpp b/src/storage/ob_value_row_iterator.cpp index 045002ace..d9dfb4208 100644 --- a/src/storage/ob_value_row_iterator.cpp +++ b/src/storage/ob_value_row_iterator.cpp @@ -10,12 +10,14 @@ * See the Mulan PubL v2 for more details. */ +#define USING_LOG_PREFIX STORAGE #include "ob_value_row_iterator.h" +#include "ob_partition_storage.h" +#include "ob_single_merge.h" namespace oceanbase { using namespace oceanbase::common; namespace storage { - ObValueRowIterator::ObValueRowIterator() : ObNewRowIterator(), is_inited_(false), @@ -130,5 +132,159 @@ void ObValueRowIterator::reset() cur_idx_ = 0; } +ObSingleRowGetter::ObSingleRowGetter(ObIAllocator &allocator, ObPartitionStore &store) + : store_(store), + single_merge_(nullptr), + store_ctx_(nullptr), + output_projector_(allocator), + relative_table_(nullptr), + table_param_(nullptr), + allocator_(allocator) +{} + +ObSingleRowGetter::~ObSingleRowGetter() +{ + if (single_merge_ != nullptr) { + single_merge_->~ObSingleMerge(); + allocator_.free(single_merge_); + single_merge_ = nullptr; + } + if (table_param_ != nullptr) { + table_param_->~ObTableParam(); + allocator_.free(table_param_); + table_param_ = nullptr; + } +} + +int ObSingleRowGetter::init_dml_access_ctx(const ObStoreCtx &store_ctx, const ObDMLBaseParam &dml_param) +{ + int ret = OB_SUCCESS; + common::ObVersionRange trans_version_range; + // TODO (muwei) trans_version_range值后续由上层传入 + trans_version_range.snapshot_version_ = store_ctx.mem_ctx_->get_read_snapshot(); + trans_version_range.base_version_ = 0; + trans_version_range.multi_version_start_ = 0; + store_ctx_ = &store_ctx; + + if (OB_FAIL(access_ctx_.init(dml_param.query_flag_, store_ctx, allocator_, trans_version_range))) { + LOG_WARN("failed to init table access ctx", K(ret)); + } else { + access_ctx_.expr_ctx_ = const_cast(&dml_param.expr_ctx_); + } + return ret; +} + +int ObSingleRowGetter::init_dml_access_param( + ObRelativeTable &relative_table, const ObDMLBaseParam &dml_param, const ObIArray &out_col_ids) +{ + int ret = OB_SUCCESS; + relative_table_ = &relative_table; + get_table_param_.tables_handle_ = &(relative_table.tables_handle_); + if (!dml_param.virtual_columns_.empty() && !relative_table.is_index_table()) { + //The index table does not contain virtual columns, no need to set virtual_columns + access_param_.virtual_column_exprs_ = &(dml_param.virtual_columns_); + } + if (OB_UNLIKELY(!relative_table.use_schema_param())) { + const share::schema::ObTableSchema *schema = relative_table.get_schema(); + if (OB_FAIL(create_table_param())) { + LOG_WARN("create table param failed", K(ret)); + } else if (OB_FAIL(table_param_->convert(*schema, *schema, out_col_ids, false))) { + LOG_WARN("build table param from schema fail", K(ret), KPC(schema)); + } else if (OB_FAIL(access_param_.init_dml_access_param(relative_table.get_table_id(), + relative_table.get_schema_version(), + relative_table.get_rowkey_column_num(), + *table_param_))) { + LOG_WARN("init dml access param failed", K(ret)); + } + } else { + const share::schema::ObTableSchemaParam *schema_param = relative_table.get_schema_param(); + output_projector_.set_capacity(out_col_ids.count()); + for (int32_t i = 0; OB_SUCC(ret) && i < out_col_ids.count(); ++i) { + int idx = OB_INVALID_INDEX; + if (OB_FAIL(schema_param->get_col_map().get(out_col_ids.at(i), idx))) { + LOG_WARN("get column index from column map failed", K(ret), K(out_col_ids.at(i))); + } else if (OB_FAIL(output_projector_.push_back(idx))) { + LOG_WARN("store output projector failed", K(ret)); + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(access_param_.init_dml_access_param(relative_table.get_table_id(), + relative_table.get_schema_version(), + relative_table.get_rowkey_column_num(), + *schema_param, + &output_projector_))) { + LOG_WARN("init dml access param failed", K(ret)); + } + } + } + LOG_DEBUG("init dml access param", K(ret), K(out_col_ids), K(relative_table), K(dml_param), K_(access_param)); + return ret; +} + +int ObSingleRowGetter::create_table_param() +{ + int ret = OB_SUCCESS; + void *buf = nullptr; + if (table_param_ != nullptr) { + ret = OB_INIT_TWICE; + LOG_WARN("init table param twice", K(ret)); + } else if (OB_ISNULL(buf = allocator_.alloc(sizeof(share::schema::ObTableParam)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("allocate table param failed", K(ret), K(sizeof(share::schema::ObTableParam))); + } else { + table_param_ = new (buf) share::schema::ObTableParam(allocator_); + } + return ret; +} + +int ObSingleRowGetter::open(const ObStoreRowkey &rowkey, bool use_fuse_row_cache) +{ + int ret = OB_SUCCESS; + void *buf = nullptr; + + new (&ext_rowkey_) ObExtStoreRowkey(rowkey); + if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObSingleMerge)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("Fail to allocate memory for multi get merge ", K(ret)); + } else { + { + ObStorageWriterGuard guard(store_, *store_ctx_, false); + if (OB_FAIL(guard.refresh_and_protect_table(*relative_table_))) { + STORAGE_LOG(WARN, "fail to protect table", K(ret)); + } + } + single_merge_ = new (buf) ObSingleMerge(); + } + if (OB_SUCC(ret)) { + if (OB_FAIL(single_merge_->init(access_param_, access_ctx_, get_table_param_))) { + STORAGE_LOG(WARN, "Fail to init ObSingleMerge, ", K(ret)); + } else if (OB_FAIL(single_merge_->open(ext_rowkey_))) { + STORAGE_LOG(WARN, "Fail to open iter, ", K(ret)); + } + if (use_fuse_row_cache) { + access_ctx_.use_fuse_row_cache_ = true; + access_ctx_.fuse_row_cache_hit_rate_ = 100L; + } + } + return ret; +} + +int ObSingleRowGetter::get_next_row(ObNewRow *&row) +{ + int ret = OB_SUCCESS; + row = nullptr; + while (OB_SUCC(ret)) { + ObStoreRow *store_row = NULL; + if (OB_FAIL(single_merge_->get_next_row(store_row))) { + if (OB_ITER_END != ret) { + STORAGE_LOG(WARN, "failed to get next row", K(ret)); + } + } else if (ObActionFlag::OP_ROW_EXIST == store_row->flag_) { + row = &store_row->row_val_; + break; + } + } + return ret; +} } // end namespace storage } // end namespace oceanbase diff --git a/src/storage/ob_value_row_iterator.h b/src/storage/ob_value_row_iterator.h index cf83f59de..65efc6778 100644 --- a/src/storage/ob_value_row_iterator.h +++ b/src/storage/ob_value_row_iterator.h @@ -20,6 +20,7 @@ #include "common/row/ob_row_iterator.h" #include "common/rowkey/ob_rowkey.h" #include "storage/ob_i_store.h" +#include "storage/ob_dml_param.h" namespace oceanbase { namespace storage { class ObValueRowIterator : public common::ObNewRowIterator { @@ -47,6 +48,50 @@ private: DISALLOW_COPY_AND_ASSIGN(ObValueRowIterator); }; +class ObSingleMerge; +class ObSingleRowGetter { + typedef common::ObFixedArray Projector; + +public: + ObSingleRowGetter(common::ObIAllocator &allocator, ObPartitionStore &store); + ~ObSingleRowGetter(); + + int init_dml_access_ctx(const ObStoreCtx &store_ctx, const ObDMLBaseParam &dml_param); + int init_dml_access_param( + ObRelativeTable &data_table, const ObDMLBaseParam &dml_param, const common::ObIArray &out_col_ids); + ObTableAccessParam &get_access_param() + { + return access_param_; + } + ObTableAccessContext &get_access_ctx() + { + return access_ctx_; + } + void set_relative_table(ObRelativeTable *relative_table) + { + relative_table_ = relative_table; + } + int open(const ObStoreRowkey &rowkey, bool use_fuse_row_cache = false); + int get_next_row(common::ObNewRow *&row); + +private: + int create_table_param(); + +private: + ObPartitionStore &store_; + ObSingleMerge *single_merge_; + const ObStoreCtx *store_ctx_; + Projector output_projector_; + ObTableAccessParam access_param_; + ObTableAccessContext access_ctx_; + ObGetTableParam get_table_param_; + ObRelativeTable *relative_table_; + share::schema::ObTableParam *table_param_; + union { + ObExtStoreRowkey ext_rowkey_; + }; + common::ObIAllocator &allocator_; +}; } // end namespace storage } // end namespace oceanbase diff --git a/src/storage/transaction/ob_trans_define.h b/src/storage/transaction/ob_trans_define.h index 6126ba72f..e1532ca35 100644 --- a/src/storage/transaction/ob_trans_define.h +++ b/src/storage/transaction/ob_trans_define.h @@ -773,7 +773,11 @@ public: { return stmt_type_; } - const char* get_sql_id() const + inline bool is_delete_stmt() const + { + return sql::stmt::T_DELETE == stmt_type_; + } + const char *get_sql_id() const { return sql_id_.ptr(); } -- GitLab