提交 4d366728 编写于 作者: S suz-yang 提交者: ob-robot

fix direct load fallback

上级 0367afb6
......@@ -771,7 +771,7 @@ TEST_F(TestFileManager, test_remote_path_to_macro_id)
file_id.set_storage_object_type(static_cast<uint64_t>(ObStorageObjectType::SHARED_INC_MAJOR_DATA_MACRO));
file_id.set_second_id(3);
file_id.set_third_id(2);
file_id.set_macro_transfer_epoch(0);
file_id.set_macro_private_transfer_epoch(0);
file_id.set_tenant_seq(5);
// cluster_id/tenant_id/tablet/tablet_id/reorganization_scn/inc_major/cg_id/data/macro_seq_id
check_path_to_macro_id(false/*is_local_cache*/, ObStorageObjectType::SHARED_INC_MAJOR_DATA_MACRO, file_id);
......
......@@ -66,7 +66,8 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam()
load_level_(ObDirectLoadLevel::INVALID_LEVEL),
compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
online_sample_percent_(100.),
enable_inc_major_(false)
enable_inc_major_(false),
is_backup_(false)
{
column_ids_.set_tenant_id(MTL_ID());
}
......@@ -2191,7 +2192,6 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
ObSQLSessionInfo *session = nullptr;
ObSchemaGetterGuard *schema_guard = nullptr;
int64_t total_line_count = 0;
const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format();
if (OB_UNLIKELY(load_args.file_iter_.count() > ObTableLoadSequenceNo::MAX_DATA_ID)) {
ret = OB_NOT_SUPPORTED;
......@@ -2247,7 +2247,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
}
}
if (OB_SUCC(ret) && !is_backup) {
if (OB_SUCC(ret) && !execute_param_.is_backup_) {
FileLoadExecutor *file_load_executor = nullptr;
DataDescIterator data_desc_iter;
if (1 == load_args.file_iter_.count() && 0 == execute_param_.ignore_row_num_ &&
......@@ -2295,7 +2295,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
file_load_executor = nullptr;
}
}
if (OB_SUCC(ret) && is_backup) {
if (OB_SUCC(ret) && execute_param_.is_backup_) {
BackupLoadExecutor *backup_load_executor = nullptr;
if (OB_ISNULL(backup_load_executor = OB_NEWx(BackupLoadExecutor, execute_ctx_.allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
......@@ -2350,7 +2350,6 @@ int ObLoadDataDirectImpl::init_execute_param()
load_stmt_->get_field_or_var_list();
ObSchemaGetterGuard *schema_guard = ctx_->get_sql_ctx()->schema_guard_;
const ObTableSchema *table_schema = nullptr;
const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format();
execute_param_.tenant_id_ = load_args.tenant_id_;
execute_param_.database_id_ = load_args.database_id_;
execute_param_.table_id_ = load_args.table_id_;
......@@ -2377,6 +2376,10 @@ int ObLoadDataDirectImpl::init_execute_param()
execute_param_.insert_mode_ = optimizer_ctx->insert_mode_;
execute_param_.load_level_ = optimizer_ctx->load_level_;
execute_param_.enable_inc_major_ = optimizer_ctx->enable_inc_major_;
execute_param_.is_backup_ = optimizer_ctx->is_backup();
if (OB_FAIL(execute_param_.column_ids_.assign(optimizer_ctx->get_column_ids()))) {
LOG_WARN("fail to assign columns ids", KR(ret));
}
}
}
// parallel_
......@@ -2433,53 +2436,6 @@ int ObLoadDataDirectImpl::init_execute_param()
}
data_access_param.compression_format_ = load_args.compression_format_;
}
// column_ids_
if (OB_SUCC(ret)) {
execute_param_.column_ids_.reset();
if (is_backup) { // 备份数据导入
if (OB_FAIL(ObTableLoadSchema::get_column_ids(table_schema, execute_param_.column_ids_))) {
LOG_WARN("fail to get column ids for backup", KR(ret));
}
} else if (load_stmt_->get_default_table_columns()) { // 默认列导入
if (OB_FAIL(ObTableLoadSchema::get_user_column_ids(table_schema, execute_param_.column_ids_))) {
LOG_WARN("fail to get user column ids", KR(ret));
}
} else { // 指定列导入
const static uint64_t INVALID_COLUMN_ID = UINT64_MAX;
ObArray<uint64_t> user_column_ids;
ObArray<ObString> user_column_names;
user_column_ids.set_tenant_id(MTL_ID());
user_column_names.set_tenant_id(MTL_ID());
if (OB_FAIL(ObTableLoadSchema::get_user_column_id_and_names(table_schema, user_column_ids, user_column_names))) {
LOG_WARN("fail to get user column ids and names", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < field_or_var_list.count(); ++i) {
const ObLoadDataStmt::FieldOrVarStruct &field_or_var_struct = field_or_var_list.at(i);
if (OB_UNLIKELY(!field_or_var_struct.is_table_column_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("var is not supported", KR(ret), K(field_or_var_struct), K(i),
K(field_or_var_list));
} else {
const uint64_t column_id = field_or_var_struct.column_id_;
int64_t found_column_idx = -1;
for (int64_t j = 0; found_column_idx == -1 && j < user_column_ids.count(); ++j) {
const uint64_t user_column_id = user_column_ids.at(j);
if (column_id == user_column_id) {
found_column_idx = j;
}
}
if (OB_UNLIKELY(found_column_idx == -1)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unknow column", KR(ret), K(user_column_ids), K(field_or_var_struct));
} else if (OB_FAIL(execute_param_.column_ids_.push_back(column_id))) {
LOG_WARN("fail to push back column id", KR(ret));
} else {
user_column_ids.at(found_column_idx) = INVALID_COLUMN_ID;
}
}
}
}
}
// compressor_type_
if (OB_SUCC(ret)) {
if (OB_FAIL(ObDDLUtil::get_temp_store_compress_type(
......@@ -2510,7 +2466,6 @@ int ObLoadDataDirectImpl::init_execute_context()
{
int ret = OB_SUCCESS;
const ObLoadArgument &load_args = load_stmt_->get_load_arguments();
const bool is_backup = ObLoadDataFormat::OB_BACKUP_1_4 == load_args.access_info_.get_load_data_format();
execute_ctx_.exec_ctx_.exec_ctx_ = ctx_;
execute_ctx_.allocator_ = &ctx_->get_allocator();
ObTableLoadParam load_param;
......@@ -2521,7 +2476,7 @@ int ObLoadDataDirectImpl::init_execute_context()
load_param.batch_size_ = execute_param_.batch_row_count_;
load_param.max_error_row_count_ = execute_param_.max_error_rows_;
load_param.column_count_ = execute_param_.column_ids_.count();
load_param.need_sort_ = is_backup ? false : execute_param_.need_sort_;
load_param.need_sort_ = execute_param_.need_sort_;
load_param.px_mode_ = false;
load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_;
load_param.dup_action_ = execute_param_.dup_action_;
......
......@@ -100,7 +100,8 @@ private:
K_(compressor_type),
K_(online_sample_percent),
K_(tablet_ids),
K_(enable_inc_major));
K_(enable_inc_major),
K_(is_backup));
public:
uint64_t tenant_id_;
uint64_t database_id_;
......@@ -126,6 +127,7 @@ private:
double online_sample_percent_;
ObArray<ObTabletID> tablet_ids_;
bool enable_inc_major_;
bool is_backup_;
};
struct LoadExecuteContext
......
......@@ -36,6 +36,8 @@ public:
DEF(MAX_FORMAT, )
DECLARE_ENUM(Type, type, LOAD_DATA_FORMAT_DEF, static);
static bool is_backup(Type type) { return type == OB_BACKUP_1_4 || type == OB_BACKUP_3_X || type == OB_BACKUP_2_X_LOG || type == OB_BACKUP_2_X_PHY ; }
};
class ObLoadDataStorageInfo : public share::ObExternalTableStorageInfo
......
......@@ -52,19 +52,37 @@ public:
bool is_insert_into() const { return load_mode_ == ObDirectLoadMode::INSERT_INTO; }
void set_is_online_gather_statistics(bool is_online_gather_statistics) { is_online_gather_statistics_ = is_online_gather_statistics; }
void set_online_sample_percent(double online_sample_percent) { online_sample_percent_ = online_sample_percent; }
TO_STRING_KV(K_(table_id), K_(load_method), K_(insert_mode), K_(load_mode), K_(load_level),
K_(dup_action), K_(max_error_row_count), K_(need_sort), K_(can_use_direct_load),
K_(use_direct_load), K_(is_optimized_by_default_load_mode),
K_(is_online_gather_statistics), K_(online_sample_percent));
bool is_backup() const { return is_backup_; }
const ObArray<uint64_t> &get_column_ids() const { return column_ids_; }
TO_STRING_KV(K_(table_id),
"load_method", storage::ObDirectLoadMethod::get_type_string(load_method_),
"insert_mode", storage::ObDirectLoadInsertMode::get_type_string(insert_mode_),
"load_mode", storage::ObDirectLoadMode::get_type_string(load_mode_),
"load_level", storage::ObDirectLoadLevel::get_type_string(load_level_),
K_(dup_action),
K_(max_error_row_count),
K_(need_sort),
K_(is_backup),
K_(is_mview_complete_refresh),
K_(is_insert_overwrite),
K_(is_enabled),
K_(enable_inc_major),
K_(is_optimized_by_default_load_mode),
K_(can_use_direct_load),
K_(use_direct_load),
K_(is_online_gather_statistics),
K_(online_sample_percent),
K_(column_ids));
private:
void enable_by_direct_load_hint(const ObDirectLoadHint &hint);
void enable_by_append_hint();
void enable_by_config(ObExecContext *exec_ctx);
void enable_by_overwrite();
int check_exec_ctx(ObExecContext *exec_ctx);
int check_semantics();
int check_transaction(ObSQLSessionInfo *session_info);
int check_support_insert_overwrite(const ObGlobalHint &global_hint);
int check_support_direct_load(ObExecContext *exec_ctx);
public:
uint64_t table_id_;
storage::ObDirectLoadMethod::Type load_method_;
......@@ -74,12 +92,17 @@ public:
sql::ObLoadDupActionType dup_action_;
int64_t max_error_row_count_;
bool need_sort_;
bool is_backup_; // backup load, only load data now
bool is_mview_complete_refresh_;
bool is_insert_overwrite_;
bool is_enabled_; // _ob_enable_direct_load
bool enable_inc_major_; // _enable_inc_major_direct_load
bool is_optimized_by_default_load_mode_; // optimized by default load mode
bool can_use_direct_load_;
bool use_direct_load_;
bool is_optimized_by_default_load_mode_; // optimized by default load mode
bool enable_inc_major_;
bool is_online_gather_statistics_;
double online_sample_percent_;
ObArray<uint64_t> column_ids_;
};
} // namespace sql
......
......@@ -846,7 +846,7 @@ int ObLoadDataResolver::resolve_filename(ObLoadDataStmt *load_stmt, ParseNode *n
char *path = nullptr;
int64_t path_len = 0;
ObArray<ObString> file_list;
if (load_args.access_info_.get_load_data_format() == ObLoadDataFormat::OB_BACKUP_1_4) {
if (ObLoadDataFormat::is_backup(load_args.access_info_.get_load_data_format())) {
load_args.file_name_ = temp_file_name;
} else {
if (OB_ISNULL(file_ptr = temp_file_name.reverse_find('/'))) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册