/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX STORAGE #include "storage/ls/ob_ls_tablet_service.h" #include "common/row/ob_row_store.h" #include "lib/objectpool/ob_server_object_pool.h" #include "lib/utility/ob_macro_utils.h" #include "lib/utility/utility.h" #include "logservice/ob_log_base_header.h" #include "logservice/ob_log_base_type.h" #include "logservice/ob_log_service.h" #include "observer/report/ob_i_meta_report.h" #include "share/ob_disk_usage_table_operator.h" #include "share/ob_rpc_struct.h" #include "share/rc/ob_tenant_base.h" #include "share/schema/ob_table_param.h" #include "share/schema/ob_tenant_schema_service.h" #include "share/ob_ddl_common.h" #include "storage/blocksstable/ob_index_block_builder.h" #include "storage/blocksstable/ob_sstable_meta.h" #include "storage/ob_dml_running_ctx.h" #include "storage/ob_partition_range_spliter.h" #include "storage/ob_query_iterator_factory.h" #include "storage/ob_relative_table.h" #include "storage/ob_row_reshape.h" #include "storage/ob_storage_struct.h" #include "storage/ob_storage_table_guard.h" #include "storage/ob_value_row_iterator.h" #include "storage/access/ob_table_scan_iterator.h" #include "storage/access/ob_table_scan_range.h" #include "storage/access/ob_rows_info.h" #include "storage/access/ob_table_scan_range.h" #include "storage/access/ob_rows_info.h" #include "storage/access/ob_table_estimator.h" #include "storage/access/ob_index_sstable_estimator.h" #include "storage/blocksstable/ob_sstable.h" #include "storage/ls/ob_ls.h" #include "storage/tablet/ob_tablet.h" #include "storage/tablet/ob_tablet_iterator.h" #include "storage/tablet/ob_tablet_binding_helper.h" #include "storage/tablet/ob_tablet_create_delete_helper.h" #include "storage/tablet/ob_tablet_create_sstable_param.h" #include "storage/tablet/ob_tablet_service_clog_replay_executor.h" #include "storage/tablet/ob_tablet_status.h" #include "storage/tablet/ob_tablet_slog_helper.h" #include "storage/tx/ob_trans_define.h" #include "storage/tx/ob_trans_part_ctx.h" #include "storage/tx_storage/ob_ls_service.h" #include "storage/meta_mem/ob_tablet_map_key.h" #include "storage/meta_mem/ob_tenant_meta_mem_mgr.h" #include "storage/slog/ob_storage_log_replayer.h" #include "storage/slog/ob_storage_log_struct.h" #include "storage/slog/ob_storage_logger.h" using namespace oceanbase::share; using namespace oceanbase::common; using namespace oceanbase::blocksstable; namespace oceanbase { namespace storage { ObLSTabletService::ObLSTabletService() : ls_(nullptr), tx_data_memtable_mgr_(), tx_ctx_memtable_mgr_(), lock_memtable_mgr_(), tablet_id_set_(), bucket_lock_(), rs_reporter_(nullptr), allow_to_read_mgr_(), is_inited_(false), is_stopped_(false) { } ObLSTabletService::~ObLSTabletService() { } int ObLSTabletService::init( ObLS *ls, observer::ObIMetaReport *rs_reporter) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret), K_(is_inited)); } else if (OB_ISNULL(ls) || OB_ISNULL(rs_reporter)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ls), KP(rs_reporter)); } else if (OB_FAIL(tablet_id_set_.init(ObTabletCommon::BUCKET_LOCK_BUCKET_CNT, MTL_ID()))) { LOG_WARN("fail to init tablet id set", K(ret)); } else if (OB_FAIL(bucket_lock_.init(ObTabletCommon::BUCKET_LOCK_BUCKET_CNT, ObLatchIds::TABLET_BUCKET_LOCK, "TabletSvrBucket", MTL_ID()))) { LOG_WARN("failed to init bucket lock", K(ret)); } else if (OB_FAIL(set_allow_to_read_(ls))) { LOG_WARN("failed to set allow to read", K(ret)); } else { ls_ = ls; rs_reporter_ = rs_reporter; is_stopped_ = false; is_inited_ = true; } if (OB_UNLIKELY(!is_inited_)) { destroy(); } return ret; } void ObLSTabletService::destroy() { delete_all_tablets(); tablet_id_set_.destroy(); tx_data_memtable_mgr_.destroy(); tx_ctx_memtable_mgr_.destroy(); lock_memtable_mgr_.destroy(); bucket_lock_.destroy(); rs_reporter_ = nullptr; ls_= nullptr; is_stopped_ = false; is_inited_ = false; } int ObLSTabletService::stop() { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { is_stopped_ = true; } return ret; } int ObLSTabletService::offline() { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { DestroyMemtableAndMemberOperator clean_mem_op(this); if (OB_FAIL(tablet_id_set_.foreach(clean_mem_op))) { LOG_WARN("fail to clean memtables", K(ret), K(clean_mem_op.cur_tablet_id_)); } } return ret; } int ObLSTabletService::online() { return OB_SUCCESS; } int ObLSTabletService::replay( const void *buffer, const int64_t nbytes, const palf::LSN &lsn, const int64_t ts_ns) { int ret = OB_SUCCESS; int64_t pos = 0; const char *log_buf = static_cast(buffer); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_FAIL(ObTabletServiceClogReplayExecutor::execute(ls_, log_buf, nbytes, pos, lsn, ts_ns))) { LOG_WARN("replay tablet log error", K(ret), K(lsn), K(ts_ns)); } return ret; } void ObLSTabletService::switch_to_follower_forcedly() { // TODO } int ObLSTabletService::switch_to_leader() { int ret = OB_SUCCESS; //TODO return ret; } int ObLSTabletService::switch_to_follower_gracefully() { int ret = OB_SUCCESS; //TODO return ret; } int ObLSTabletService::resume_leader() { int ret = OB_SUCCESS; //TODO return ret; } int ObLSTabletService::flush(int64_t rec_log_ts) { UNUSED(rec_log_ts); return OB_SUCCESS; } int64_t ObLSTabletService::get_rec_log_ts() { return INT64_MAX; } int ObLSTabletService::prepare_for_safe_destroy() { int ret = OB_SUCCESS; if (OB_FAIL(delete_all_tablets())) { LOG_WARN("fail to delete all tablets", K(ret)); } return ret; } int ObLSTabletService::safe_to_destroy(bool &is_safe) { int ret = OB_SUCCESS; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); bool all_table_released = false; is_safe = true; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { int64_t tx_data_memtable_mgr_ref = tx_data_memtable_mgr_.get_ref(); int64_t tx_ctx_memtable_mgr_ref = tx_ctx_memtable_mgr_.get_ref(); int64_t lock_memtable_mgr_ref = lock_memtable_mgr_.get_ref(); if (0 != tx_data_memtable_mgr_ref || 0 != tx_ctx_memtable_mgr_ref || 0 != lock_memtable_mgr_ref) { if (REACH_TIME_INTERVAL(60L * 1000000)) { // 60s LOG_INFO("inner tablet memtable mgr can't destroy", K(tx_data_memtable_mgr_ref), K(tx_ctx_memtable_mgr_ref), K(lock_memtable_mgr_ref)); } is_safe = false; } else { tx_data_memtable_mgr_.destroy(); tx_ctx_memtable_mgr_.destroy(); lock_memtable_mgr_.destroy(); } if (is_safe) { if (OB_FAIL(t3m->gc_tables_in_queue(all_table_released))) { LOG_WARN("failed to check all table released", K(ret)); is_safe = false; } else { is_safe = all_table_released; } } } return ret; } int ObLSTabletService::batch_create_tablets( const obrpc::ObBatchCreateTabletArg &arg, const int64_t create_scn, const bool is_replay_clog) { ALLOW_NEXT_LOG(); LOG_INFO("batch create tablet", K(arg), K(is_replay_clog)); int ret = OB_SUCCESS; ObTimeGuard time_guard("CreateTablets", 3000000/*3 seconds*/); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!arg.is_valid()) || OB_UNLIKELY(create_scn <= OB_INVALID_TIMESTAMP)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(arg), K(create_scn)); } else if (OB_UNLIKELY(arg.id_ != ls_->get_ls_id())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls id does not equal", K(ret), "arg ls id", arg.id_, "ls id", ls_->get_ls_id()); } else { NonLockedHashSet data_tablet_id_set; NonLockedHashSet existed_tablet_id_set; if (OB_FAIL(data_tablet_id_set.create(arg.tablets_.count()))) { LOG_WARN("failed to init data tablet id set", K(ret), K(arg)); } else if (OB_FAIL(existed_tablet_id_set.create(arg.get_tablet_count()))) { LOG_WARN("failed to init existed tablet id set", K(ret), K(arg)); } else if (OB_FAIL(verify_tablets(arg, existed_tablet_id_set))) { LOG_WARN("failed to verify tablets", K(ret), K(arg)); } else if (!is_replay_clog && existed_tablet_id_set.size() > 0) { // NOT in clog replay procedure, existed tablet id is NOT allowed ret = OB_TABLET_EXIST; LOG_WARN("tablet already exists", K(ret), K(arg), K(existed_tablet_id_set)); } else if (is_replay_clog && existed_tablet_id_set.size() > 0) { time_guard.click("Verify"); // in clog replay procedure and tablet id exists // we should build new arg to do batch create tablets obrpc::ObBatchCreateTabletArg new_arg; if (OB_FAIL(build_batch_create_tablet_arg(arg, existed_tablet_id_set, new_arg))) { LOG_WARN("failed to build new batch create tablet arg", K(ret), K(arg), K(existed_tablet_id_set)); } else if (new_arg.get_tablet_count() == 0) { // all tablet ids exist, no need to create, do nothing } else if (OB_UNLIKELY(!new_arg.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("new arg is invalid", K(ret), K(new_arg)); } else if (FALSE_IT(time_guard.click("BuildArg"))) { } else if (OB_FAIL(do_batch_create_tablets(new_arg, create_scn, is_replay_clog, time_guard, data_tablet_id_set))) { LOG_WARN("failed to do batch create tablets", K(ret), K(create_scn), K(new_arg), K(is_replay_clog)); } } else { if (OB_FAIL(do_batch_create_tablets(arg, create_scn, is_replay_clog, time_guard, data_tablet_id_set))) { LOG_WARN("failed to do batch create tablets", K(ret), K(arg), K(create_scn), K(is_replay_clog)); } } } return ret; } int ObLSTabletService::do_batch_create_tablets( const obrpc::ObBatchCreateTabletArg &arg, const int64_t create_scn, const bool is_replay_clog, common::ObTimeGuard &time_guard, NonLockedHashSet &data_tablet_id_set) { UNUSED(is_replay_clog); int ret = OB_SUCCESS; const share::ObLSID &ls_id = arg.id_; ObSArray tablet_handles; if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(arg)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); ++i) { const obrpc::ObCreateTabletInfo &info = arg.tablets_[i]; if (OB_FAIL(create_tablet(ls_id, arg, create_scn, info, tablet_handles, data_tablet_id_set))) { LOG_WARN("failed to create tablet", K(ret), K(create_scn), K(info), K(i)); } } time_guard.click("CreTablet"); if (OB_FAIL(ret)) { } else { // bucket lock, protect write slog and concurrent tablet CAS operation ObSArray all_tablet_id_hash_array; ObMultiBucketLockGuard lock_guard(bucket_lock_, true/*is_write_lock*/); if (OB_FAIL(get_all_tablet_id_hash_array(arg, all_tablet_id_hash_array))) { LOG_WARN("failed to get all tablet id array", K(ret), K(arg)); } else if (OB_FAIL(lock_guard.lock_multi_buckets(all_tablet_id_hash_array))) { LOG_WARN("failed to lock multi buckets", K(ret)); } time_guard.click("Lock"); if (OB_FAIL(ret)) { } else if (OB_FAIL(add_batch_tablets(ls_id, data_tablet_id_set, tablet_handles))) { LOG_ERROR("fatal error, failed to add batch tablets", K(ret), K(tablet_handles), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } time_guard.click("WrSlog"); } if (OB_FAIL(ret)) { // remove tablets just created int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(post_handle_batch_create_tablets(arg))) { LOG_ERROR("fatal error, failed to remove redundant tablets", K(tmp_ret), K(arg), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); // remove redundant tablets should never fail } } } return ret; } int ObLSTabletService::batch_remove_tablets( const obrpc::ObBatchRemoveTabletArg &arg, const bool is_replay_clog) { LOG_INFO("batch remove tablet", K(arg), K(is_replay_clog)); int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); ObStorageLogger *slogger = MTL(ObStorageLogger*); ObSArray all_tablet_id_hash_array; common::ObLinearHashMap delete_tablet_infos; ObTimeGuard time_guard("RemoveTablets", 3000000/*3 seconds*/); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(arg)); } else if (OB_UNLIKELY(arg.id_ != ls_->get_ls_id())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("log stream id does not equal", K(ret), "arg ls id", arg.id_, "log stream id", ls_->get_ls_id()); } else if (OB_FAIL(delete_tablet_infos.init("tablet info", tenant_id))) { LOG_WARN("failed to init hash map", K(ret), K(tenant_id)); } else if (OB_FAIL(parse_and_verify_delete_tablet_info(arg, delete_tablet_infos))) { LOG_WARN("failed to parse and verify delete tablet info", K(ret), K(arg)); } else if (OB_FAIL(get_all_tablet_id_hash_array(delete_tablet_infos, all_tablet_id_hash_array))) { LOG_WARN("failed to get all tablet id array", K(ret)); } else { time_guard.click("Prepare"); HashMapTabletDeleteFunctor functor(ls_); ObMultiBucketLockGuard lock_guard(bucket_lock_, true/*is_write_lock*/); if (OB_UNLIKELY(all_tablet_id_hash_array.empty())) { LOG_INFO("all tablets have been removed, do nothing", K(arg)); } else if (OB_FAIL(lock_guard.lock_multi_buckets(all_tablet_id_hash_array))) { LOG_WARN("failed to lock multi buckets", K(ret)); } else if (FALSE_IT(time_guard.click("Lock"))) { } else if (OB_FAIL(delete_tablet_infos.for_each(functor))) { LOG_WARN("failed to iterate hash map", K(ret)); } else if (FALSE_IT(time_guard.click("Collect"))) { } else if (OB_FAIL(slogger->write_log(functor.get_slog_params()))) { LOG_WARN("failed to write slog for remove tablets", K(ret), K(functor)); } else if (FALSE_IT(time_guard.click("WrSlog"))) { } else if (OB_FAIL(handle_remove_tablets(functor.get_slog_params(), delete_tablet_infos))) { LOG_ERROR("failed to handle remove tablets", K(ret), K(functor), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } else if (FALSE_IT(time_guard.click("Execute"))) { } } return ret; } int ObLSTabletService::delete_all_tablets() { int ret = OB_SUCCESS; if (OB_NOT_NULL(ls_)) { const ObLSID &ls_id = ls_->get_ls_id(); ObSArray tablet_id_array; GetAllTabletIDOperator op(tablet_id_array); common::ObBucketWLockAllGuard lock_guard(bucket_lock_); if (OB_FAIL(tablet_id_set_.foreach(op))) { LOG_WARN("failed to traverse tablet id set", K(ret), K(ls_id)); } else if (tablet_id_array.empty()) { // tablet id array is empty, do nothing } else { for (int64_t i = 0; OB_SUCC(ret) && i < tablet_id_array.count(); ++i) { const ObTabletID &tablet_id = tablet_id_array.at(i); if (OB_FAIL(do_remove_tablet(ls_id, tablet_id))) { LOG_ERROR("failed to do remove tablet", K(ret), K(ls_id), K(tablet_id)); ob_usleep(1000 * 1000); ob_abort(); } } if (OB_SUCC(ret)) { report_tablet_to_rs(tablet_id_array); } } } return ret; } int ObLSTabletService::remove_tablets(const common::ObIArray &tablet_id_array) { int ret = OB_SUCCESS; const int64_t tablet_cnt = tablet_id_array.count(); ObSArray all_tablet_id_hash_array; ObSArray tablet_ids; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(0 == tablet_cnt)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args, tablet id array is empty", K(ret), K(tablet_id_array)); } else if (OB_FAIL(all_tablet_id_hash_array.reserve(tablet_cnt))) { LOG_WARN("failed to reserve memory for array", K(ret), K(tablet_cnt)); } else if (OB_FAIL(tablet_ids.reserve(tablet_cnt))) { LOG_WARN("failed to reserve memory for array", K(ret), K(tablet_cnt)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < tablet_cnt; ++i) { const ObTabletID &tablet_id = tablet_id_array.at(i); if (OB_FAIL(all_tablet_id_hash_array.push_back(tablet_id.hash()))) { LOG_WARN("failed to push back tablet id hash value", K(ret), K(tablet_id)); } } } if (OB_SUCC(ret)) { ObMetaDiskAddr tablet_addr; ObMultiBucketLockGuard lock_guard(bucket_lock_, true/*is_write_lock*/); if (OB_FAIL(lock_guard.lock_multi_buckets(all_tablet_id_hash_array))) { LOG_WARN("failed to lock multi buckets", K(ret)); } else { const share::ObLSID &ls_id = ls_->get_ls_id(); ObTabletHandle tablet_handle; ObTabletMapKey key; key.ls_id_ = ls_id; // check tablet existence for (int64_t i = 0; OB_SUCC(ret) && i < tablet_cnt; ++i) { const ObTabletID &tablet_id = tablet_id_array.at(i); key.tablet_id_ = tablet_id; tablet_addr.reset(); if (OB_FAIL(ObTabletCreateDeleteHelper::get_tablet(key, tablet_handle))) { if (OB_TABLET_NOT_EXIST == ret) { ret = OB_SUCCESS; LOG_INFO("tablet does not exist, maybe already deleted", K(ret), K(key)); } else { LOG_WARN("failed to get tablet", K(ret), K(key)); } } else if (OB_FAIL(tablet_handle.get_obj()->get_meta_disk_addr(tablet_addr))) { LOG_WARN("failed to get tablet addr", K(ret), K(key)); } else if (!tablet_addr.is_disked()) { if (OB_FAIL(do_remove_tablet(ls_id, tablet_id))) { LOG_WARN("failed to remove non disked tablet from memory", K(ret), K(key)); } else { FLOG_INFO("succeeded to remove non disked tablet from memory", K(ret), K(key)); } } else if (OB_FAIL(tablet_ids.push_back(tablet_id))) { LOG_WARN("failed to push back tablet id", K(ret), K(tablet_id)); } } // write slog and do remove tablet if (OB_FAIL(ret)) { } else if (tablet_ids.empty()) { LOG_INFO("all tablets already deleted, do nothing", K(ret), K(ls_id), K(tablet_id_array)); } else if (OB_FAIL(ObTabletSlogHelper::write_remove_tablet_slog(ls_id, tablet_ids))) { LOG_WARN("failed to write remove tablet slog", K(ret), K(ls_id), K(tablet_ids)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < tablet_ids.count(); ++i) { const ObTabletID &tablet_id = tablet_ids.at(i); if (OB_FAIL(do_remove_tablet(ls_id, tablet_id))) { LOG_ERROR("failed to do remove tablet", K(ret), K(ls_id), K(tablet_id)); ob_usleep(1000 * 1000); ob_abort(); } } if (OB_SUCC(ret)) { report_tablet_to_rs(tablet_ids); } } } } return ret; } // TODO(yunshan.tys) cope with failure of deleting tablet (tablet hasn't been loaded from disk) int ObLSTabletService::do_remove_tablet( const share::ObLSID &ls_id, const ObTabletID &tablet_id) { int ret = OB_SUCCESS; const ObTabletMapKey key(ls_id, tablet_id); ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (OB_FAIL(tablet_id_set_.erase(tablet_id))) { if (OB_HASH_NOT_EXIST == ret) { // tablet id is already erased ret = OB_SUCCESS; } else { LOG_WARN("fail to erase tablet id from set", K(ret), K(ls_id), K(tablet_id)); } } if (OB_SUCC(ret)) { // loop retry to delete tablet from t3m while (OB_FAIL(t3m->del_tablet(key))) { if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { LOG_ERROR("failed to delete tablet from t3m", K(ret), K(ls_id), K(tablet_id)); } } } if (OB_SUCC(ret)) { FLOG_INFO("succeeded to remove tablet", K(ret), K(ls_id), K(tablet_id)); } return ret; } int ObLSTabletService::get_tablet( const ObTabletID &tablet_id, ObTabletHandle &handle, const int64_t timeout_us) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!tablet_id.is_valid() || timeout_us < ObTabletCommon::DIRECT_GET_COMMITTED_TABLET_TIMEOUT_US)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(timeout_us)); } else if (OB_FAIL(check_and_get_tablet(tablet_id, handle, timeout_us))) { if (OB_TABLET_NOT_EXIST == ret) { LOG_DEBUG("failed to check and get tablet", K(ret), K(tablet_id), K(timeout_us)); } else { LOG_WARN("failed to check and get tablet", K(ret), K(tablet_id), K(timeout_us)); } } return ret; } int ObLSTabletService::get_tablet_addr(const ObTabletMapKey &key, ObMetaDiskAddr &addr) { int ret = OB_SUCCESS; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { // 加读锁保证ckpt与对tablet的操作串行, 防止ckpt时tablet操作处在 // slog已经写入而tablet addr未更新的状态,导致slog丢失 ObBucketHashRLockGuard lock_guard(bucket_lock_, key.tablet_id_.hash()); if (OB_FAIL(t3m->get_tablet_addr(key, addr))) { LOG_WARN("fail to get tablet addr", K(ret), K(key)); } } return ret; } /** int ObLSTabletService::report_update_tablet( const ObTabletHandle *old_tablet_handle, const ObTabletHandle *new_tablet_handle) { int ret = OB_SUCCESS; int64_t data_size_before = 0; int64_t used_size_before = 0; int64_t data_size_after = 0; int64_t used_size_after = 0; ObDiskReportFileType file_type; if (OB_UNLIKELY(nullptr == old_tablet_handle && nullptr == new_tablet_handle)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), KP(old_tablet_handle), KP(new_tablet_handle)); } else { do { if (nullptr != old_tablet_handle && old_tablet_handle->is_valid() && OB_FAIL(old_tablet_handle->get_obj()->get_sstables_size(data_size_before, used_size_before))) { if (OB_ALLOCATE_MEMORY_FAILED == ret) { ret = OB_EAGAIN; LOG_WARN("fail to allocate memory, retry", K(ret)); } else { LOG_WARN("failed to get old tablet's disk usage", K(ret), K(data_size_before), K(used_size_before)); } } else if (nullptr != new_tablet_handle && new_tablet_handle->is_valid() && OB_FAIL(new_tablet_handle->get_obj()->get_sstables_size(data_size_after, used_size_after))) { if (OB_ALLOCATE_MEMORY_FAILED == ret) { ret = OB_EAGAIN; LOG_WARN("fail to allocate memory, retry", K(ret)); } else { LOG_WARN("failed to get new tablet's disk usage", K(ret), K(data_size_after), K(used_size_after)); } } else { if (nullptr != old_tablet_handle) { parse_file_type(*old_tablet_handle, file_type); } else { parse_file_type(*new_tablet_handle, file_type); } if (OB_FAIL(disk_reporter_->set_tenant_data_usage(data_size_before, used_size_before, data_size_after, used_size_after, file_type))) { LOG_WARN("failed to set tenant's data usage", K(ret), K(data_size_before), K(used_size_before), K(data_size_after), K(used_size_after), K(file_type)); } } } while (OB_EAGAIN == ret); } return ret; } */ void ObLSTabletService::report_tablet_to_rs( const common::ObIArray &tablet_id_array) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); const share::ObLSID &ls_id = ls_->get_ls_id(); // ignore ret on purpose for (int64_t i = 0; i < tablet_id_array.count(); ++i) { const common::ObTabletID &tablet_id = tablet_id_array.at(i); if (tablet_id.is_ls_inner_tablet()) { // no need to report for ls inner tablet continue; } else if (OB_FAIL(rs_reporter_->submit_tablet_update_task(tenant_id, ls_id, tablet_id))) { LOG_WARN("failed to report tablet info", KR(ret), K(tenant_id), K(ls_id), K(tablet_id)); } } } int ObLSTabletService::table_scan(ObTableScanIterator &iter, ObTableScanParam ¶m) { int ret = OB_SUCCESS; NG_TRACE(S_table_scan_begin); ObTabletHandle data_tablet; AllowToReadMgr::AllowToReadInfo read_info; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) { } else if (!read_info.allow_to_read()) { ret = OB_REPLICA_NOT_READABLE; LOG_WARN("ls is not allow to read", K(ret), KPC(ls_)); } else if (OB_FAIL(prepare_scan_table_param(param, *(MTL(ObTenantSchemaService*)->get_schema_service())))) { LOG_WARN("failed to prepare scan table param", K(ret), K(param)); } else if (OB_FAIL(get_tablet_with_timeout(param.tablet_id_, data_tablet, param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(param)); } else if (OB_FAIL(inner_table_scan(data_tablet, iter, param))) { LOG_WARN("failed to do table scan", K(ret), KP(&iter), K(param)); } else { bool is_same = false; allow_to_read_mgr_.check_read_info_same(read_info, is_same); if (!is_same) { ret = OB_REPLICA_NOT_READABLE; LOG_WARN("ls is not allow to read", K(ret), KPC(ls_), KP(&iter)); } } NG_TRACE(S_table_scan_end); return ret; } int ObLSTabletService::table_rescan(ObTableScanParam ¶m, ObNewRowIterator *result) { int ret = OB_SUCCESS; NG_TRACE(S_table_rescan_begin); ObTabletHandle data_tablet; AllowToReadMgr::AllowToReadInfo read_info; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K(result), K_(is_inited)); } else if (OB_ISNULL(result)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret)); } else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) { } else if (!read_info.allow_to_read()) { ret = OB_REPLICA_NOT_READABLE; LOG_WARN("ls is not allow to read", K(ret), KPC(ls_)); } else if (OB_FAIL(prepare_scan_table_param(param, *(MTL(ObTenantSchemaService*)->get_schema_service())))) { LOG_WARN("failed to prepare scan table param", K(ret), K(result), K(param)); } else { ObTableScanIterator *iter = static_cast(result); if (OB_FAIL(get_tablet_with_timeout(param.tablet_id_, data_tablet, param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(param)); } else if (OB_FAIL(inner_table_scan(data_tablet, *iter, param))) { LOG_WARN("failed to do table scan", K(ret), K(result), K(param)); } } if (OB_SUCC(ret)) { bool is_same = false; allow_to_read_mgr_.check_read_info_same(read_info, is_same); if (!is_same) { ret = OB_REPLICA_NOT_READABLE; LOG_WARN("ls is not allow to read", K(ret), KPC(ls_)); } } NG_TRACE(S_table_rescan_end); return ret; } int ObLSTabletService::refresh_tablet_addr( const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, const ObMetaDiskAddr &new_addr, ObTabletHandle &tablet_handle) { int ret = OB_SUCCESS; const ObTabletMapKey key(ls_id, tablet_id); ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (OB_UNLIKELY(!new_addr.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(new_addr)); } while (OB_SUCC(ret)) { ret = tablet_id_set_.set(tablet_id); if (OB_SUCC(ret)) { break; } else if (OB_ALLOCATE_MEMORY_FAILED == ret) { usleep(100 * 1000); if (REACH_COUNT_INTERVAL(100)) { LOG_ERROR("no memory for tablet id set, retry", K(ret), K(tablet_id)); } ret = OB_SUCCESS; } else { LOG_WARN("fail to set tablet id set", K(ret), K(tablet_id)); } } if (OB_FAIL(ret)) { } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, new_addr, tablet_handle, tablet_handle))) { LOG_WARN("failed to add tablet to meta mem mgr", K(ret), K(key), K(new_addr), K(tablet_handle)); } return ret; } int ObLSTabletService::update_tablet_object_and_addr( ObTabletHandle &new_tablet_handle, const ObMetaDiskAddr &new_addr) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); share::ObLSID ls_id; common::ObTabletID tablet_id; ObTabletHandle old_tablet_handle; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (OB_UNLIKELY(!new_tablet_handle.is_valid()) || OB_UNLIKELY(!new_addr.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(new_tablet_handle), K(new_addr)); } else if (FALSE_IT(ls_id = new_tablet_handle.get_obj()->tablet_meta_.ls_id_)) { // do nothing } else if (FALSE_IT(tablet_id = new_tablet_handle.get_obj()->tablet_meta_.tablet_id_)) { // do nothing } else if (OB_FAIL(direct_get_tablet(tablet_id, old_tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(ObTabletMapKey(ls_id, tablet_id), new_addr, old_tablet_handle, new_tablet_handle))) { LOG_WARN("failed to compare and swap tablet", K(ret), K(ls_id), K(tablet_id), K(new_addr)); } else { FLOG_INFO("succeeded to update tablet object and addr", K(ret), K(tenant_id), K(ls_id), K(tablet_id)); } return ret; } int ObLSTabletService::trim_old_tablets(const ObTabletID &tablet_id) { int ret = OB_SUCCESS; ObTabletHandle tablet_handle_head; ObIAllocator &allocator = MTL(ObTenantMetaMemMgr*)->get_tenant_allocator(); if (OB_FAIL(direct_get_tablet(tablet_id, tablet_handle_head))) { LOG_WARN("failed to check and get tablet", K(ret), K(tablet_id)); } else if (OB_UNLIKELY(!tablet_handle_head.get_obj()->get_tablet_meta().has_next_tablet_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("doesn't have old tablet", K(ret), "tablet_meta", tablet_handle_head.get_obj()->get_tablet_meta()); } else { ObTablet *tablet_head = tablet_handle_head.get_obj(); tablet_head->trim_tablet_list(); ObMetaDiskAddr disk_addr; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle_head, disk_addr))) { LOG_WARN("failed to write update tablet slog", K(ret), K(tablet_handle_head), K(disk_addr)); } else if (OB_FAIL(MTL(ObTenantMetaMemMgr*)->compare_and_swap_tablet(key, disk_addr, tablet_handle_head, tablet_handle_head))) { LOG_ERROR("failed to compare and swap tablet", K(key), K(disk_addr), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } else { FLOG_INFO("succeeded to trim tablets list", K(key), K(disk_addr)); } } return ret; } int ObLSTabletService::rollback_rebuild_tablet(const ObTabletID &tablet_id) { int ret = OB_SUCCESS; ObTabletHandle tablet_handle_head; if (OB_FAIL(direct_get_tablet(tablet_id, tablet_handle_head))) { LOG_WARN("failed to check and get tablet", K(ret), K(tablet_id)); } else { ObTablet *tablet_head = tablet_handle_head.get_obj(); ObTabletHandle next_tablet_handle; ObMetaObj meta_obj; tablet_head->get_next_tablet_guard().get_obj(meta_obj); next_tablet_handle.set_obj(meta_obj); ObMetaDiskAddr disk_addr; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(next_tablet_handle, disk_addr))) { LOG_WARN("failed to write update tablet slog", K(ret), K(next_tablet_handle), K(disk_addr)); } else if (OB_FAIL(MTL(ObTenantMetaMemMgr*)->compare_and_swap_tablet(key, disk_addr, tablet_handle_head, next_tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(ret), K(key), K(disk_addr), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } else { FLOG_INFO("succeeded to rollback rebuild", K(key), K(disk_addr)); } } return ret; } int ObLSTabletService::rebuild_tablet_with_old( const ObMigrationTabletParam &mig_tablet_param, ObMetaObjGuard &tablet_guard) { int ret = OB_SUCCESS; ObTabletHandle new_tablet_handle; ObTablet *new_tablet = nullptr; ObFreezer *freezer = ls_->get_freezer(); ObMetaDiskAddr disk_addr; ObTabletHandle old_tablet_handle; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); const common::ObTabletID &tablet_id = mig_tablet_param.tablet_id_; const share::ObLSID &ls_id = mig_tablet_param.ls_id_; const ObTabletMapKey key(ls_id, tablet_id); if (OB_FAIL(direct_get_tablet(tablet_id, old_tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(key)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::acquire_tablet(key, new_tablet_handle, true/*only acquire*/))) { LOG_WARN("failed to acquire tablet", K(ret), K(key)); } else if (FALSE_IT(new_tablet = new_tablet_handle.get_obj())) { } else if (OB_FAIL(new_tablet->init(mig_tablet_param, true/*is_update*/, freezer))) { LOG_WARN("failed to init tablet", K(ret), K(mig_tablet_param)); } else if (FALSE_IT(new_tablet->set_next_tablet_guard(tablet_guard))) { } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(new_tablet_handle, disk_addr))) { LOG_WARN("failed to write update tablet slog", K(ret), K(new_tablet_handle), K(disk_addr)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, old_tablet_handle, new_tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(ret), K(key), K(disk_addr)); ob_usleep(1000 * 1000); ob_abort(); } else if (OB_FAIL(new_tablet->start_ddl_if_need())) { LOG_WARN("start ddl if need failed", K(ret), K(key)); } else { LOG_INFO("rebuild tablet with old succeed", K(ret), K(key), K(disk_addr)); } return ret; } int ObLSTabletService::migrate_update_tablet( const ObMigrationTabletParam &mig_tablet_param) { int ret = OB_SUCCESS; const common::ObTabletID &tablet_id = mig_tablet_param.tablet_id_; const share::ObLSID &ls_id = mig_tablet_param.ls_id_; const ObTabletMapKey key(mig_tablet_param.ls_id_, mig_tablet_param.tablet_id_); ObTabletHandle new_tablet_handle; ObTablet *new_tablet = nullptr; ObMetaDiskAddr disk_addr; ObFreezer *freezer = ls_->get_freezer(); ObTabletHandle old_tablet_handle; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (OB_FAIL(direct_get_tablet(tablet_id, old_tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(key)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::acquire_tablet(key, new_tablet_handle, true/*only acquire*/))) { LOG_WARN("failed to acquire tablet", K(ret), K(key)); } else if (FALSE_IT(new_tablet = new_tablet_handle.get_obj())) { } else if (OB_FAIL(new_tablet->init(mig_tablet_param, true/*is_update*/, freezer))) { LOG_WARN("failed to init tablet", K(ret), K(mig_tablet_param)); } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(new_tablet_handle, disk_addr))) { LOG_WARN("failed to write update tablet slog", K(ret), K(new_tablet_handle), K(disk_addr)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, old_tablet_handle, new_tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(ret), K(key), K(disk_addr)); ob_usleep(1000 * 1000); ob_abort(); } else if (OB_FAIL(new_tablet->start_ddl_if_need())) { LOG_WARN("start ddl if need failed", K(ret)); } else { LOG_INFO("migrate update tablet succeed", K(ret), K(key), K(disk_addr)); } return ret; } int ObLSTabletService::migrate_create_tablet( const ObMigrationTabletParam &mig_tablet_param, ObTabletHandle &handle) { int ret = OB_SUCCESS; const share::ObLSID &ls_id = mig_tablet_param.ls_id_; const common::ObTabletID &tablet_id = mig_tablet_param.tablet_id_; const ObTabletMapKey key(ls_id, tablet_id); ObFreezer *freezer = ls_->get_freezer(); ObTabletHandle tablet_handle; ObTablet *tablet = nullptr; ObMetaDiskAddr disk_addr; if (OB_FAIL(ObTabletCreateDeleteHelper::acquire_tablet(key, tablet_handle))) { LOG_WARN("failed to acquire tablet", K(ret), K(key)); } else if (FALSE_IT(tablet = tablet_handle.get_obj())) { } else if (OB_FAIL(tablet->init(mig_tablet_param, false/*is_update*/, freezer))) { LOG_WARN("failed to init tablet", K(ret), K(mig_tablet_param)); } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle, disk_addr))) { LOG_WARN("failed to write create tablet slog", K(ret), K(tablet_handle), K(disk_addr)); } else if (OB_FAIL(refresh_tablet_addr(ls_id, tablet_id, disk_addr, tablet_handle))) { LOG_WARN("failed to refresh tablet addr", K(ret), K(ls_id), K(tablet_id), K(disk_addr), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } else if (OB_FAIL(tablet->start_ddl_if_need())) { LOG_WARN("start ddl if need failed", K(ret)); } else if (OB_FAIL(try_pin_tablet_if_needed(tablet_handle))) { LOG_WARN("failed to try pin tablet", K(ret), K(ls_id), K(tablet_id)); } else { LOG_INFO("migrate create tablet succeed", K(ret), K(key), K(disk_addr)); } if (OB_SUCC(ret)) { handle = tablet_handle; } else { int tmp_ret = OB_SUCCESS; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (OB_TMP_FAIL(tablet_id_set_.erase(tablet_id))) { if (OB_HASH_NOT_EXIST != tmp_ret) { LOG_ERROR("fail to erase tablet id from set", K(tmp_ret), K(tablet_id)); } } if (OB_TMP_FAIL(t3m->del_tablet(key))) { LOG_WARN("fail to erase tablet from meta memory manager", K(tmp_ret), K(key)); } } return ret; } int ObLSTabletService::update_tablet_table_store( const common::ObTabletID &tablet_id, const ObUpdateTableStoreParam ¶m, ObTabletHandle &handle) { int ret = OB_SUCCESS; const share::ObLSID &ls_id = ls_->get_ls_id(); const ObTabletMapKey key(ls_id, tablet_id); ObTabletHandle old_tablet_handle; ObTabletHandle new_tablet_handle; ObTablet *new_tablet = nullptr; ObTimeGuard time_guard("UpdateTableStore", 3000000/*3 seconds*/); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!tablet_id.is_valid() || !param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(param)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::acquire_tablet(key, new_tablet_handle, true/*only acquire*/))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_TABLET_NOT_EXIST; } else { LOG_WARN("failed to acquire tablet", K(ret), K(key)); } } else { new_tablet = new_tablet_handle.get_obj(); time_guard.click("Acquire"); ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); time_guard.click("Lock"); if (OB_FAIL(direct_get_tablet(tablet_id, old_tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } else { time_guard.click("GetOld"); ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); ObTablet *old_tablet = old_tablet_handle.get_obj(); const ObTabletTxMultiSourceDataUnit *tx_data = nullptr; const ObTabletBindingInfo *binding_info = nullptr; const ObTabletAutoincSeq *auto_inc_seq = nullptr; ObMetaDiskAddr disk_addr; if (OB_FAIL(choose_msd(param, *old_tablet, tx_data, binding_info, auto_inc_seq))) { LOG_WARN("failed to choose msd", K(ret), K(param), KPC(old_tablet)); } else if (OB_FAIL(new_tablet->init(param, *old_tablet, *tx_data, *binding_info, *auto_inc_seq))) { LOG_WARN("failed to init tablet", K(ret), K(param), KPC(old_tablet), KPC(tx_data), KPC(binding_info), KPC(auto_inc_seq)); } else if (FALSE_IT(time_guard.click("InitNew"))) { } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(new_tablet_handle, disk_addr))) { LOG_WARN("fail to write update tablet slog", K(ret), K(new_tablet_handle), K(disk_addr)); } else if (FALSE_IT(time_guard.click("WrSlog"))) { } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, old_tablet_handle, new_tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(ret), K(key), K(disk_addr), K(old_tablet_handle), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } else if (FALSE_IT(time_guard.click("CASwap"))) { } else { handle = new_tablet_handle; LOG_INFO("succeeded to build new tablet", K(ret), K(key), K(disk_addr), K(param), K(handle)); } } } return ret; } int ObLSTabletService::choose_msd( const ObUpdateTableStoreParam ¶m, const ObTablet &old_tablet, const ObTabletTxMultiSourceDataUnit *&tx_data, const ObTabletBindingInfo *&binding_info, const share::ObTabletAutoincSeq *&auto_inc_seq) { int ret = OB_SUCCESS; const ObTabletMeta &old_tablet_meta = old_tablet.get_tablet_meta(); tx_data = ¶m.tx_data_; binding_info = ¶m.binding_info_; auto_inc_seq = ¶m.auto_inc_seq_; if (!tx_data->is_valid()) { tx_data = &old_tablet_meta.tx_data_; } if (!binding_info->is_valid()) { binding_info = &old_tablet_meta.ddl_data_; } if (!auto_inc_seq->is_valid()) { auto_inc_seq = &old_tablet_meta.autoinc_seq_; } return ret; } int ObLSTabletService::update_tablet_report_status(const common::ObTabletID &tablet_id) { int ret = OB_SUCCESS; ObTabletHandle tablet_handle; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!tablet_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id)); } else if (OB_FAIL(direct_get_tablet(tablet_id, tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } else { ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); ObMetaDiskAddr disk_addr; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); ObTablet *tablet = tablet_handle.get_obj(); if (tablet->tablet_meta_.report_status_.need_report()) { tablet->tablet_meta_.report_status_.cur_report_version_ = tablet->tablet_meta_.report_status_.merge_snapshot_version_; if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle, disk_addr))) { LOG_WARN("failed to write update tablet slog", K(ret), K(tablet_handle), K(disk_addr)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, tablet_handle, tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(ret), K(key), K(disk_addr), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } else { LOG_INFO("succeeded to build new tablet", K(ret), K(key), K(disk_addr), K(tablet_handle)); } } else { FLOG_INFO("tablet doesn't need to report", K(ret)); } } return ret; } int ObLSTabletService::update_tablet_restore_status( const common::ObTabletID &tablet_id, const ObTabletRestoreStatus::STATUS &restore_status) { int ret = OB_SUCCESS; ObTabletHandle tablet_handle; ObTabletRestoreStatus::STATUS current_status = ObTabletRestoreStatus::RESTORE_STATUS_MAX; bool can_change = false; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!tablet_id.is_valid()) || OB_UNLIKELY(!ObTabletRestoreStatus::is_valid(restore_status))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(restore_status)); } else if (OB_FAIL(direct_get_tablet(tablet_id, tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } else { ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); ObMetaDiskAddr disk_addr; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); ObTablet *tablet = tablet_handle.get_obj(); if (OB_FAIL(tablet->tablet_meta_.ha_status_.get_restore_status(current_status))) { LOG_WARN("failed to get restore status", K(ret), KPC(tablet)); } else if (OB_FAIL(ObTabletRestoreStatus::check_can_change_status(current_status, restore_status, can_change))) { LOG_WARN("failed to check can change status", K(ret), K(current_status), K(restore_status), KPC(tablet)); } else if (!can_change) { ret = OB_ERR_UNEXPECTED; LOG_WARN("can not change restore status", K(ret), K(current_status), K(restore_status), KPC(tablet)); } else if (OB_FAIL(tablet->tablet_meta_.ha_status_.set_restore_status(restore_status))) { LOG_WARN("failed to set restore status", K(ret), K(restore_status), KPC(tablet)); } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle, disk_addr))) { LOG_WARN("failed to write update tablet slog", K(ret), K(tablet_handle), K(disk_addr)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, tablet_handle, tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(key), K(disk_addr)); ob_abort(); } else { LOG_INFO("succeeded to build new tablet", K(ret), K(key), K(disk_addr), K(restore_status), K(tablet_handle)); } } return ret; } int ObLSTabletService::update_tablet_ha_data_status( const common::ObTabletID &tablet_id, const ObTabletDataStatus::STATUS &data_status) { int ret = OB_SUCCESS; ObTabletHandle tablet_handle; ObTabletDataStatus::STATUS current_status = ObTabletDataStatus::DATA_STATUS_MAX; bool can_change = false; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(is_stopped_)) { ret = OB_NOT_RUNNING; LOG_WARN("tablet service stopped", K(ret)); } else if (OB_UNLIKELY(!tablet_id.is_valid()) || OB_UNLIKELY(!ObTabletDataStatus::is_valid(data_status))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(data_status)); } else if (OB_FAIL(direct_get_tablet(tablet_id, tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } else { ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); ObMetaDiskAddr disk_addr; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); ObTablet *tablet = tablet_handle.get_obj(); if (OB_FAIL(tablet->tablet_meta_.ha_status_.get_data_status(current_status))) { LOG_WARN("failed to get data status", K(ret), KPC(tablet)); } else if (OB_FAIL(ObTabletDataStatus::check_can_change_status(current_status, data_status, can_change))) { LOG_WARN("failed to check can change status", K(ret), K(current_status), K(data_status), KPC(tablet)); } else if (!can_change) { ret = OB_ERR_UNEXPECTED; LOG_WARN("can not change data status", K(ret), K(current_status), K(data_status), KPC(tablet)); } else if (current_status == data_status) { LOG_INFO("data status is same, skip update", K(tablet_id), K(current_status), K(data_status)); } else if (OB_FAIL(tablet->tablet_meta_.ha_status_.set_data_status(data_status))) { LOG_WARN("failed to set data status", K(ret), KPC(tablet), K(data_status)); } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle, disk_addr))) { LOG_WARN("failed to write update tablet slog", K(ret), K(tablet_handle), K(disk_addr)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, tablet_handle, tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(ret), K(key), K(disk_addr), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } else { LOG_INFO("succeeded to build new tablet", K(ret), K(key), K(disk_addr), K(data_status), K(tablet_handle)); } } return ret; } int ObLSTabletService::update_tablet_ha_expected_status( const common::ObTabletID &tablet_id, const ObTabletExpectedStatus::STATUS &expected_status) { int ret = OB_SUCCESS; ObTabletHandle tablet_handle; ObTabletExpectedStatus::STATUS current_status = ObTabletExpectedStatus::EXPECTED_STATUS_MAX; bool can_change = false; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(is_stopped_)) { ret = OB_NOT_RUNNING; LOG_WARN("tablet service stopped", K(ret)); } else if (OB_UNLIKELY(!tablet_id.is_valid()) || OB_UNLIKELY(!ObTabletExpectedStatus::is_valid(expected_status))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(expected_status)); } else if (OB_FAIL(direct_get_tablet(tablet_id, tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } else { ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); ObMetaDiskAddr disk_addr; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); ObTablet *tablet = tablet_handle.get_obj(); if (OB_FAIL(tablet->tablet_meta_.ha_status_.get_expected_status(current_status))) { LOG_WARN("failed to get data status", K(ret), KPC(tablet)); } else if (expected_status == current_status) { LOG_INFO("tablet ha expected status is same, no need update", K(tablet_id), K(current_status), K(expected_status)); } else if (OB_FAIL(ObTabletExpectedStatus::check_can_change_status(current_status, expected_status, can_change))) { LOG_WARN("failed to check can change status", K(ret), K(current_status), K(expected_status), KPC(tablet)); } else if (!can_change) { ret = OB_ERR_UNEXPECTED; LOG_WARN("can not change meta status", K(ret), K(current_status), K(expected_status), KPC(tablet)); } else { if (OB_FAIL(tablet->tablet_meta_.ha_status_.set_expected_status(expected_status))) { LOG_WARN("failed to set ha meta status", K(ret), KPC(tablet), K(expected_status)); } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle, disk_addr))) { LOG_WARN("failed to write update tablet slog", K(ret), K(tablet_handle), K(disk_addr)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, tablet_handle, tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(ret), K(key), K(disk_addr), K(lbt())); usleep(1000 * 1000); ob_abort(); } else { LOG_INFO("succeeded to update tablet meta status", K(ret), K(key), K(disk_addr), K(expected_status), KPC(tablet)); } } } return ret; } int ObLSTabletService::handle_remove_tablets( const common::ObIArray &slog_params, const common::ObLinearHashMap &delete_tablet_infos) { int ret = OB_SUCCESS; const int32_t put_tablet_cmd = ObIRedoModule::gen_cmd(ObRedoLogMainType::OB_REDO_LOG_TENANT_STORAGE, ObRedoLogSubType::OB_REDO_LOG_PUT_TABLET); const int32_t delete_tablet_cmd = ObIRedoModule::gen_cmd(ObRedoLogMainType::OB_REDO_LOG_TENANT_STORAGE, ObRedoLogSubType::OB_REDO_LOG_DELETE_TABLET); ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); for (int64_t i = 0; OB_SUCC(ret) && i < slog_params.count(); ++i) { const ObStorageLogParam &slog_param = slog_params.at(i); if (OB_UNLIKELY(!slog_param.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("slog entry is null", K(ret), K(slog_param)); } else if (slog_param.cmd_ == put_tablet_cmd) { const ObMetaDiskAddr &disk_addr = slog_param.disk_addr_; const ObCreateTabletLog *slog = static_cast(slog_param.data_); const common::ObTabletID &tablet_id = slog->tablet_->get_tablet_meta().tablet_id_; ObTabletMapKey key(ls_->get_ls_id(), tablet_id); DeleteTabletInfo info; if (OB_FAIL(delete_tablet_infos.get(tablet_id, info))) { LOG_WARN("failed to get info from hash map", K(ret)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, info.old_data_tablet_handle_, info.new_data_tablet_handle_))) { LOG_WARN("failed to compare and swap tablet", K(ret), K(key), K(disk_addr), "old_data_tablet_handle", info.old_data_tablet_handle_, "new_data_tablet_handle", info.new_data_tablet_handle_); } } else if (slog_param.cmd_ == delete_tablet_cmd) { const ObDeleteTabletLog *slog = static_cast(slog_param.data_); const share::ObLSID &ls_id = slog->ls_id_; const common::ObTabletID &tablet_id = slog->tablet_id_; if (OB_FAIL(do_remove_tablet(ls_id, tablet_id))) { LOG_ERROR("failed to remove tablet", K(ret), K(ls_id), K(tablet_id), K(i)); } } else { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected slog param cmd type", K(ret), K(slog_param)); } } return ret; } int ObLSTabletService::replay_create_tablet( const ObMetaDiskAddr &disk_addr, const char *buf, const int64_t buf_len, const ObTabletID &tablet_id) { int ret = OB_SUCCESS; bool b_exist = false; ObFreezer *freezer = ls_->get_freezer(); const ObLSID &ls_id = ls_->get_ls_id(); common::ObIAllocator &allocator = MTL(ObTenantMetaMemMgr*)->get_tenant_allocator(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_FAIL(has_tablet(ls_id, tablet_id, b_exist))) { LOG_WARN("fail to check tablet existence", K(ret), K(ls_id), K(tablet_id)); } else if (b_exist) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("restart replay tablet should not exist", K(ret), K(ls_id), K(tablet_id)); } else { const ObTabletMapKey key(ls_id, tablet_id); ObTabletHandle new_tablet_handle; ObTablet *new_tablet = nullptr; int64_t pos = 0; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (OB_FAIL(ObTabletCreateDeleteHelper::acquire_tablet(key, new_tablet_handle))) { LOG_WARN("fail to acquire tablet", K(ret), K(key)); } else if (FALSE_IT(new_tablet = new_tablet_handle.get_obj())) { // do nothing } else if (OB_FAIL(new_tablet->deserialize(allocator, buf, buf_len, pos))) { LOG_WARN("fail to deserialize tablet", K(ret), K(buf), K(buf_len), K(pos)); } else if (OB_FAIL(new_tablet->init_shared_params(ls_id, tablet_id, new_tablet->get_tablet_meta().max_sync_storage_schema_version_, freezer))) { LOG_WARN("failed to init shared params", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(refresh_tablet_addr(ls_id, tablet_id, disk_addr, new_tablet_handle))) { LOG_WARN("failed to refresh tablet addr", K(key), K(ls_id), K(tablet_id), K(disk_addr)); } else if (OB_FAIL(new_tablet->start_ddl_if_need())) { LOG_WARN("start ddl if need failed", K(ret)); } else if (OB_FAIL(try_pin_tablet_if_needed(new_tablet_handle))) { LOG_WARN("failed to try pin tablet", K(ret), K(ls_id), K(tablet_id)); } else { FLOG_INFO("succeeded to create tablet for replay slog", K(ret), K(ls_id), K(tablet_id), K(new_tablet_handle)); } if (OB_FAIL(ret) && !b_exist) { int tmp_ret = OB_SUCCESS; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (OB_TMP_FAIL(tablet_id_set_.erase(tablet_id))) { if (OB_HASH_NOT_EXIST != ret) { LOG_ERROR("fail to erase tablet id from set", K(tmp_ret), K(tablet_id)); } } if (OB_TMP_FAIL(t3m->del_tablet(key))) { LOG_WARN("fail to erase tablet from meta memory manager", K(tmp_ret), K(key)); } } } return ret; } int ObLSTabletService::try_pin_tablet_if_needed(const ObTabletHandle &tablet_handle) { int ret = OB_SUCCESS; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); ObTablet *tablet = tablet_handle.get_obj(); ObTabletTxMultiSourceDataUnit tx_data; bool exist_on_memtable = false; if (OB_ISNULL(tablet)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, tablet is null", K(ret), K(tablet_handle)); } else if (OB_FAIL(tablet->inner_get_tx_data(tx_data, exist_on_memtable))) { LOG_WARN("failed to get tx data", K(ret), KPC(tablet)); } else if (!tx_data.is_valid()) { // tablet is not valid, do nothing } else if (!tx_data.is_in_tx()) { // tablet not in tx, do nothing } else { const ObLSID &ls_id = tablet->get_tablet_meta().ls_id_; const ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_; const ObTabletMapKey key(ls_id, tablet_id); if (OB_FAIL(t3m->insert_pinned_tablet(key))) { LOG_WARN("failed to insert pinned tablet", K(ret), K(key)); } } return ret; } int ObLSTabletService::check_and_get_tablet( const common::ObTabletID &tablet_id, ObTabletHandle &handle, const int64_t timeout_us) { int ret = OB_SUCCESS; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); if (OB_FAIL(ObTabletCreateDeleteHelper::check_and_get_tablet(key, handle, timeout_us))) { if (OB_TABLET_NOT_EXIST == ret) { LOG_DEBUG("failed to check and get tablet", K(ret), K(key), K(timeout_us)); } else { LOG_WARN("failed to check and get tablet", K(ret), K(key), K(timeout_us)); } } return ret; } int ObLSTabletService::get_tablet_with_timeout( const common::ObTabletID &tablet_id, ObTabletHandle &handle, const int64_t retry_timeout_us, const int64_t get_timeout_us) { int ret = OB_SUCCESS; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!tablet_id.is_valid() || get_timeout_us < ObTabletCommon::DIRECT_GET_COMMITTED_TABLET_TIMEOUT_US)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(get_timeout_us)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::check_and_get_tablet(key, handle, get_timeout_us))) { while (OB_ALLOCATE_MEMORY_FAILED == ret && ObTimeUtility::current_time() < retry_timeout_us) { ret = ObTabletCreateDeleteHelper::check_and_get_tablet(key, handle, get_timeout_us); } if (OB_ALLOCATE_MEMORY_FAILED == ret) { ret = OB_TIMEOUT; LOG_WARN("retry until reaching the timeout", K(ret), K(retry_timeout_us)); } else if (OB_FAIL(ret)) { LOG_WARN("fail to check and get tablet", K(ret), K(key)); } } return ret; } int ObLSTabletService::direct_get_tablet(const common::ObTabletID &tablet_id, ObTabletHandle &handle) { int ret = OB_SUCCESS; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); if (OB_FAIL(ObTabletCreateDeleteHelper::get_tablet(key, handle))) { LOG_WARN("failed to get tablet from t3m", K(ret), K(key)); } return ret; } int ObLSTabletService::inner_table_scan( ObTabletHandle &tablet_handle, ObTableScanIterator &iter, ObTableScanParam ¶m) { // NOTICE: ObTableScanParam for_update_ param is ignored here, // upper layer will handle it, so here for_update_ is always false int ret = OB_SUCCESS; ObStoreCtx &store_ctx = iter.get_ctx_guard().get_store_ctx(); int64_t data_max_schema_version = 0; bool is_bounded_staleness_read = (NULL == param.trans_desc_) ? false : param.snapshot_.is_weak_read(); if (OB_UNLIKELY(!tablet_handle.is_valid()) || OB_UNLIKELY(!param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_handle), K(param)); } else if (is_bounded_staleness_read && OB_FAIL(tablet_handle.get_obj()->get_max_schema_version(data_max_schema_version))) { LOG_WARN("failed to get max schema version", K(ret), K(param)); } else if (is_bounded_staleness_read && OB_FAIL(tablet_handle.get_obj()->check_schema_version_for_bounded_staleness_read( param.schema_version_, data_max_schema_version, param.index_id_))) { //check schema_version with ref_table_id, because schema_version of scan_param is from ref table LOG_WARN("check schema version for bounded staleness read fail", K(ret), K(param)); //need to get store ctx of PG, cur_key_ saves the real partition } else if (0 == param.fb_snapshot_) { ret = OB_SNAPSHOT_DISCARDED; } else if (OB_FAIL(ObTabletBindingHelper::check_snapshot_readable( tablet_handle, store_ctx.mvcc_acc_ctx_.get_snapshot_version()))) { LOG_WARN("failed to check snapshot readable", K(ret)); } else { if (param.need_switch_param_) { if (OB_FAIL(iter.switch_param(param, tablet_handle))) { LOG_WARN("failed to init table scan iterator, ", K(ret)); } } else if (OB_FAIL(iter.init(param, tablet_handle))) { LOG_WARN("failed to init table scan iterator, ", K(ret)); } } if (OB_FAIL(ret)) { LOG_WARN("failed to do table scan", K(ret), K(param), K(*this), K(data_max_schema_version)); } return ret; } int ObLSTabletService::has_tablet( const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, bool &b_exist) { int ret = OB_SUCCESS; b_exist = false; const ObTabletMapKey key(ls_id, tablet_id); ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (OB_FAIL(t3m->has_tablet(key, b_exist))) { LOG_WARN("failed to check tablet exist", K(ret), K(key)); } return ret; } int ObLSTabletService::create_tablet( const share::ObLSID &ls_id, const obrpc::ObBatchCreateTabletArg &arg, const int64_t create_scn, const obrpc::ObCreateTabletInfo &info, common::ObIArray &tablet_handle_array, NonLockedHashSet &data_tablet_id_set) { int ret = OB_SUCCESS; const common::ObTabletID &data_tablet_id = info.data_tablet_id_; const common::ObSArray &tablet_ids = info.tablet_ids_; if (OB_UNLIKELY(!info.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid create tablet info", K(ret), K(info)); } else if (tablet_ids.count() == 1 && common::is_contain(tablet_ids, data_tablet_id)) { // only data tablet. if (OB_FAIL(build_single_data_tablet(ls_id, arg, create_scn, info, tablet_handle_array))) { LOG_WARN("failed to build single data tablet", K(ret), K(ls_id), K(arg), K(create_scn), K(info)); } } else { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid create tablet info", K(ret), K(info)); } return ret; } int ObLSTabletService::do_create_tablet( const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, const common::ObTabletID &data_tablet_id, const common::ObIArray &index_tablet_array, const int64_t create_scn, const int64_t snapshot_version, const share::schema::ObTableSchema &table_schema, const lib::Worker::CompatMode &compat_mode, const common::ObTabletID &lob_meta_tablet_id, const common::ObTabletID &lob_piece_tablet_id, ObTabletHandle &tablet_handle) { int ret = OB_SUCCESS; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); const ObTabletMapKey key(ls_id, tablet_id); ObTableHandleV2 table_handle; ObTabletCreateSSTableParam param; ObFreezer *freezer = ls_->get_freezer(); bool need_create_empty_major_sstable = false; ObTabletTableStoreFlag table_store_flag; table_store_flag.set_with_major_sstable(); ObTabletHandle handle; if (OB_FAIL(ObTabletCreateDeleteHelper::acquire_tablet(key, handle))) { LOG_WARN("failed to acquire tablet", K(ret), K(key)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::check_need_create_empty_major_sstable( table_schema, need_create_empty_major_sstable))) { LOG_WARN("failed to check need create sstable", K(ret)); } else if (!need_create_empty_major_sstable) { table_store_flag.set_without_major_sstable(); LOG_INFO("no need to create sstable", K(ls_id), K(tablet_id), K(table_schema)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::build_create_sstable_param( table_schema, tablet_id, snapshot_version, param))) { LOG_WARN("failed to build create sstable param", K(ret), K(tablet_id), K(table_schema), K(snapshot_version), K(param)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::create_sstable(param, table_handle))) { LOG_WARN("failed to create sstable", K(ret), K(param)); } if (OB_FAIL(ret)) { } else if (OB_FAIL(handle.get_obj()->init(ls_id, tablet_id, data_tablet_id, lob_meta_tablet_id, lob_piece_tablet_id, create_scn, snapshot_version, table_schema, compat_mode, table_store_flag, table_handle, freezer))) { LOG_WARN("failed to init tablet", K(ret), K(ls_id), K(tablet_id), K(data_tablet_id), K(index_tablet_array), K(create_scn), K(snapshot_version), K(table_schema), K(compat_mode), K(table_store_flag)); int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(t3m->del_tablet(key))) { LOG_ERROR("failed to delete tablet from t3m", K(ret), K(key), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } } else { tablet_handle = handle; } return ret; } int ObLSTabletService::build_single_data_tablet( const share::ObLSID &ls_id, const obrpc::ObBatchCreateTabletArg &arg, const int64_t create_scn, const obrpc::ObCreateTabletInfo &info, common::ObIArray &tablet_handle_array) { int ret = OB_SUCCESS; const int64_t snapshot_version = arg.frozen_timestamp_; const common::ObTabletID &data_tablet_id = info.data_tablet_id_; const common::ObSArray &tablet_ids = info.tablet_ids_; const common::ObSArray &table_schemas = arg.table_schemas_; const lib::Worker::CompatMode &compat_mode = info.compat_mode_; common::ObSArray empty_array; ObTabletHandle tablet_handle; common::ObTabletID empty_tablet_id; int64_t index = -1; if (OB_FAIL(get_tablet_schema_index(data_tablet_id, tablet_ids, index))) { LOG_WARN("failed to get tablet schema index in array", K(ret), K(data_tablet_id)); } else if (OB_UNLIKELY(index < 0)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, table schema index is invalid", K(ret), K(data_tablet_id), K(index)); } else if (OB_FAIL(do_create_tablet(ls_id, data_tablet_id, data_tablet_id, empty_array, create_scn, snapshot_version, table_schemas[info.table_schema_index_[index]], compat_mode, empty_tablet_id, empty_tablet_id, tablet_handle))) { LOG_WARN("failed to do create tablet", K(ret), K(ls_id), K(data_tablet_id), K(create_scn), K(snapshot_version), K(compat_mode)); } else if (OB_FAIL(tablet_handle_array.push_back(tablet_handle))) { LOG_WARN("failed to insert tablet handle into array", K(ret), K(tablet_handle)); } return ret; } int ObLSTabletService::build_batch_create_tablet_arg( const obrpc::ObBatchCreateTabletArg &old_arg, const NonLockedHashSet &existed_tablet_id_set, obrpc::ObBatchCreateTabletArg &new_arg) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; const common::ObSArray &old_info_array = old_arg.tablets_; for (int64_t i = 0; OB_SUCC(ret) && i < old_info_array.count(); ++i) { const obrpc::ObCreateTabletInfo &old_info = old_info_array[i]; const common::ObSArray &old_tablet_id_array = old_info.tablet_ids_; ObCreateTabletInfo new_info; new_info.data_tablet_id_ = old_info.data_tablet_id_; new_info.compat_mode_ = old_info.compat_mode_; for (int64_t j = 0; OB_SUCC(ret) && j < old_tablet_id_array.count(); ++j) { const common::ObTabletID &old_tablet_id = old_tablet_id_array[j]; tmp_ret = existed_tablet_id_set.exist_refactored(old_tablet_id); if (OB_HASH_EXIST == tmp_ret) { LOG_INFO("tablet id exists, should skip", K(old_tablet_id)); } else if (OB_HASH_NOT_EXIST == tmp_ret) { const int64_t old_table_schema_index = old_info.table_schema_index_[j]; if (OB_FAIL(new_info.tablet_ids_.push_back(old_tablet_id))) { LOG_WARN("failed to push back old tablet id", K(ret), K(old_tablet_id)); } else if (OB_FAIL(new_info.table_schema_index_.push_back(old_table_schema_index))) { LOG_WARN("failed to push back table schema index", K(ret), K(old_tablet_id)); } } else { ret = tmp_ret; LOG_WARN("unexpected error when checking old tablet id", K(ret), K(old_tablet_id)); } } if (OB_FAIL(ret)) { // do nothing } else if (new_info.get_tablet_count() > 0) { if (OB_FAIL(new_arg.tablets_.push_back(new_info))) { LOG_WARN("failed to push back tablet info", K(ret), K(new_info)); } } else { LOG_INFO("skip create tablet info", K(old_info)); } } // copy other members in arg if (OB_FAIL(ret)) { // do nothing } else { new_arg.id_ = old_arg.id_; new_arg.frozen_timestamp_ = old_arg.frozen_timestamp_; if (new_arg.get_tablet_count() > 0 && OB_FAIL(new_arg.table_schemas_.assign(old_arg.table_schemas_))) { LOG_WARN("failed to assign table schemas", K(ret), K(old_arg)); } } return ret; } int ObLSTabletService::add_batch_tablets( const share::ObLSID &ls_id, const NonLockedHashSet &data_tablet_id_set, common::ObIArray &tablet_handle_array) { int ret = OB_SUCCESS; const ObTablet* tablet = nullptr; ObSArray disk_addr_arr; const int64_t tablet_count = tablet_handle_array.count(); if (OB_FAIL(disk_addr_arr.reserve(tablet_count))) { LOG_WARN("fail to reserve memory for disk_addr_array", K(ret), K(tablet_count)); } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle_array, disk_addr_arr))) { LOG_WARN("fail to write slog", K(ret), K(ls_id)); } int tmp_ret = OB_SUCCESS; common::ObTabletID tablet_id; for (int64_t i = 0; OB_SUCC(ret) && i < tablet_count; ++i) { const ObMetaDiskAddr &new_addr = disk_addr_arr[i]; ObTabletHandle &tablet_handle = tablet_handle_array.at(i); if (OB_ISNULL(tablet = tablet_handle.get_obj())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet is null", K(ret), K(tablet_handle)); } else if (FALSE_IT(tablet_id = tablet->tablet_meta_.tablet_id_)) { } else if (FALSE_IT(tmp_ret = data_tablet_id_set.exist_refactored(tablet_id))) { } else if (OB_UNLIKELY(OB_HASH_EXIST != tmp_ret && OB_HASH_NOT_EXIST != tmp_ret)) { ret = tmp_ret; LOG_WARN("failed to check tablet id existence", K(ret), K(tablet_id)); } else if ((OB_HASH_EXIST == tmp_ret) && OB_FAIL(update_tablet_object_and_addr(tablet_handle, new_addr))) { LOG_WARN("failed to update tablet object and addr", K(ret), K(tablet_handle), K(new_addr)); } else if ((OB_HASH_NOT_EXIST == tmp_ret) && OB_FAIL(refresh_tablet_addr(ls_id, tablet_id, new_addr, tablet_handle))) { LOG_WARN("failed to refresh tablet addr", K(ret), K(ls_id), K(tablet_id), K(new_addr)); } else { FLOG_INFO("succeeded to add tablet", K(ret), K(tmp_ret), K(ls_id), K(tablet_id), K(tablet_handle), K(new_addr), K(tablet_count)); } if (OB_FAIL(ret)) { LOG_ERROR("failed to modify memory after writing slog", K(ret), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } } return ret; } int ObLSTabletService::post_handle_batch_create_tablets( const obrpc::ObBatchCreateTabletArg &arg) { int ret = OB_SUCCESS; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); ObTabletMapKey key; key.ls_id_ = arg.id_; if (OB_UNLIKELY(!arg.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(arg)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); ++i) { const obrpc::ObCreateTabletInfo &info = arg.tablets_[i]; for (int64_t j = 0; OB_SUCC(ret) && j < info.tablet_ids_.count(); ++j) { const common::ObTabletID &tablet_id = info.tablet_ids_[j]; key.tablet_id_ = tablet_id; if (OB_FAIL(tablet_id_set_.erase(tablet_id))) { if (OB_HASH_NOT_EXIST == ret) { // tablet id does not exist in hash set ret = OB_SUCCESS; } else { LOG_WARN("fail to erase tablet id from set", K(ret), K(tablet_id)); } } if (OB_FAIL(ret)) { } else if (OB_FAIL(t3m->del_tablet(key))) { if (OB_HASH_NOT_EXIST == ret) { // key does not exist in hash map ret = OB_SUCCESS; } else { LOG_WARN("failed to delete tablet", K(ret), K(key)); } } } } } return ret; } int ObLSTabletService::create_memtable( const common::ObTabletID &tablet_id, const int64_t schema_version, const bool for_replay, const int64_t clog_checkpoint_ts) { int ret = OB_SUCCESS; ObTimeGuard time_guard("ObLSTabletService::create_memtable", 10 * 1000); ObTabletHandle handle; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!tablet_id.is_valid() || schema_version < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(schema_version)); } else { // we need bucket lock here to protect multi version tablet creation // during tablet creating new memtable and put it into table store. ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); time_guard.click("Lock"); if (OB_FAIL(direct_get_tablet(tablet_id, handle))) { LOG_WARN("fail to get tablet", K(ret), K(tablet_id)); } else if (FALSE_IT(time_guard.click("get tablet"))) { } else if (OB_UNLIKELY(!handle.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, invalid tablet handle", K(ret), K(handle)); } else if (OB_FAIL(handle.get_obj()->create_memtable(schema_version, clog_checkpoint_ts, for_replay))) { if (OB_MINOR_FREEZE_NOT_ALLOW != ret) { LOG_WARN("fail to create memtable", K(ret), K(handle), K(schema_version), K(tablet_id)); } } else { time_guard.click("tablet create_memtable"); } } return ret; } int ObLSTabletService::get_read_tables( const common::ObTabletID &tablet_id, const int64_t snapshot_version, ObTabletTableIterator &iter, const bool allow_no_ready_read) { int ret = OB_SUCCESS; ObTabletHandle &handle = iter.tablet_handle_; iter.reset(); AllowToReadMgr::AllowToReadInfo read_info; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!tablet_id.is_valid() || snapshot_version < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tablet_id), K(snapshot_version)); } else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) { } else if (!read_info.allow_to_read()) { ret = OB_REPLICA_NOT_READABLE; LOG_WARN("ls is not allow to read", K(ret), KPC(ls_)); } else if (OB_FAIL(check_and_get_tablet(tablet_id, handle))) { LOG_WARN("fail to check and get tablet", K(ret), K(tablet_id)); } else if (OB_UNLIKELY(!handle.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, invalid tablet handle", K(ret), K(handle)); } else if (OB_FAIL(handle.get_obj()->get_read_tables(snapshot_version, iter, allow_no_ready_read))) { LOG_WARN("fail to get read tables", K(ret), K(handle), K(tablet_id), K(snapshot_version), K(iter), K(allow_no_ready_read)); } else { bool is_same = false; allow_to_read_mgr_.check_read_info_same(read_info, is_same); if (!is_same) { ret = OB_REPLICA_NOT_READABLE; LOG_WARN("ls is not allow to read", K(ret), KPC(ls_)); } } return ret; } int ObLSTabletService::build_create_sstable_param_for_migration( const blocksstable::ObMigrationSSTableParam &mig_param, ObTabletCreateSSTableParam ¶m) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!mig_param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(mig_param)); } else { param.table_key_ = mig_param.table_key_; param.schema_version_ = mig_param.basic_meta_.schema_version_; param.create_snapshot_version_ = mig_param.basic_meta_.create_snapshot_version_; param.progressive_merge_round_ = mig_param.basic_meta_.progressive_merge_round_; param.progressive_merge_step_ = mig_param.basic_meta_.progressive_merge_step_; param.is_ready_for_read_ = false; param.table_mode_ = mig_param.basic_meta_.table_mode_; param.index_type_ = static_cast(mig_param.basic_meta_.index_type_); param.rowkey_column_cnt_ = mig_param.basic_meta_.rowkey_column_count_; param.root_row_store_type_ = mig_param.basic_meta_.row_store_type_; param.index_blocks_cnt_ = mig_param.basic_meta_.index_macro_block_count_; param.data_blocks_cnt_ = mig_param.basic_meta_.data_macro_block_count_; param.micro_block_cnt_ = mig_param.basic_meta_.data_micro_block_count_; param.use_old_macro_block_count_ = mig_param.basic_meta_.use_old_macro_block_count_; param.row_count_ = mig_param.basic_meta_.row_count_; param.column_cnt_ = mig_param.basic_meta_.column_cnt_; param.data_checksum_ = mig_param.basic_meta_.data_checksum_; param.occupy_size_ = mig_param.basic_meta_.occupy_size_; param.original_size_ = mig_param.basic_meta_.original_size_; param.max_merged_trans_version_ = mig_param.basic_meta_.max_merged_trans_version_; param.ddl_log_ts_ = mig_param.basic_meta_.ddl_log_ts_; param.filled_tx_log_ts_ = mig_param.basic_meta_.filled_tx_log_ts_; param.contain_uncommitted_row_ = mig_param.basic_meta_.contain_uncommitted_row_; param.compressor_type_ = mig_param.basic_meta_.compressor_type_; param.encrypt_id_ = mig_param.basic_meta_.encrypt_id_; param.master_key_id_ = mig_param.basic_meta_.master_key_id_; MEMCPY(param.encrypt_key_, mig_param.basic_meta_.encrypt_key_, share::OB_MAX_TABLESPACE_ENCRYPT_KEY_LENGTH); param.root_block_addr_.set_none_addr(); param.data_block_macro_meta_addr_.set_none_addr();; if (OB_FAIL(param.column_checksums_.assign(mig_param.column_checksums_))) { LOG_WARN("fail to assign column checksums", K(ret), K(mig_param)); } } return ret; } int ObLSTabletService::get_tablet_schema_index( const common::ObTabletID &tablet_id, const common::ObIArray &table_ids, int64_t &index) { int ret = OB_SUCCESS; bool match = false; for (int64_t i = 0; !match && i < table_ids.count(); ++i) { if (table_ids.at(i) == tablet_id) { index = i; match = true; } } if (!match) { ret = OB_ENTRY_NOT_EXIST; LOG_WARN("cannot find target tablet id in array", K(ret), K(tablet_id)); } return ret; } int ObLSTabletService::get_all_tablet_id_hash_array( const obrpc::ObBatchCreateTabletArg &arg, common::ObIArray &all_tablet_id_hash_array) { int ret = OB_SUCCESS; for (int i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); ++i) { const obrpc::ObCreateTabletInfo &info = arg.tablets_[i]; if (OB_FAIL(all_tablet_id_hash_array.push_back(info.data_tablet_id_.hash()))) { LOG_WARN("failed to push back data tablet id", K(ret), "data tablet id", info.data_tablet_id_); } else { // don't worry about duplicate data tablet id in arg, // ObMultiBucketLockGuard will try to push the array into set to remove duplicate elements, // and then sort the array to avoid possible dead lock const common::ObSArray &tablet_ids = info.tablet_ids_; for (int64_t j = 0; OB_SUCC(ret) && j < tablet_ids.count(); ++j) { const common::ObTabletID &tablet_id = tablet_ids[j]; if (OB_FAIL(all_tablet_id_hash_array.push_back(tablet_id.hash()))) { LOG_WARN("failed to push back tablet id", K(ret), K(tablet_id)); } } } } return ret; } int ObLSTabletService::parse_and_verify_delete_tablet_info( const obrpc::ObBatchRemoveTabletArg &arg, common::ObLinearHashMap &delete_tablet_infos) { int ret = OB_SUCCESS; const common::ObSArray &tablet_ids = arg.tablet_ids_; bool b_exist = true; ObTabletHandle tablet_handle; for (int64_t i = 0; i < tablet_ids.count(); ++i) { const common::ObTabletID &tablet_id = tablet_ids.at(i); if (OB_FAIL(direct_get_tablet(tablet_id, tablet_handle))) { if (OB_TABLET_NOT_EXIST == ret) { b_exist = false; ret = OB_SUCCESS; FLOG_INFO("tablet does not exist", K(ret), K(tablet_id)); } else { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } } else { b_exist = true; } if (OB_SUCC(ret) && b_exist) { if (tablet_handle.get_obj()->is_data_tablet()) { // delete data tablet and all local index tablets DeleteTabletInfo info; info.delete_data_tablet_ = true; info.old_data_tablet_handle_ = tablet_handle; if (OB_FAIL(delete_tablet_infos.insert(tablet_id, info))) { LOG_WARN("failed to insert into hash map", K(ret), K(tablet_id)); } } else { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, tablet handle is invalid", K(ret), K(tablet_handle)); } } } return ret; } int ObLSTabletService::get_all_tablet_id_hash_array( common::ObLinearHashMap &delete_tablet_infos, common::ObIArray &all_tablet_id_hash_array) { int ret = OB_SUCCESS; HashMapTabletGetFunctor functor(all_tablet_id_hash_array); // don't worry about duplicate data tablet id in arg, // ObMultiBucketLockGuard will try to push the array into set to remove duplicate elements, // and then sort the array to avoid possible dead lock if (OB_FAIL(delete_tablet_infos.for_each(functor))) { LOG_WARN("failed to iterate hash map to get all tablet id", K(ret)); } return ret; } int ObLSTabletService::insert_rows( ObStoreCtx &ctx, const ObDMLBaseParam &dml_param, const common::ObIArray &column_ids, common::ObNewRowIterator *row_iter, int64_t &affected_rows) { int ret = OB_SUCCESS; NG_TRACE(S_insert_rows_begin); ObTabletHandle tablet_handle; int64_t afct_num = 0; int64_t dup_num = 0; ObTimeGuard timeguard(__func__, 3 * 1000 * 1000); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!ctx.is_valid()) || !ctx.is_write() || OB_UNLIKELY(!dml_param.is_valid()) || OB_UNLIKELY(column_ids.count() <= 0) || OB_ISNULL(row_iter)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ctx), K(dml_param), K(column_ids), KP(row_iter)); } else if (OB_FAIL(get_tablet_with_timeout( ctx.tablet_id_, tablet_handle, dml_param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(ctx.tablet_id_)); } else { ObArenaAllocator lob_allocator(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); ObDMLRunningCtx run_ctx(ctx, dml_param, ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), lob_allocator, ObDmlFlag::DF_INSERT); ObIAllocator &work_allocator = run_ctx.allocator_; void *ptr = nullptr; ObStoreRow *tbl_rows = nullptr; int64_t row_count = 0; //index of row that exists int64_t row_count_first_bulk = 0; bool first_bulk = true; ObNewRow *rows = nullptr; ObRowsInfo rows_info; const ObRelativeTable &data_table = run_ctx.relative_table_; if (OB_FAIL(prepare_dml_running_ctx(&column_ids, nullptr, tablet_handle, run_ctx))) { LOG_WARN("failed to prepare dml running ctx", K(ret)); } while (OB_SUCC(ret) && OB_SUCC(get_next_rows(row_iter, rows, row_count))) { if (row_count <= 0) { ret = OB_ERR_UNEXPECTED; LOG_WARN("row_count should be greater than 0", K(ret)); } else if (first_bulk) { first_bulk = false; row_count_first_bulk = row_count; const ObTableReadInfo &full_read_info = tablet_handle.get_obj()->get_full_read_info(); if (OB_FAIL(rows_info.init(data_table, ctx, full_read_info))) { LOG_WARN("Failed to init rows info", K(ret), K(data_table)); } else if (OB_ISNULL(ptr = work_allocator.alloc(row_count * sizeof(ObStoreRow)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_ERROR("fail to allocate memory", K(ret), K(row_count)); } else { tbl_rows = new (ptr) ObStoreRow[row_count]; for (int64_t i = 0; i < row_count; i++) { tbl_rows[i].flag_.set_flag(ObDmlFlag::DF_INSERT); } } } if (OB_FAIL(ret)) { } else if (OB_ISNULL(tbl_rows)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, tbl_rows is NULL", K(ret), KP(tbl_rows)); } else if (OB_FAIL(insert_rows_to_tablet(tablet_handle, run_ctx, rows, row_count, rows_info, tbl_rows, afct_num, dup_num))) { LOG_WARN("insert to each tablets fail", K(ret)); } } if (OB_ITER_END == ret) { ret = OB_SUCCESS; } if (nullptr != ptr) { work_allocator.free(ptr); } lob_allocator.reset(); if (OB_SUCC(ret)) { LOG_DEBUG("succeeded to insert rows", K(ret)); affected_rows = afct_num; EVENT_ADD(STORAGE_INSERT_ROW_COUNT, afct_num); } } NG_TRACE(S_insert_rows_end); return ret; } int ObLSTabletService::insert_row( ObStoreCtx &ctx, const ObDMLBaseParam &dml_param, const common::ObIArray &column_ids, const common::ObIArray &duplicated_column_ids, const common::ObNewRow &row, const ObInsertFlag flag, int64_t &affected_rows, common::ObNewRowIterator *&duplicated_rows) { int ret = OB_SUCCESS; const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTabletHandle tablet_handle; ObTimeGuard timeguard(__func__, 3 * 1000 * 1000); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!ctx.is_valid() || !ctx.is_write() || !dml_param.is_valid() || column_ids.count() <= 0 || duplicated_column_ids.count() <= 0 || !row.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ctx), K(dml_param), K(column_ids), K(duplicated_column_ids), K(row), K(flag)); } else if (OB_FAIL(get_tablet_with_timeout( ctx.tablet_id_, tablet_handle, dml_param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(ctx.tablet_id_)); } else { ObArenaAllocator lob_allocator(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); ObDMLRunningCtx run_ctx(ctx, dml_param, ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), lob_allocator, ObDmlFlag::DF_INSERT); ObIAllocator &work_allocator = run_ctx.allocator_; duplicated_rows = nullptr; ObStoreRow &tbl_row = run_ctx.tbl_row_; const ObRelativeTable &data_table = run_ctx.relative_table_; if (OB_FAIL(prepare_dml_running_ctx(&column_ids, nullptr, tablet_handle, run_ctx))) { LOG_WARN("failed to prepare dml running ctx", K(ret)); } else { tbl_row.flag_.set_flag(ObDmlFlag::DF_INSERT); tbl_row.row_val_ = row; if (OB_FAIL(get_conflict_rows(tablet_handle, run_ctx, flag, duplicated_column_ids, tbl_row.row_val_, duplicated_rows))) { LOG_WARN("failed to get conflict row(s)", K(ret), K(duplicated_column_ids), K(row)); } else if (nullptr == duplicated_rows) { if (OB_FAIL(insert_row_to_tablet(tablet_handle, run_ctx, tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { LOG_WARN("failed to write row", K(ret)); } } else { LOG_DEBUG("succeeded to insert row", K(ret), K(row)); affected_rows = 1; EVENT_INC(STORAGE_INSERT_ROW_COUNT); } } else { ret = OB_ERR_PRIMARY_KEY_DUPLICATE; } } lob_allocator.reset(); } return ret; } static inline bool is_lob_update(ObDMLRunningCtx &run_ctx, const ObIArray &update_idx) { bool bool_ret = false; for (int64_t i = 0; i < update_idx.count() && !bool_ret; ++i) { int64_t idx = update_idx.at(i); if (run_ctx.col_descs_->at(idx).col_type_.is_lob_v2()) { bool_ret = true; } } return bool_ret; } int ObLSTabletService::update_rows( ObStoreCtx &ctx, const ObDMLBaseParam &dml_param, const ObIArray &column_ids, const ObIArray< uint64_t> &updated_column_ids, ObNewRowIterator *row_iter, int64_t &affected_rows) { int ret = OB_SUCCESS; NG_TRACE(S_update_rows_begin); const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTabletHandle tablet_handle; int64_t afct_num = 0; int64_t dup_num = 0; int64_t got_row_count = 0; ObTimeGuard timeguard(__func__, 3 * 1000 * 1000); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!ctx.is_valid() || !ctx.is_write() || !dml_param.is_valid() || column_ids.count() <= 0 || updated_column_ids.count() <= 0 || nullptr == row_iter)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ctx), K(dml_param), K(column_ids), K(updated_column_ids), KP(row_iter)); } else if (OB_FAIL(get_tablet_with_timeout( ctx.tablet_id_, tablet_handle, dml_param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(ctx.tablet_id_)); } else { timeguard.click("Get"); ObArenaAllocator lob_allocator(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); ObDMLRunningCtx run_ctx(ctx, dml_param, ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), lob_allocator, ObDmlFlag::DF_UPDATE); ObIAllocator &work_allocator = run_ctx.allocator_; ObStoreRow old_tbl_row; void *old_row_cells = nullptr; ObStoreRow &new_tbl_row = run_ctx.tbl_row_; bool rowkey_change = false; UpdateIndexArray update_idx; ObRowStore row_store; bool delay_new = false; bool lob_update = false; const ObRelativeTable &relative_table = run_ctx.relative_table_; if (OB_FAIL(prepare_dml_running_ctx(&column_ids, &updated_column_ids, tablet_handle, run_ctx))) { LOG_WARN("failed to prepare dml running ctx", K(ret)); } else if (FALSE_IT(timeguard.click("Prepare"))) { } else if (OB_UNLIKELY(!relative_table.is_valid())) { ret = OB_ERR_SYS; LOG_ERROR("data table is not prepared", K(ret)); } else if (OB_FAIL(construct_update_idx(relative_table.get_rowkey_column_num(), run_ctx.col_map_, updated_column_ids, update_idx))) { LOG_WARN("failed to construct update_idx", K(ret), K(updated_column_ids)); } else if (FALSE_IT(timeguard.click("Construct"))) { } else if (OB_FAIL(check_rowkey_change(updated_column_ids, relative_table, rowkey_change, delay_new))) { LOG_WARN("failed to check rowkey changes", K(ret)); } else { timeguard.click("Check"); const int64_t num = relative_table.get_column_count(); if (OB_ISNULL(old_row_cells = work_allocator.alloc( static_cast(sizeof(common::ObObj) * num)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_ERROR("failed to malloc temp row cells", K(ret)); } else { timeguard.click("AllocOld"); old_tbl_row.row_val_.cells_ = new (old_row_cells) ObObj[num](); old_tbl_row.flag_.set_flag(ObDmlFlag::DF_UPDATE); new_tbl_row.flag_.set_flag(ObDmlFlag::DF_UPDATE); } lob_update = is_lob_update(run_ctx, update_idx); } int64_t cur_time = 0; while (OB_SUCC(ret) && OB_SUCC(get_next_row_from_iter(row_iter, old_tbl_row, true)) && OB_SUCC(get_next_row_from_iter(row_iter, new_tbl_row, false))) { LOG_DEBUG("get_dml_update_row", KP(row_iter), K(old_tbl_row), K(new_tbl_row)); bool duplicate = false; ++got_row_count; cur_time = ObTimeUtility::current_time(); if ((0 == (0x1FF & got_row_count)) && (cur_time > dml_param.timeout_)) { //checking timeout cost too much, so check every 512 rows ret = OB_TIMEOUT; LOG_WARN("query timeout", K(cur_time), K(dml_param), K(ret)); } else if (OB_FAIL(update_row_to_tablet(tablet_handle, run_ctx, rowkey_change, update_idx, delay_new, lob_update, old_tbl_row, new_tbl_row, &row_store, duplicate))) { LOG_WARN("failed to update row to tablets", K(ret), K(old_tbl_row), K(new_tbl_row)); } else if (duplicate) { dup_num++; } else { afct_num++; } timeguard.click("Update"); } if (OB_ITER_END == ret) { ret = OB_SUCCESS; } if (OB_SUCC(ret) && row_store.get_row_count() > 0) { void *ptr1 = nullptr; const int64_t num = relative_table.get_column_count(); if (OB_ISNULL(ptr1 = work_allocator.alloc(sizeof(common::ObObj) * num))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_ERROR("failed to malloc temp row cells", K(ret)); } else { timeguard.click("AllocNew"); new_tbl_row.row_val_.cells_ = new(ptr1) ObObj[num](); ObRowStore::Iterator row_iter2 = row_store.begin(); // when total_quantity_log is true, we should iterate new_tbl_row and old_tbl_row, and // dispose these two rows together, otherwise, when total_quantity_log is false, // row_iter2 doesn't contain old rows, and old_tbl_row is a dummy param in process_new_row while (OB_SUCC(ret) && OB_SUCC(row_iter2.get_next_row(new_tbl_row.row_val_))) { int64_t data_tbl_rowkey_len = relative_table.get_rowkey_column_num(); bool tbl_rowkey_change = false; if (OB_FAIL(row_iter2.get_next_row(old_tbl_row.row_val_))) { LOG_WARN("fail to get row from row stores", K(ret)); } else if (FALSE_IT(timeguard.click("GetNext"))) { } else if (rowkey_change && OB_FAIL(check_rowkey_value_change(old_tbl_row.row_val_, new_tbl_row.row_val_, data_tbl_rowkey_len, tbl_rowkey_change))) { LOG_WARN("check data table rowkey change failed", K(ret), K(old_tbl_row), K(new_tbl_row), K(data_tbl_rowkey_len)); } else if (OB_FAIL(process_new_row(tablet_handle, run_ctx, update_idx, old_tbl_row, new_tbl_row, tbl_rowkey_change))) { LOG_WARN("fail to process new row", K(ret), K(old_tbl_row), K(new_tbl_row)); } timeguard.click("Process"); } work_allocator.free(ptr1); ptr1 = nullptr; new_tbl_row.row_val_.cells_ = nullptr; } } if (OB_ITER_END == ret) { ret = OB_SUCCESS; } if (nullptr != old_row_cells) { work_allocator.free(old_row_cells); } lob_allocator.reset(); old_tbl_row.row_val_.cells_ = nullptr; if (OB_SUCC(ret)) { affected_rows = afct_num; EVENT_ADD(STORAGE_UPDATE_ROW_COUNT, afct_num); } if (timeguard.get_diff() > 3 * 1000 * 1000) { LOG_WARN("update rows use too much time", K(afct_num), K(got_row_count)); } } NG_TRACE(S_update_rows_end); return ret; } int ObLSTabletService::put_rows( ObStoreCtx &ctx, const ObDMLBaseParam &dml_param, const ObIArray &column_ids, ObNewRowIterator *row_iter, int64_t &affected_rows) { int ret = OB_SUCCESS; const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTabletHandle tablet_handle; int64_t afct_num = 0; ObTimeGuard timeguard(__func__, 3 * 1000 * 1000); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!ctx.is_valid()) || OB_UNLIKELY(!ctx.is_write()) || OB_UNLIKELY(!dml_param.is_valid()) || OB_UNLIKELY(column_ids.count() <= 0) || OB_ISNULL(row_iter)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ctx), K(dml_param), K(column_ids), KP(row_iter)); } else if (OB_FAIL(get_tablet_with_timeout( ctx.tablet_id_, tablet_handle, dml_param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(ctx.tablet_id_)); } else { ObArenaAllocator lob_allocator(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); ObDMLRunningCtx run_ctx(ctx, dml_param, ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), lob_allocator, ObDmlFlag::DF_UPDATE); ObIAllocator &work_allocator = run_ctx.allocator_; ObNewRow *row = nullptr; ObStoreRow &tbl_row = run_ctx.tbl_row_; const ObRelativeTable &data_table = run_ctx.relative_table_; if (OB_FAIL(prepare_dml_running_ctx(&column_ids, nullptr, tablet_handle, run_ctx))) { LOG_WARN("failed to prepare dml running ctx", K(ret)); } else { tbl_row.flag_.set_flag(ObDmlFlag::DF_UPDATE); } int64_t cur_time = 0; while (OB_SUCC(ret) && OB_SUCC(row_iter->get_next_row(row))) { cur_time = ObTimeUtility::current_time(); tbl_row.row_val_ = *row; if (cur_time > dml_param.timeout_) { ret = OB_TIMEOUT; LOG_WARN("query timeout", K(ret), K(cur_time), K(dml_param)); } else if (OB_FAIL(insert_row_to_tablet(tablet_handle, run_ctx, tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("failed to write row", K(ret)); } } ++afct_num; } if (OB_ITER_END == ret) { ret = OB_SUCCESS; } lob_allocator.reset(); if (OB_SUCC(ret)) { affected_rows = afct_num; EVENT_ADD(STORAGE_INSERT_ROW_COUNT, afct_num); } } return ret; } int ObLSTabletService::delete_rows( ObStoreCtx &ctx, const ObDMLBaseParam &dml_param, const ObIArray &column_ids, ObNewRowIterator *row_iter, int64_t &affected_rows) { int ret = OB_SUCCESS; NG_TRACE(S_delete_rows_begin); const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTabletHandle tablet_handle; ObRowReshape *row_reshape = nullptr; int64_t afct_num = 0; ObTimeGuard timeguard(__func__, 3 * 1000 * 1000); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_ISNULL(row_iter) || !ctx.is_valid() || !ctx.is_write() || column_ids.count() <= 0 || OB_ISNULL(row_iter)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(dml_param), K(column_ids), KP(row_iter), K(ctx)); } else if (OB_FAIL(get_tablet_with_timeout( ctx.tablet_id_, tablet_handle, dml_param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(ctx.tablet_id_)); } else { ObArenaAllocator lob_allocator(ObModIds::OB_LOB_ACCESS_BUFFER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); ObDMLRunningCtx run_ctx(ctx, dml_param, ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), lob_allocator, ObDmlFlag::DF_DELETE); ObNewRow *row = nullptr; if (OB_FAIL(prepare_dml_running_ctx(&column_ids, nullptr, tablet_handle, run_ctx))) { LOG_WARN("failed to prepare dml running ctx", K(ret)); } // delete table rows int64_t cur_time = 0; while (OB_SUCC(ret) && OB_SUCC(row_iter->get_next_row(row))) { cur_time = ObTimeUtility::current_time(); if (cur_time > run_ctx.dml_param_.timeout_) { ret = OB_TIMEOUT; LOG_WARN("query timeout", K(cur_time), K(run_ctx.dml_param_), K(ret)); } else if (OB_ISNULL(row)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get next row from iterator failed", KP(row), K(ret)); } else if (OB_FAIL(delete_row_in_tablet(tablet_handle, run_ctx, *row))) { LOG_WARN("fail to delete row", K(ret), K(row)); } else { ++afct_num; } } lob_allocator.reset(); if (OB_ITER_END == ret) { ret = OB_SUCCESS; } if (OB_SUCC(ret)) { affected_rows = afct_num; EVENT_ADD(STORAGE_DELETE_ROW_COUNT, afct_num); } } NG_TRACE(S_delete_rows_end); return ret; } int ObLSTabletService::lock_rows( ObStoreCtx &ctx, const ObDMLBaseParam &dml_param, const int64_t abs_lock_timeout, const ObLockFlag lock_flag, const bool is_sfu, ObNewRowIterator *row_iter, int64_t &affected_rows) { UNUSEDx(lock_flag, is_sfu); NG_TRACE(S_lock_rows_begin); int ret = OB_SUCCESS; const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTimeGuard timeguard(__func__, 3 * 1000 * 1000); ObTabletHandle tablet_handle; int64_t afct_num = 0; ObColDescArray col_desc; common::ObSEArray column_ids; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("tablet service is not initialized", K(ret)); } else if (OB_UNLIKELY(!ctx.is_valid() || !ctx.is_write() || !dml_param.is_valid() || OB_ISNULL(row_iter))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ctx), K(dml_param), KPC(row_iter)); } else if (OB_FAIL(get_tablet_with_timeout( ctx.tablet_id_, tablet_handle, dml_param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(ctx.tablet_id_)); } else { timeguard.click("Get"); ObDMLRunningCtx run_ctx(ctx, dml_param, ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), ObDmlFlag::DF_LOCK); ObNewRow *row = nullptr; if (OB_FAIL(prepare_dml_running_ctx(nullptr, nullptr, tablet_handle, run_ctx))) { LOG_WARN("failed to prepare dml running ctx", K(ret)); } else if (FALSE_IT(timeguard.click("Prepare"))) { } else if (OB_FAIL(run_ctx.relative_table_.get_rowkey_column_ids(col_desc))) { LOG_WARN("Fail to get column desc", K(ret)); } else if (OB_FAIL(run_ctx.relative_table_.get_rowkey_column_ids(column_ids))) { LOG_WARN("Fail to get column ids", K(ret)); } else { timeguard.click("GetIds"); run_ctx.column_ids_ = &column_ids; ctx.mvcc_acc_ctx_.abs_lock_timeout_ = ObTablet::get_lock_wait_timeout(abs_lock_timeout, dml_param.timeout_); while (OB_SUCCESS == ret && OB_SUCC(row_iter->get_next_row(row))) { ObRelativeTable &relative_table = run_ctx.relative_table_; bool is_exists = true; if (ObTimeUtility::current_time() > dml_param.timeout_) { ret = OB_TIMEOUT; int64_t cur_time = ObTimeUtility::current_time(); LOG_WARN("query timeout", K(cur_time), K(dml_param), K(ret)); } else if (GCONF.enable_defensive_check() && OB_FAIL(check_old_row_legitimacy(tablet_handle, run_ctx, *row))) { LOG_WARN("check row legitimacy failed", K(ret), KPC(row)); } else if (GCONF.enable_defensive_check() && OB_FAIL(check_new_row_nullable_value(col_desc, relative_table, *row))) { LOG_WARN("check lock row nullable failed", K(ret)); } else if (FALSE_IT(timeguard.click("Check"))) { } else if (OB_FAIL(tablet_handle.get_obj()->lock_row(run_ctx.relative_table_, ctx, *row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { LOG_WARN("failed to lock row", K(*row), K(ret)); } } else { ++afct_num; } timeguard.click("Lock"); } if (OB_ITER_END == ret) { ret = OB_SUCCESS; affected_rows = afct_num; } } } NG_TRACE(S_lock_rows_end); return ret; } int ObLSTabletService::lock_row( ObStoreCtx &ctx, const ObDMLBaseParam &dml_param, const int64_t abs_lock_timeout, const ObNewRow &row, const ObLockFlag lock_flag, const bool is_sfu) { UNUSEDx(lock_flag, is_sfu); int ret = OB_SUCCESS; const ObTabletID &data_tablet_id = ctx.tablet_id_; ObTimeGuard timeguard(__func__, 3 * 1000 * 1000); ObTabletHandle tablet_handle; int64_t afct_num = 0; ObColDescArray col_desc; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("tablet service is not initialized", K(ret)); } else if (OB_UNLIKELY(!ctx.is_valid() || !dml_param.is_valid() || !row.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ctx), K(dml_param), K(row)); } else if (OB_FAIL(get_tablet_with_timeout( ctx.tablet_id_, tablet_handle, dml_param.timeout_))) { LOG_WARN("failed to check and get tablet", K(ret), K(ctx.tablet_id_)); } else { ObDMLRunningCtx run_ctx(ctx, dml_param, ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), ctx.mvcc_acc_ctx_.mem_ctx_->get_query_allocator(), ObDmlFlag::DF_LOCK); if (OB_FAIL(prepare_dml_running_ctx(nullptr, nullptr, tablet_handle, run_ctx))) { LOG_WARN("failed to prepare dml running ctx", K(ret)); } else if (OB_FAIL(run_ctx.relative_table_.get_rowkey_column_ids(col_desc))) { LOG_WARN("Fail to get column desc", K(ret)); } else { ctx.mvcc_acc_ctx_.abs_lock_timeout_ = ObTablet::get_lock_wait_timeout(abs_lock_timeout, dml_param.timeout_); if (ObTimeUtility::current_time() > dml_param.timeout_) { ret = OB_TIMEOUT; int64_t cur_time = ObTimeUtility::current_time(); LOG_WARN("query timeout", K(cur_time), K(dml_param), K(ret)); } else if (OB_FAIL(tablet_handle.get_obj()->lock_row(run_ctx.relative_table_, ctx, row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { LOG_WARN("failed to lock row", K(row), K(ret)); } } else { ++afct_num; } } } return ret; } int ObLSTabletService::trim_rebuild_tablet( const ObTabletID &tablet_id, const bool is_rollback) { int ret = OB_SUCCESS; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(is_stopped_)) { ret = OB_NOT_RUNNING; LOG_WARN("tablet service stopped", K(ret)); } else if (OB_UNLIKELY(!tablet_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(tablet_id)); } else if (is_rollback && OB_FAIL(rollback_rebuild_tablet(tablet_id))) { LOG_WARN("failed to rollback tablet rebuilt", K(ret), K(is_rollback), K(tablet_id)); } else if (!is_rollback && OB_FAIL(trim_old_tablets(tablet_id))) { LOG_WARN("failed to trim old tablets", K(ret), K(is_rollback), K(tablet_id)); } return ret; } int ObLSTabletService::create_or_update_migration_tablet( const ObMigrationTabletParam &mig_tablet_param, const bool is_transfer) { // TODO: is_transfer may be redundant, temporarily unused UNUSEDx(is_transfer); int ret = OB_SUCCESS; const share::ObLSID &ls_id = mig_tablet_param.ls_id_; const common::ObTabletID &tablet_id = mig_tablet_param.tablet_id_; ObTabletHandle tablet_handle; bool b_exist = false; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(is_stopped_)) { ret = OB_NOT_RUNNING; LOG_WARN("tablet service stopped", K(ret)); } else if (OB_UNLIKELY(!mig_tablet_param.is_valid()) || OB_UNLIKELY(ls_id != ls_->get_ls_id())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(mig_tablet_param), K_(ls)); } else if (OB_FAIL(has_tablet(ls_id, tablet_id, b_exist))) { LOG_WARN("failed to check tablet existence", K(ls_id), K(tablet_id)); } else if (b_exist && OB_FAIL(migrate_update_tablet(mig_tablet_param))) { LOG_WARN("failed to update tablet meta", K(ret), K(tablet_id), K(mig_tablet_param)); } else if (!b_exist && OB_FAIL(migrate_create_tablet(mig_tablet_param, tablet_handle))) { LOG_WARN("failed to migrate create tablet", K(ret), K(mig_tablet_param)); } return ret; } int ObLSTabletService::rebuild_create_tablet( const ObMigrationTabletParam &mig_tablet_param, const bool keep_old) { int ret = OB_SUCCESS; const share::ObLSID &ls_id = mig_tablet_param.ls_id_; const common::ObTabletID &tablet_id = mig_tablet_param.tablet_id_; bool b_exist = false; ObTabletHandle new_tablet_handle; ObTabletHandle old_tablet_handle; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(is_stopped_)) { ret = OB_NOT_RUNNING; LOG_WARN("tablet service stopped", K(ret)); } else if (OB_UNLIKELY(!mig_tablet_param.is_valid()) || OB_UNLIKELY(ls_id != ls_->get_ls_id())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(mig_tablet_param), K_(ls)); } else if (OB_FAIL(has_tablet(ls_id, tablet_id, b_exist))) { LOG_WARN("fail to check tablet existence", K(ls_id), K(tablet_id)); } else if (!b_exist && OB_FAIL(migrate_create_tablet(mig_tablet_param, new_tablet_handle))) { LOG_WARN("failed to rebuild create tablet", K(ret), K(tablet_id), K(mig_tablet_param)); } else if (b_exist && !keep_old && OB_FAIL(migrate_update_tablet(mig_tablet_param))) { LOG_WARN("failed to rebuild create tablet", K(ret), K(tablet_id), K(mig_tablet_param)); } else if (b_exist && keep_old) { if (OB_FAIL(check_and_get_tablet(tablet_id, old_tablet_handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { LOG_WARN("failed to check and get tablet", K(ret), K(tablet_id)); } else if (OB_UNLIKELY(old_tablet_handle.get_obj()->get_tablet_meta().has_next_tablet_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("number of nodes on list exceeds 2", K(ret)); } else if (OB_FAIL(rebuild_tablet_with_old(mig_tablet_param, old_tablet_handle))) { LOG_WARN("failed to rebuild tablet and maintain linked list", K(ret), K(tablet_id)); } } return ret; } int ObLSTabletService::create_migration_sstable( const blocksstable::ObMigrationSSTableParam &mig_sstable_param, const ObSSTableStatus status, ObTableHandleV2 &table_handle) { int ret = OB_SUCCESS; ObTabletCreateSSTableParam create_param; ObTabletHandle tablet_handle; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!mig_sstable_param.is_valid() || (ObSSTableStatus::SSTABLE_READY_FOR_REMOTE_LOGICAL_READ != status && ObSSTableStatus::SSTABLE_READY_FOR_REMOTE_PHYTSICAL_READ != status && ObSSTableStatus::SSTABLE_WRITE_BUILDING != status))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(mig_sstable_param), K(status)); } else if (OB_FAIL(build_create_sstable_param_for_migration(mig_sstable_param, create_param))) { LOG_WARN("fail to build create sstable param", K(ret), K(mig_sstable_param)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::create_sstable(create_param, table_handle))) { LOG_WARN("fail to create sstable", K(ret), K(create_param)); } else if (ObSSTableStatus::SSTABLE_WRITE_BUILDING != status) { ObSSTable *sstable = nullptr; if (OB_FAIL(table_handle.get_sstable(sstable))) { LOG_WARN("fail to get sstable", K(ret), KP(sstable)); } else if (OB_ISNULL(sstable)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, sstable is nullptr", K(ret), KP(sstable)); } else if (OB_FAIL(sstable->set_status_for_read(status))) { LOG_WARN("fail to set status for sstable", K(ret), K(status), KPC(sstable)); } } return ret; } int ObLSTabletService::finish_copy_migration_sstable( const ObTabletID &tablet_id, const ObITable::TableKey &sstable_key) { int ret = OB_SUCCESS; const ObTabletMapKey key(ls_->get_ls_id(), tablet_id); ObMetaDiskAddr disk_addr; ObTabletHandle tablet_handle; ObTableHandleV2 sstable_handle; ObSSTable *sstable = nullptr; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(is_stopped_)) { ret = OB_NOT_RUNNING; LOG_WARN("tablet service stopped", K(ret)); } else if (OB_UNLIKELY(!tablet_id.is_valid() || !sstable_key.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tablet_id), K(sstable_key)); } else if (OB_FAIL(check_and_get_tablet(tablet_id, tablet_handle))) { LOG_WARN("fail to check and get tablet", K(ret), K(tablet_id)); } else if (OB_FAIL(tablet_handle.get_obj()->get_table_store().get_table( sstable_key, sstable_handle))) { LOG_WARN("fail to get sstable", K(ret), K(tablet_id), K(sstable_key)); } else if (OB_FAIL(sstable_handle.get_sstable(sstable))) { LOG_WARN("fail to get sstable", K(ret), KP(sstable)); } else if (OB_ISNULL(sstable)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, sstable is nullptr", K(ret), KP(sstable)); } else if (OB_FAIL(sstable->set_status_for_read(ObSSTableStatus::SSTABLE_READY_FOR_READ))) { LOG_WARN("fail to set status for sstable", K(ret), KPC(sstable)); } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle, disk_addr))) { LOG_WARN("fail to write update tablet slog", K(ret), K(tablet_handle), K(disk_addr)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, tablet_handle, tablet_handle))) { LOG_ERROR("failed to create tablet", K(ret), K(key), K(disk_addr), K(tablet_handle), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } else { LOG_INFO("copy migration sstable succeed", K(ret), K(key), K(disk_addr)); } return ret; } int ObLSTabletService::build_ha_tablet_new_table_store( const ObTabletID &tablet_id, const ObBatchUpdateTableStoreParam ¶m) { int ret = OB_SUCCESS; ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); ObMetaDiskAddr disk_addr; ObFreezer *freezer = nullptr; memtable::ObIMemtable *imemtable = nullptr; ObTableHandleV2 memtable_handle; bool is_tablet_freeze = false; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(is_stopped_)) { ret = OB_NOT_RUNNING; LOG_WARN("tablet service stopped", K(ret)); } else if (OB_UNLIKELY(!tablet_id.is_valid() || !param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_id), K(param)); } else if (OB_ISNULL(freezer = ls_->get_freezer())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("freezer should not be NULL", K(ret), KP(freezer), KPC(ls_)); } else { ObTabletHandle old_tablet_handle; ObTabletHandle new_tablet_handle; ObBucketHashWLockGuard lock_guard(bucket_lock_, tablet_id.hash()); if (OB_FAIL(direct_get_tablet(tablet_id, old_tablet_handle))) { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } else { ObTablet *old_tablet = old_tablet_handle.get_obj(); ObTablet *new_tablet = nullptr; const share::ObLSID &ls_id = ls_->get_ls_id(); const ObTabletMapKey key(ls_id, tablet_id); ObTabletTxMultiSourceDataUnit tx_data; ObTabletBindingInfo ddl_data; ObTabletAutoincSeq autoinc_seq; //In order to merge tablet meta //it is necessary to make the left side of the newly created memtable start from clog_checkpinoit_ts //the new memtable can be stuck during the creation of the tablet, it is safe here // try tablet freeze if (!tablet_id.is_ls_inner_tablet()) { if (nullptr != param.tablet_meta_ && old_tablet->get_clog_checkpoint_ts() < param.tablet_meta_->clog_checkpoint_ts_) { if (OB_FAIL(freezer->tablet_freeze_for_replace_tablet_meta(tablet_id, memtable_handle))) { LOG_WARN("failed to freeze tablet", K(ret), K(tablet_id), KPC(old_tablet)); } else { is_tablet_freeze = true; } } if (OB_FAIL(ret)) { } else if (!is_tablet_freeze) { } else if (!memtable_handle.is_valid()) { } else if (OB_FAIL(memtable_handle.get_memtable(imemtable))) { LOG_WARN("failed to get memtable", K(ret), K(tablet_id)); } else { memtable::ObMemtable *memtable = static_cast(imemtable); if (OB_FAIL(memtable->resolve_right_boundary_for_migration())) { LOG_WARN("failed to resolve right boundary", K(ret), K(tablet_id)); } } } if (OB_FAIL(ret)) { } else if (OB_FAIL(old_tablet->get_tx_data(tx_data))) { LOG_WARN("failed to get tx data from old tablet", K(ret), K(tablet_id)); } else if (OB_FAIL(old_tablet->get_ddl_data(ddl_data))) { LOG_WARN("failed to get tx data from old tablet", K(ret), K(tablet_id)); } else if (OB_FAIL(old_tablet->get_latest_autoinc_seq(autoinc_seq))) { LOG_WARN("failed to get autoinc seq from old tablet", K(ret)); } else if (OB_FAIL(ObTabletCreateDeleteHelper::acquire_tablet(key, new_tablet_handle, true/*only acquire*/))) { LOG_WARN("failed to acquire tablet", K(ret), K(key)); } else if (FALSE_IT(new_tablet = new_tablet_handle.get_obj())) { } else if (OB_FAIL(new_tablet->init(param, *old_tablet, tx_data, ddl_data, autoinc_seq))) { LOG_WARN("failed to init tablet", K(ret), KPC(old_tablet)); } else { common::ObSArray memtables; const int64_t clog_checkpoint_ts = new_tablet->get_clog_checkpoint_ts(); const int64_t snapshot_version = new_tablet->get_snapshot_version(); if (OB_FAIL(new_tablet->get_memtables(memtables, true/*need_active*/))) { LOG_WARN("failed to get memtables", K(ret), K(key)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) { const memtable::ObMemtable *memtable = static_cast(memtables.at(i)); if (OB_UNLIKELY(memtable->get_end_log_ts() < clog_checkpoint_ts)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("memtable end log ts is smaller than clog checkpoint ts", K(ret), "end_log_ts", memtable->get_end_log_ts(), K(clog_checkpoint_ts)); } else if (memtable->get_end_log_ts() == clog_checkpoint_ts) { if (memtable->get_snapshot_version() <= snapshot_version) { ret = OB_ERR_UNEXPECTED; LOG_WARN("memtable snapshot version is no bigger than tablet snapshot version", K(ret), "memtable_snapshot_version", memtable->get_snapshot_version(), K(snapshot_version)); } } } } } if (OB_FAIL(ret)) { } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(new_tablet_handle, disk_addr))) { LOG_WARN("fail to write update tablet slog", K(ret), K(new_tablet_handle), K(disk_addr)); } else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, old_tablet_handle, new_tablet_handle))) { LOG_ERROR("failed to compare and swap tablet", K(ret), K(key), K(disk_addr)); ob_usleep(1000 * 1000); ob_abort(); } else if (OB_FAIL(old_tablet->set_memtable_clog_checkpoint_ts(param.tablet_meta_))) { LOG_WARN("failed to set memtable clog checkpoint ts", K(ret), KPC(old_tablet), K(param)); } else { LOG_INFO("succeed to build ha tablet new table store", K(ret), K(key), K(disk_addr), K(param)); } } } if (is_tablet_freeze) { int tmp_ret = OB_SUCCESS; if (OB_TMP_FAIL(freezer->handle_frozen_memtable_for_replace_tablet_meta(tablet_id, memtable_handle))) { LOG_WARN("failed to handle_frozen_memtable_for_replace_tablet_meta", K(tmp_ret), K(tablet_id), K(param)); ret = OB_SUCC(ret) ? tmp_ret : ret; } } return ret; } int ObLSTabletService::verify_tablets( const obrpc::ObBatchCreateTabletArg &arg, NonLockedHashSet &existed_tablet_id_set) { int ret = OB_SUCCESS; const share::ObLSID &ls_id = arg.id_; bool b_exist = true; ObTabletHandle tablet_handle; for (int64_t i = 0; OB_SUCC(ret) && i < arg.tablets_.count(); ++i) { const ObCreateTabletInfo &tablet_arg = arg.tablets_[i]; const common::ObSArray &tablet_ids = tablet_arg.tablet_ids_; const common::ObTabletID &data_tablet_id = tablet_arg.data_tablet_id_; bool is_pure_index = true; bool is_data_tablet_exist = true; for (int64_t k = 0; is_pure_index && k < tablet_ids.count(); ++k) { if (tablet_ids[k] == data_tablet_id) { is_pure_index = false; } } if (is_pure_index) { ObTabletHandle data_tablet_handle; if (OB_FAIL(direct_get_tablet(data_tablet_id, data_tablet_handle))) { if (OB_TABLET_NOT_EXIST == ret) { is_data_tablet_exist = false; ret = OB_SUCCESS; // TODO (bowen.gbw): data tablet might be removed when replaying index tablet creation // so that we should skip such clog to avoid replaying failure. // Temp fix: consider those index tablets as existed ones and skip in replaying // Final solution: Multi Data Source Transaction FLOG_INFO("data tablet does not exist, need skip create pure index tablets", K(ret), K(data_tablet_id)); } else { LOG_WARN("failed to get tablet", K(ret), K(data_tablet_id)); } } } for (int64_t j = 0; OB_SUCC(ret) && j < tablet_ids.count(); ++j) { const common::ObTabletID &tablet_id = tablet_ids[j]; if (is_pure_index && !is_data_tablet_exist) { b_exist = true; } else if (OB_FAIL(direct_get_tablet(tablet_id, tablet_handle))) { if (OB_TABLET_NOT_EXIST == ret) { b_exist = false; ret = OB_SUCCESS; LOG_INFO("tablet does not exist", K(ret), K(tablet_id)); } else { LOG_WARN("failed to get tablet", K(ret), K(tablet_id)); } } else { b_exist = true; } if (OB_SUCC(ret) && b_exist) { FLOG_INFO("tablet already exists", K(ls_id), K(tablet_id)); if (OB_FAIL(existed_tablet_id_set.set_refactored(tablet_id))) { LOG_WARN("failed to insert into hash set", K(ret), K(tablet_id)); } } } } return ret; } int ObLSTabletService::need_check_old_row_legitimacy(ObDMLRunningCtx &run_ctx, bool &need_check, bool &is_udf) { int ret = OB_SUCCESS; ObRelativeTable &data_table = run_ctx.relative_table_; need_check = false; const ObDMLBaseParam &dml_param = run_ctx.dml_param_; is_udf = false; // TODO(jingxing): setting this to true if (OB_FAIL(data_table.has_udf_column(need_check))) { LOG_WARN("check has udf column failed", K(ret)); } else if (need_check) { is_udf = true; ObTableStoreIterator &table_iter = data_table.tablet_iter_.table_iter_; while (OB_SUCC(ret) && !need_check) { ObITable *table_ptr = nullptr; if (OB_FAIL(table_iter.get_next(table_ptr))) { if (OB_ITER_END != ret) { LOG_WARN("get next table failed", K(ret)); } } else if (OB_ISNULL(table_ptr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("error unexpected, table ptr must not be nullptr", K(ret)); } else { need_check = table_ptr->is_major_sstable(); } } } else if (dml_param.is_batch_stmt_ && !data_table.is_index_table()) { //batch stmt execution dependency defensive check to check //if the same row was modified multiple times need_check = true; } else if (GCONF.enable_defensive_check()) { need_check = true; if (data_table.is_index_table() && !data_table.can_read_index()) { //index can not be read during building index, so does not check old index row need_check = false; } } return ret; } int ObLSTabletService::construct_table_rows( const ObNewRow *rows, ObStoreRow *tbl_rows, int64_t row_count) { int ret = OB_SUCCESS; if (row_count <= 0) { ret = OB_ERR_WRONG_VALUE_COUNT_ON_ROW; LOG_WARN("row count should be bigger than 0", K(row_count), K(ret)); } else { for (int64_t i = 0; i < row_count; i++) { tbl_rows[i].row_val_ = rows[i]; } } return ret; } int ObLSTabletService::check_old_row_legitimacy( ObTabletHandle &data_tablet_handle, ObDMLRunningCtx &run_ctx, const common::ObNewRow &old_row) { int ret = OB_SUCCESS; ObRelativeTable &data_table = run_ctx.relative_table_; ObStoreRowkey rowkey; bool need_check = false; bool is_udf = false; rowkey.assign(old_row.cells_, data_table.get_rowkey_column_num()); if (OB_UNLIKELY(rowkey.get_obj_cnt() > old_row.count_) || OB_ISNULL(run_ctx.column_ids_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("old row is invalid", K(ret), K(old_row), K(rowkey.get_obj_cnt()), KP(run_ctx.column_ids_)); } else if (OB_FAIL(need_check_old_row_legitimacy(run_ctx, need_check, is_udf))) { LOG_WARN("identify need check old row legitimacy", K(ret)); } else if (need_check) { //the vertical partition is no longer maintained, //and the defense check skips the vertical partition function const ObDMLBaseParam &dml_param = run_ctx.dml_param_; ObArenaAllocator scan_allocator(ObModIds::OB_TABLE_SCAN_ITER); ObIAllocator *allocator = &scan_allocator; ObSingleRowGetter old_row_getter(*allocator, *data_tablet_handle.get_obj()); ObNewRow *storage_old_row = nullptr; //check old row whether different with SQL.old_row, ObDatumRowkey datum_rowkey; ObDatumRowkeyHelper rowkey_helper; const ObIArray &column_ids = *run_ctx.column_ids_; if (OB_FAIL(rowkey_helper.convert_datum_rowkey(rowkey.get_rowkey(), datum_rowkey))) { STORAGE_LOG(WARN, "Failed to transfer datum rowkey", K(ret), K(rowkey)); } else if (OB_FAIL(old_row_getter.init_dml_access_ctx(run_ctx.store_ctx_, dml_param))) { LOG_WARN("init dml access ctx failed", K(ret)); } else if (OB_FAIL(old_row_getter.init_dml_access_param(data_table, dml_param, column_ids))) { LOG_WARN("init dml access param failed", K(ret)); } else if (OB_FAIL(old_row_getter.open(datum_rowkey, true))) { LOG_WARN("open old row getter failed", K(ret), K(rowkey)); } else if (OB_FAIL(old_row_getter.get_next_row(storage_old_row))) { if (OB_ITER_END == ret) { ret = OB_ERR_DEFENSIVE_CHECK; LOG_WARN("old row in storage is not exists", K(ret)); } else { LOG_WARN("get next row from old_row_iter failed", K(ret), KPC(run_ctx.column_ids_), K(old_row)); } } else if (OB_ISNULL(storage_old_row)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected error, storage old row is NULL", K(ret)); } else if (storage_old_row->get_count() != old_row.get_count()) { ret = OB_ERR_DEFENSIVE_CHECK; LOG_WARN("storage old row is not matched with sql old row", K(ret)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < old_row.get_count(); ++i) { const ObObj &storage_val = storage_old_row->get_cell(i); const ObObj &sql_val = old_row.get_cell(i); int cmp = 0; if (OB_UNLIKELY(ObLongTextType == storage_val.get_type() && sql_val.is_lob_locator())) { //skip check lob column type when do the normal sql write check } else if (OB_UNLIKELY(storage_val.is_nop_value())) { bool is_nop = false; if (OB_FAIL(data_table.is_nop_default_value(column_ids.at(i), is_nop))) { LOG_WARN("check column whether has nop default value failed", K(ret), K(column_ids.at(i))); } else if (!is_nop) { ret = OB_ERR_DEFENSIVE_CHECK; LOG_WARN("storage old row is not matched with sql old row", K(ret), K(i), K(column_ids.at(i)), K(storage_val), K(sql_val)); } } else if (sql_val.is_nop_value()) { //this column is nop val, means that this column does not be touched by DML //just ignore it } else if (OB_FAIL(storage_val.compare(sql_val, cmp)) || 0 != cmp) { LOG_WARN("storage_val is not equal with sql_val, maybe catch a bug", K(ret), K(storage_val), K(sql_val), K(cmp), K(column_ids.at(i))); ret = OB_ERR_DEFENSIVE_CHECK; } } } if (OB_ERR_DEFENSIVE_CHECK == ret && dml_param.is_batch_stmt_) { // 批量删除的时候可能索引表的删除在主表前边,所以所有的表在batch删除的时候出现4377,都可能是重复删导致的 ret = OB_BATCHED_MULTI_STMT_ROLLBACK; LOG_TRACE("batch stmt execution has a correctness error, needs rollback", K(ret), "column_id", column_ids, KPC(storage_old_row), "sql_old_row", old_row, "dml_param", run_ctx.dml_param_, "dml_type", run_ctx.dml_flag_); } if (OB_ERR_DEFENSIVE_CHECK == ret) { if (is_udf) { ret = OB_ERR_INDEX_KEY_NOT_FOUND; LOG_WARN("index key not found on udf column", K(ret), K(old_row)); } else { ObString func_name = ObString::make_string("check_old_row_legitimacy"); LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); LOG_ERROR("Fatal Error!!! Catch a defensive error!", K(ret), "column_id", column_ids, KPC(storage_old_row), "sql_old_row", old_row, "dml_param", run_ctx.dml_param_, "dml_flag", run_ctx.dml_flag_, "store_ctx", run_ctx.store_ctx_, "relative_table", run_ctx.relative_table_); LOG_ERROR("Dump data table info", K(ret), K(data_table)); run_ctx.store_ctx_.force_print_trace_log(); } } } return ret; } int ObLSTabletService::check_new_row_legitimacy( ObDMLRunningCtx &run_ctx, const common::ObNewRow &new_row) { int ret = OB_SUCCESS; ObRelativeTable &data_table = run_ctx.relative_table_; int64_t data_table_cnt = data_table.get_column_count(); if (OB_ISNULL(run_ctx.column_ids_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("column ids is nullptr", K(ret)); } else if (OB_FAIL(check_new_row_nullable_value(*run_ctx.column_ids_, data_table, new_row))) { LOG_WARN("check new row nullable value failed", K(ret), "dml_param", run_ctx.dml_param_, "dml_type", run_ctx.dml_flag_); } else if (OB_FAIL(check_new_row_shadow_pk(*run_ctx.column_ids_, data_table, new_row))) { LOG_WARN("check new row nullable value failed", K(ret), "dml_param", run_ctx.dml_param_, "dml_type", run_ctx.dml_flag_); } return ret; } int ObLSTabletService::insert_rows_to_tablet( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, const ObNewRow * const rows, const int64_t row_count, ObRowsInfo &rows_info, ObStoreRow *tbl_rows, int64_t &afct_num, int64_t &dup_num) { int ret = OB_SUCCESS; ObStoreCtx &ctx = run_ctx.store_ctx_; const ObDMLBaseParam &dml_param = run_ctx.dml_param_; ObRelativeTable &data_table = run_ctx.relative_table_; if (OB_FAIL(ret)) { } else if (ObTimeUtility::current_time() > dml_param.timeout_) { ret = OB_TIMEOUT; int64_t cur_time = ObTimeUtility::current_time(); LOG_WARN("query timeout", K(cur_time), K(dml_param), K(ret)); } else if (OB_FAIL(construct_table_rows(rows, tbl_rows, row_count))) { LOG_WARN("fail to construct table rows", K(ret)); } else if (OB_FAIL(rows_info.check_duplicate(tbl_rows, row_count, data_table))) { if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret) { char rowkey_buffer[OB_TMP_BUF_SIZE_256]; if (OB_SUCCESS != extract_rowkey(data_table, rows_info.get_duplicate_rowkey(), rowkey_buffer, OB_TMP_BUF_SIZE_256, run_ctx.dml_param_.tz_info_)) { LOG_WARN("extract rowkey failed"); } else { ObString index_name = "PRIMARY"; if (data_table.is_index_table()) { data_table.get_index_name(index_name); } LOG_USER_ERROR(OB_ERR_PRIMARY_KEY_DUPLICATE, rowkey_buffer, index_name.length(), index_name.ptr()); } } else { LOG_WARN("fail to check duplicate", K(ret)); } } else if (OB_FAIL(insert_lob_tablet_rows(tablet_handle, run_ctx, tbl_rows, row_count))) { LOG_WARN("failed to insert rows to lob tablet", K(ret)); } else if (OB_FAIL(insert_tablet_rows(tablet_handle, run_ctx, tbl_rows, row_count, rows_info))) { LOG_WARN("failed to insert rows to data tablet", K(ret)); } else { afct_num = afct_num + row_count; } return ret; } int ObLSTabletService::insert_tablet_rows( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, ObStoreRow *rows, const int64_t row_count, ObRowsInfo &rows_info) { int ret = OB_SUCCESS; ObRelativeTable &table = run_ctx.relative_table_; bool exists = false; const bool check_exists = !table.is_storage_index_table() || table.is_unique_index(); if (check_exists && OB_FAIL(tablet_handle.get_obj()->rowkeys_exists( run_ctx.store_ctx_, table, rows_info, exists))) { LOG_WARN("fail to check the existence of rows", K(ret), K(rows_info), K(exists)); } else if (exists) { ret = OB_ERR_PRIMARY_KEY_DUPLICATE; LOG_WARN("rowkeys already exist", K(ret), K(table), K(rows_info)); } for (int64_t k = 0; OB_SUCC(ret) && k < row_count; k++) { ObStoreRow &tbl_row = rows[k]; if (GCONF.enable_defensive_check() && OB_FAIL(check_new_row_legitimacy(run_ctx, tbl_row.row_val_))) { LOG_WARN("check new row legitimacy failed", K(ret), K(tbl_row.row_val_)); } else if (OB_FAIL(tablet_handle.get_obj()->insert_row_without_rowkey_check(table, run_ctx.store_ctx_, *run_ctx.col_descs_, tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { LOG_WARN("fail to insert row to data tablet", K(ret), K(tbl_row)); } } } if (OB_ERR_PRIMARY_KEY_DUPLICATE == ret && !run_ctx.dml_param_.is_ignore_) { int tmp_ret = OB_SUCCESS; char rowkey_buffer[OB_TMP_BUF_SIZE_256]; ObString index_name = "PRIMARY"; if (OB_TMP_FAIL(extract_rowkey(table, rows_info.get_duplicate_rowkey(), rowkey_buffer, OB_TMP_BUF_SIZE_256, run_ctx.dml_param_.tz_info_))) { LOG_WARN("failed to extract rowkey", K(ret), K(tmp_ret)); } if (table.is_index_table()) { // maybe here data table is a global index table if (OB_TMP_FAIL(table.get_index_name(index_name))) { LOG_WARN("get_index_name failed", K(tmp_ret)); } } else if (lib::is_oracle_mode() && OB_TMP_FAIL(table.get_primary_key_name(index_name))) { LOG_WARN("failed to get pk name", K(ret), K(tmp_ret)); } LOG_USER_ERROR(OB_ERR_PRIMARY_KEY_DUPLICATE, rowkey_buffer, index_name.length(), index_name.ptr()); } return ret; } int ObLSTabletService::insert_lob_col( ObDMLRunningCtx &run_ctx, const ObColDesc &column, ObObj &obj, ObLobAccessParam *del_param, ObLobCommon *lob_common) { int ret = OB_SUCCESS; uint64_t lob_id; ObLobManager *lob_mngr = MTL(ObLobManager*); if (OB_ISNULL(lob_mngr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[STORAGE_LOB]failed to get lob manager handle.", K(ret)); } else if (!column.col_type_.is_lob_v2() || obj.is_nop_value() || obj.is_null()) { // do nothing } else { ObString data = obj.get_string(); // init lob access param ObLobAccessParam lob_param; lob_param.tx_desc_ = run_ctx.store_ctx_.mvcc_acc_ctx_.tx_desc_; lob_param.snapshot_ = run_ctx.dml_param_.snapshot_; lob_param.is_total_quantity_log_ = run_ctx.dml_param_.is_total_quantity_log_; if (lob_param.snapshot_.is_none_read()) { // NOTE: // lob_insert need table_scan, the snapshot already generated in // run_ctx.store_ctx, use it as an LS ReadSnapshot lob_param.snapshot_.init_ls_read(run_ctx.store_ctx_.ls_id_, run_ctx.store_ctx_.mvcc_acc_ctx_.snapshot_); } lob_param.tx_id_ = lob_param.tx_desc_->get_tx_id(); lob_param.sql_mode_ = run_ctx.dml_param_.sql_mode_; lob_param.ls_id_ = run_ctx.store_ctx_.ls_id_; lob_param.tablet_id_ = run_ctx.relative_table_.get_tablet_id(); lob_param.coll_type_ = column.col_type_.get_collation_type(); lob_param.allocator_ = &run_ctx.lob_allocator_; lob_param.lob_common_ = lob_common; if (OB_NOT_NULL(del_param)) { lob_param.handle_size_ = del_param->handle_size_; lob_param.checksum_ = del_param->checksum_; lob_param.total_seq_cnt_ = del_param->total_seq_cnt_; lob_param.used_seq_cnt_ = del_param->used_seq_cnt_; lob_param.seq_no_st_ = del_param->seq_no_st_; } lob_param.timeout_ = run_ctx.dml_param_.timeout_; lob_param.scan_backward_ = false; lob_param.offset_ = 0; lob_param.len_ = ObCharset::strlen_char(lob_param.coll_type_, data.ptr(), data.length()); if (OB_FAIL(lob_mngr->append(lob_param, data))) { LOG_WARN("[STORAGE_LOB]lob append failed.", K(ret)); } else { ObLobCommon *res_lob_common = lob_param.lob_common_; obj.set_lob_value(obj.get_type(), res_lob_common, res_lob_common->get_handle_size(lob_param.byte_size_)); LOG_DEBUG("[STORAGE_LOB]write ob lob data.", K(lob_param), KPC(res_lob_common), K(res_lob_common->get_handle_size(lob_param.byte_size_)), K(column.col_type_.get_collation_type())); } } return ret; } int ObLSTabletService::check_lob_tablet_valid(ObTabletHandle &data_tablet) { int ret = OB_SUCCESS; bool is_valid_aux_lob_table = false; ObTabletBindingInfo ddl_data; if (OB_FAIL(data_tablet.get_obj()->get_ddl_data(ddl_data))) { LOG_WARN("failed to get ddl data from tablet", K(ret), K(data_tablet)); } else { is_valid_aux_lob_table = ddl_data.lob_meta_tablet_id_.is_valid() && ddl_data.lob_piece_tablet_id_.is_valid(); if (!is_valid_aux_lob_table) { ObTabletTxMultiSourceDataUnit tx_data; if (OB_FAIL(data_tablet.get_obj()->get_tx_data(tx_data))) { LOG_WARN("fail to get tx data", K(ret), K(data_tablet)); } else if (tx_data.is_in_tx()) { ret = OB_SCHEMA_EAGAIN; LOG_WARN("do retry for data tablet is in tx, maybe wait for lob tablet finish", K(ret), K(ddl_data)); } else { // maybe has committed, refresh binding info and check once if (OB_FAIL(data_tablet.get_obj()->get_ddl_data(ddl_data))) { LOG_WARN("failed to get ddl data from tablet", K(ret), K(data_tablet)); } else { is_valid_aux_lob_table = ddl_data.lob_meta_tablet_id_.is_valid() && ddl_data.lob_piece_tablet_id_.is_valid(); if (!is_valid_aux_lob_table) { ret = OB_ERR_UNEXPECTED; LOG_WARN("aux lob table must valid when lob column exist", K(ret), K(ddl_data)); } } } } } return ret; } int ObLSTabletService::insert_lob_tablet_row( ObTabletHandle &data_tablet, ObDMLRunningCtx &run_ctx, ObStoreRow &row) { int ret = OB_SUCCESS; int64_t col_cnt = run_ctx.col_descs_->count(); ObLobManager *lob_mngr = MTL(ObLobManager*); bool check_lob = false; if (OB_ISNULL(lob_mngr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[STORAGE_LOB]failed to get lob manager handle.", K(ret)); } else if (row.row_val_.count_ != col_cnt) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[STORAGE_LOB]column count invalid", K(ret), K(col_cnt), K(row.row_val_.count_)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { const ObColDesc &column = run_ctx.col_descs_->at(i); ObObj &obj = row.row_val_.get_cell(i); if (!column.col_type_.is_lob_v2() || obj.is_null() || obj.is_nop_value()) { // do nothing } else { if (!check_lob) { if (OB_FAIL(check_lob_tablet_valid(data_tablet))) { LOG_WARN("failed to check_lob_tablet_valid", K(ret), K(data_tablet)); } else { check_lob = true; } } if (OB_FAIL(ret)) { } else if(OB_FAIL(insert_lob_col(run_ctx, column, obj, nullptr, nullptr))) { LOG_WARN("[STORAGE_LOB]failed to insert lob col.", K(ret), K(row), K(i)); } } } } return ret; } int ObLSTabletService::insert_lob_tablet_rows( ObTabletHandle &data_tablet, ObDMLRunningCtx &run_ctx, ObStoreRow *rows, int64_t row_count) { int ret = OB_SUCCESS; // DEBUG_SYNC(DELAY_INDEX_WRITE); ObLobManager *lob_mngr = MTL(ObLobManager*); if (OB_ISNULL(lob_mngr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[STORAGE_LOB]failed to get lob manager handle.", K(ret)); } else { int64_t col_cnt = run_ctx.col_descs_->count(); for (int64_t k = 0; OB_SUCC(ret) && k < row_count; k++) { if (OB_FAIL(insert_lob_tablet_row(data_tablet, run_ctx, rows[k]))) { LOG_WARN("[STORAGE_LOB]failed to insert lob row.", K(ret)); } } } return ret; } int ObLSTabletService::extract_rowkey( const ObRelativeTable &table, const blocksstable::ObDatumRowkey &rowkey, char *buffer, const int64_t buffer_len, const ObTimeZoneInfo *tz_info) { int ret = OB_SUCCESS; common::ObSEArray rowkey_cols; ObStoreRowkey store_rowkey; ObDatumRowkeyHelper rowkey_helper; if (OB_FAIL(table.get_rowkey_column_ids(rowkey_cols))) { STORAGE_LOG(WARN, "Failed to get rowkey cols", K(ret), K(table)); } else if (OB_FAIL(rowkey_helper.convert_store_rowkey(rowkey, rowkey_cols, store_rowkey))) { STORAGE_LOG(WARN, "Failed to convert store rowkey", K(ret), K(rowkey)); } else { ret = extract_rowkey(table, store_rowkey, buffer, buffer_len, tz_info); } return ret; } int ObLSTabletService::extract_rowkey( const ObRelativeTable &table, const common::ObStoreRowkey &rowkey, char *buffer, const int64_t buffer_len, const ObTimeZoneInfo *tz_info) { int ret = OB_SUCCESS; if (!table.is_valid() || !rowkey.is_valid() || OB_ISNULL(buffer) || buffer_len <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(table), K(rowkey), K(buffer), K(buffer_len), K(tz_info)); } else { const int64_t rowkey_size = table.get_rowkey_column_num(); int64_t pos = 0; int64_t valid_rowkey_size = 0; uint64_t column_id = OB_INVALID_ID; for (int64_t i = 0; OB_SUCC(ret) && i < rowkey_size; i++) { if (OB_FAIL(table.get_rowkey_col_id_by_idx(i, column_id))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Failed to get rowkey column description", K(i), K(ret)); } else if (column_id <= OB_MIN_SHADOW_COLUMN_ID) { valid_rowkey_size ++; } } for (int64_t i = 0; OB_SUCC(ret) && i < valid_rowkey_size; ++i) { const ObObj &obj = rowkey.get_obj_ptr()[i]; if (OB_FAIL(obj.print_plain_str_literal(buffer, buffer_len - 1, pos, tz_info))) { LOG_WARN("fail to print_plain_str_literal", K(ret)); } else if (i < valid_rowkey_size - 1) { if (OB_FAIL(databuff_printf(buffer, buffer_len - 1, pos, "-"))) { LOG_WARN("databuff print failed", K(ret)); } } } if (buffer != nullptr) { buffer[pos++] = '\0'; } } return ret; } int ObLSTabletService::get_next_rows( ObNewRowIterator *row_iter, ObNewRow *&rows, int64_t &row_count) { row_count = 1; return row_iter->get_next_rows(rows, row_count); } int ObLSTabletService::construct_update_idx( const int64_t schema_rowkey_cnt, const share::schema::ColumnMap *col_map, const common::ObIArray &upd_col_ids, UpdateIndexArray &update_idx) { int ret = OB_SUCCESS; int err = OB_SUCCESS; if (OB_ISNULL(col_map) || upd_col_ids.count() <= 0 || update_idx.count() > 0 || schema_rowkey_cnt <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(col_map), K(upd_col_ids), K(upd_col_ids.count()), K(schema_rowkey_cnt)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < upd_col_ids.count(); ++i) { int32_t idx = -1; const uint64_t &col_id = upd_col_ids.at(i); if (OB_SUCCESS != (err = col_map->get(col_id, idx)) || idx < 0) { ret = OB_ERR_UNEXPECTED; LOG_WARN("column id doesn't exist", K(ret), K(col_id), K(err)); } else if (idx < schema_rowkey_cnt) { // update_idx should not contain rowkey } else if (OB_FAIL(update_idx.push_back(idx))) { LOG_WARN("fail to push idx into update_idx", K(ret), K(idx)); } } if (OB_SUCC(ret) && update_idx.count() > 1) { std::sort(update_idx.begin(), update_idx.end()); } } return ret; } int ObLSTabletService::check_rowkey_change( const ObIArray &update_ids, const ObRelativeTable &relative_table, bool &rowkey_change, bool &delay_new) { int ret = OB_SUCCESS; if (OB_UNLIKELY(update_ids.count() <= 0 || !relative_table.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(update_ids), K(ret)); } else { const int64_t count = update_ids.count(); bool is_rowkey = false; rowkey_change = false; delay_new = false; for (int64_t i = 0; OB_SUCC(ret) && i < count && !rowkey_change; ++i) { if (OB_FAIL(relative_table.is_rowkey_column_id(update_ids.at(i), is_rowkey))) { LOG_WARN("check is_rowkey fail", K(ret), K(update_ids.at(i))); } else { rowkey_change = is_rowkey; } } if (OB_FAIL(ret)) { // do nothing } else if (relative_table.is_unique_index() && !rowkey_change) { uint64_t cid = OB_INVALID_ID; bool innullable = true; for (int64_t j = 0; OB_SUCC(ret) && j < relative_table.get_rowkey_column_num() && !rowkey_change; ++j) { if (OB_FAIL(relative_table.get_rowkey_col_id_by_idx(j, cid))) { LOG_WARN("get rowkey column id fail", K(ret), K(j)); } else if (cid > OB_MIN_SHADOW_COLUMN_ID) { if (innullable) { break; // other_change } else { cid -= OB_MIN_SHADOW_COLUMN_ID; for (int64_t k = 0; OB_SUCC(ret) && k < count; ++k) { if (cid == update_ids.at(k)) { rowkey_change = true; break; } } } } else { bool is_nullable = false; if (OB_FAIL(relative_table.is_column_nullable_for_write(cid, is_nullable))) { LOG_WARN("check nullable fail", K(ret), K(cid), K(relative_table)); } else if (is_nullable) { innullable = false; } } } } if (OB_SUCC(ret) && rowkey_change) { delay_new = true; } LOG_DEBUG("check rowkey change for update", K(rowkey_change), K(delay_new), K(update_ids), K(relative_table)); } return ret; } int ObLSTabletService::check_rowkey_value_change( const common::ObNewRow &old_row, const common::ObNewRow &new_row, const int64_t rowkey_len, bool &rowkey_change) { int ret = OB_SUCCESS; if (OB_UNLIKELY(rowkey_len <= 0) || OB_UNLIKELY(old_row.count_ < rowkey_len) || OB_UNLIKELY(new_row.count_ < rowkey_len)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(old_row), K(new_row), K(rowkey_len), K(ret)); } else { rowkey_change = false; for (int64_t i = 0; !rowkey_change && i < rowkey_len; ++i) { if (old_row.cells_[i] != new_row.cells_[i]) { rowkey_change = true; } } } LOG_DEBUG("check rowkey value for update", K(rowkey_change), K(old_row), K(new_row)); return ret; } int ObLSTabletService::process_lob_row( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, const ObIArray &update_idx, bool data_tbl_rowkey_change, ObStoreRow &old_sql_row, ObStoreRow &old_row, ObStoreRow &new_row) { int ret = OB_SUCCESS; bool check_lob = false; if (OB_UNLIKELY(old_row.row_val_.get_count() != new_row.row_val_.get_count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("[STORAGE_LOB]invalid args", K(old_row), K(new_row), K(ret)); } else if (OB_UNLIKELY(old_row.row_val_.get_count() != run_ctx.col_descs_->count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("[STORAGE_LOB]invalid args", K(old_row), K(new_row), KPC(run_ctx.col_descs_)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < old_row.row_val_.get_count(); ++i) { if (run_ctx.col_descs_->at(i).col_type_.is_lob_v2()) { ObObj &old_obj = old_row.row_val_.get_cell(i); ObObj &old_sql_obj = old_sql_row.row_val_.get_cell(i); ObObj &new_obj = new_row.row_val_.get_cell(i); bool is_update = false; for (int64_t j = 0; !is_update && j < update_idx.count(); ++j) { if (update_idx.at(j) == i) { is_update = true; } } if (is_update) { ObLobCommon *lob_common = nullptr; ObLobAccessParam lob_param; lob_param.update_len_ = new_obj.get_string_len(); if (!check_lob) { ObTabletBindingInfo ddl_data; if (OB_FAIL(check_lob_tablet_valid(tablet_handle))) { LOG_WARN("failed to check_lob_tablet_valid", K(ret), K(tablet_handle)); } else { check_lob = true; } } if (OB_FAIL(ret)) { } else if (OB_FAIL(delete_lob_col(run_ctx, run_ctx.col_descs_->at(i), old_obj, old_sql_obj, lob_common, lob_param))) { LOG_WARN("[STORAGE_LOB]failed to erase old lob col", K(ret), K(old_sql_row), K(old_row)); } else if (OB_FAIL(insert_lob_col(run_ctx, run_ctx.col_descs_->at(i), new_obj, &lob_param, lob_common))) { LOG_WARN("[STORAGE_LOB]failed to insert new lob col.", K(ret), K(new_row)); } } else { if (old_obj.is_null()) { new_obj.set_null(); } else if (old_obj.is_nop_value()) { new_obj.set_nop_value(); } else if (new_obj.is_nop_value() || new_obj.is_null()) { // do nothing } else { ObString val_str = old_obj.get_string(); ObLobCommon *lob_common = reinterpret_cast(val_str.ptr()); if (!lob_common->in_row_ && data_tbl_rowkey_change) { if (val_str.length() < ObLobManager::LOB_OUTROW_HEADER_SIZE) { ret = OB_ERR_UNEXPECTED; LOG_WARN("not enough space for lob header", K(ret), K(val_str)); } else { char *buf = reinterpret_cast(run_ctx.lob_allocator_.alloc(val_str.length())); if (OB_ISNULL(buf)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc memory failed.", K(ret), K(val_str)); } else { MEMCPY(buf, val_str.ptr(), val_str.length()); lob_common = reinterpret_cast(buf); ObLobData *lob_data = reinterpret_cast(lob_common->buffer_); ObLobDataOutRowCtx *ctx = reinterpret_cast(lob_data->buffer_); ctx->op_ = ObLobDataOutRowCtx::OpType::EMPTY_SQL; new_obj.set_string(new_obj.get_type(), buf, val_str.length()); } } } else { new_obj.set_string(new_obj.get_type(), val_str); } } } } } } return ret; } int ObLSTabletService::update_row_to_tablet( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, const bool rowkey_change, const ObIArray &update_idx, const bool delay_new, const bool lob_update, ObStoreRow &old_tbl_row, ObStoreRow &new_tbl_row, ObRowStore *row_store, bool &duplicate) { int ret = OB_SUCCESS; const ObDMLBaseParam &dml_param = run_ctx.dml_param_; const ObColDescIArray &col_descs = *run_ctx.col_descs_; bool data_tbl_rowkey_change = false; int64_t data_tbl_rowkey_len = run_ctx.relative_table_.get_rowkey_column_num(); ObSQLMode sql_mode = dml_param.sql_mode_; duplicate = false; ObStoreRow old_sql_tbl_row; old_sql_tbl_row.row_val_ = old_tbl_row.row_val_; if (OB_UNLIKELY(col_descs.count() != old_tbl_row.row_val_.get_count() || col_descs.count() != new_tbl_row.row_val_.get_count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(col_descs.count()), K(old_tbl_row.row_val_), K(new_tbl_row.row_val_)); } else if (rowkey_change && OB_FAIL(check_rowkey_value_change(old_tbl_row.row_val_, new_tbl_row.row_val_, data_tbl_rowkey_len, data_tbl_rowkey_change))) { LOG_WARN("failed to check data table rowkey change", K(ret), K(old_tbl_row), K(new_tbl_row), K(data_tbl_rowkey_len)); } else if (OB_FAIL(process_old_row(tablet_handle, run_ctx, data_tbl_rowkey_change, lob_update, old_tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("fail to process old row", K(ret), K(*run_ctx.col_descs_), K(old_tbl_row), K(data_tbl_rowkey_change)); } } else if (OB_FAIL(process_lob_row(tablet_handle, run_ctx, update_idx, data_tbl_rowkey_change, old_sql_tbl_row, old_tbl_row, new_tbl_row))) { LOG_WARN("failed to process lob col change", K(ret), K(old_tbl_row), K(new_tbl_row)); } else if (delay_new && lib::is_oracle_mode()) { // if total quantity log is needed, we should cache both new row and old row, // and the sequence is new_row1, old_row1, new_row2, old_row2...., // if total quantity log isn't needed, just cache new row if (OB_ISNULL(row_store)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("row_store is NULL", K(ret)); } else if (OB_FAIL(row_store->add_row(new_tbl_row.row_val_))) { LOG_WARN("failed to store new row", K(new_tbl_row), K(ret)); } else if (OB_FAIL(row_store->add_row(old_tbl_row.row_val_))) { LOG_WARN("failed to store old row", K(old_tbl_row), K(ret)); } else { LOG_DEBUG("add row store for delay new", K(old_tbl_row), K(new_tbl_row)); } } else if (OB_FAIL(process_new_row(tablet_handle, run_ctx, update_idx, old_tbl_row, new_tbl_row, data_tbl_rowkey_change))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("fail to process new row", K(new_tbl_row), K(ret)); } } return ret; } int ObLSTabletService::process_old_row( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, const bool data_tbl_rowkey_change, const bool lob_update, ObStoreRow &tbl_row) { int ret = OB_SUCCESS; ObStoreCtx &store_ctx = run_ctx.store_ctx_; ObRelativeTable &relative_table = run_ctx.relative_table_; bool is_delete_total_quantity_log = run_ctx.dml_param_.is_total_quantity_log_; if (OB_UNLIKELY(!relative_table.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid relative tables", K(ret), K(relative_table)); } else if (OB_UNLIKELY(!store_ctx.is_valid() || nullptr == run_ctx.col_descs_ || run_ctx.col_descs_->count() <= 0 || !tbl_row.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(store_ctx), KP(run_ctx.col_descs_), K(tbl_row), K(is_delete_total_quantity_log)); } else if (OB_FAIL(check_old_row_legitimacy(tablet_handle, run_ctx, tbl_row.row_val_))) { if (OB_ERR_DEFENSIVE_CHECK == ret) { dump_diag_info_for_old_row_loss(relative_table, store_ctx, tbl_row); } LOG_WARN("check old row legitimacy failed", K(tbl_row.row_val_)); } else if (OB_FAIL(process_old_row_lob_col(tablet_handle, run_ctx, tbl_row))){ LOG_WARN("failed to process old row lob col", K(ret), K(tbl_row)); } else { ObColDescIArray &col_descs = const_cast(*run_ctx.col_descs_); const uint64_t &table_id = relative_table.get_table_id(); int64_t rowkey_size = relative_table.get_rowkey_column_num(); ObStoreRowkey store_rowkey; ObDatumRowkey datum_rowkey; ObDatumRowkeyHelper rowkey_helper(run_ctx.allocator_); if (OB_UNLIKELY(run_ctx.dml_param_.prelock_)) { bool locked = false; if (OB_FAIL(store_rowkey.assign(tbl_row.row_val_.cells_, rowkey_size))) { LOG_WARN("Failed to assign rowkey", K(ret), K(tbl_row), K(rowkey_size)); } else if (OB_FAIL(rowkey_helper.convert_datum_rowkey(store_rowkey.get_rowkey(), datum_rowkey))) { STORAGE_LOG(WARN, "Failed to transfer datum rowkey", K(ret), K(store_rowkey), K(rowkey_size), K(tbl_row)); } else if (OB_FAIL(check_row_locked_by_myself(tablet_handle, relative_table, store_ctx, datum_rowkey, locked))) { LOG_WARN("fail to check row locked", K(ret), K(tbl_row)); } else if (!locked) { ret = OB_ERR_ROW_NOT_LOCKED; LOG_DEBUG("row has not been locked", K(ret), K(tbl_row)); } } if (OB_FAIL(ret)) { } else if (data_tbl_rowkey_change) { ObStoreRow del_row(tbl_row); del_row.flag_.set_flag(ObDmlFlag::DF_DELETE); if (!is_delete_total_quantity_log) { if (OB_FAIL(tablet_handle.get_obj()->insert_row_without_rowkey_check(relative_table, run_ctx.store_ctx_, col_descs, del_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("failed to write data tablet row", K(ret), K(del_row)); } } } else { ObStoreRow new_tbl_row; new_tbl_row.flag_.set_flag(ObDmlFlag::DF_DELETE); new_tbl_row.row_val_ = tbl_row.row_val_; del_row.flag_.set_flag(ObDmlFlag::DF_UPDATE); ObSEArray update_idx; if (OB_FAIL(tablet_handle.get_obj()->update_row(relative_table, run_ctx.store_ctx_, col_descs, update_idx, del_row, new_tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("failed to write data tablet row", K(ret), K(del_row), K(new_tbl_row)); } } } } else if (lob_update) { // need to lock main table rows that don't need to be deleted if (OB_FAIL(store_rowkey.assign(tbl_row.row_val_.cells_, rowkey_size))) { LOG_WARN("Failed to assign rowkey", K(ret), K(tbl_row), K(rowkey_size)); } else if (OB_FAIL(rowkey_helper.convert_datum_rowkey(store_rowkey.get_rowkey(), datum_rowkey))) { STORAGE_LOG(WARN, "Failed to transfer datum rowkey", K(ret), K(store_rowkey)); } else if (OB_FAIL(tablet_handle.get_obj()->lock_row(relative_table, store_ctx, datum_rowkey))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("lock row failed", K(ret), K(table_id), K(tbl_row), K(rowkey_size)); } } LOG_DEBUG("generate lock node before update lob columns", K(ret), K(table_id), K(tbl_row)); } } return ret; } int ObLSTabletService::process_new_row( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, const common::ObIArray &update_idx, const ObStoreRow &old_tbl_row, const ObStoreRow &new_tbl_row, const bool rowkey_change) { int ret = OB_SUCCESS; if (OB_UNLIKELY(update_idx.count() < 0 || !new_tbl_row.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(update_idx), K(new_tbl_row), K(rowkey_change)); } else if (GCONF.enable_defensive_check() && OB_FAIL(check_new_row_legitimacy(run_ctx, new_tbl_row.row_val_))) { LOG_WARN("check new row legitimacy failed", K(ret), K(new_tbl_row.row_val_)); } else { // write full column clog needs to construct update_idx and pass to memtable if (OB_FAIL(process_data_table_row(tablet_handle, run_ctx, update_idx, old_tbl_row, new_tbl_row, rowkey_change))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("fail to process data table row", K(ret), K(update_idx), K(old_tbl_row), K(new_tbl_row), K(rowkey_change)); } } } return ret; } int ObLSTabletService::process_data_table_row( ObTabletHandle &data_tablet, ObDMLRunningCtx &run_ctx, const ObIArray &update_idx, const ObStoreRow &old_tbl_row, const ObStoreRow &new_tbl_row, const bool rowkey_change) { int ret = OB_SUCCESS; ObStoreCtx &ctx = run_ctx.store_ctx_; ObRelativeTable &relative_table = run_ctx.relative_table_; bool is_update_total_quantity_log = run_ctx.dml_param_.is_total_quantity_log_; const common::ObTimeZoneInfo *tz_info = run_ctx.dml_param_.tz_info_; if (OB_UNLIKELY(!ctx.is_valid() || !relative_table.is_valid() || nullptr == run_ctx.col_descs_ || run_ctx.col_descs_->count() <= 0 || update_idx.count() < 0 || (is_update_total_quantity_log && !old_tbl_row.is_valid()) || !new_tbl_row.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ctx), KP(run_ctx.col_descs_), K(update_idx), K(old_tbl_row), K(new_tbl_row), K(is_update_total_quantity_log), K(rowkey_change)); } else { const ObColDescIArray &col_descs = *run_ctx.col_descs_; bool exists = false; if (rowkey_change && OB_FAIL(data_tablet.get_obj()->rowkey_exists( relative_table, ctx, new_tbl_row.row_val_, exists))) { LOG_WARN("failed to check rowkey exists", K(ret), K(new_tbl_row)); } else if (exists) { char buffer[OB_TMP_BUF_SIZE_256]; ObStoreRowkey rowkey; ret = OB_ERR_PRIMARY_KEY_DUPLICATE; if (OB_SUCCESS != rowkey.assign(new_tbl_row.row_val_.cells_, relative_table.get_rowkey_column_num())) { LOG_WARN("Failed to assign rowkey", K(new_tbl_row)); } else if (OB_SUCCESS != extract_rowkey(relative_table, rowkey, buffer, OB_TMP_BUF_SIZE_256, tz_info)) { LOG_WARN("extract rowkey failed", K(rowkey)); } else { ObString index_name = "PRIMARY"; if (relative_table.is_index_table()) { relative_table.get_index_name(index_name); } LOG_USER_ERROR(OB_ERR_PRIMARY_KEY_DUPLICATE, buffer, index_name.length(), index_name.ptr()); } LOG_WARN("rowkey already exists", K(ret), K(new_tbl_row)); } else { ObStoreRow new_row; new_row.flag_.set_flag(rowkey_change ? ObDmlFlag::DF_INSERT : ObDmlFlag::DF_UPDATE); new_row.row_val_ = new_tbl_row.row_val_; if (is_update_total_quantity_log && !rowkey_change) { ObStoreRow old_row; old_row.flag_.set_flag(ObDmlFlag::DF_UPDATE); old_row.row_val_ = old_tbl_row.row_val_; if (OB_FAIL(data_tablet.get_obj()->update_row(relative_table, ctx, col_descs, update_idx, old_row, new_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("failed to update to row", K(ret), K(old_row), K(new_row)); } } } else { if (OB_FAIL(data_tablet.get_obj()->insert_row_without_rowkey_check(relative_table, ctx, col_descs, new_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("failed to update to row", K(ret), K(new_row)); } } } } } return ret; } int ObLSTabletService::check_new_row_nullable_value( const ObIArray &column_ids, ObRelativeTable &data_table, const ObNewRow &new_row) { int ret = OB_SUCCESS; if (OB_UNLIKELY(column_ids.count() != new_row.get_count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("new row is invalid", K(ret), K(new_row.get_count()), K(column_ids.count())); } for (int64_t i = 0; OB_SUCC(ret) && i < column_ids.count(); ++i) { uint64_t column_id = column_ids.at(i); bool is_nullable = false; if (OB_UNLIKELY(is_shadow_column(column_id))) { //the shadow pk is generated internally, //and the nullable attribute check for it is skipped } else if (OB_FAIL(data_table.is_column_nullable_for_write(column_id, is_nullable))) { LOG_WARN("check is_column_nullable_for_write failed", K(ret), K(column_id)); } else if (new_row.get_cell(i).is_null() && !is_nullable) { bool is_hidden = false; bool is_gen_col = false; bool is_nullable_for_read = false; if (OB_FAIL(data_table.is_column_nullable_for_read(column_id, is_nullable_for_read))) { LOG_WARN("check is nullable for read failed", K(ret)); } else if (is_nullable_for_read) { LOG_TRACE("Catch a defensive nullable error, but this column is not null novalidate", K(column_id), K(column_ids), K(new_row), K(data_table)); } else if (OB_FAIL(data_table.is_hidden_column(column_id, is_hidden))) { LOG_WARN("get is hidden column failed", K(ret), K(column_id)); } else if (OB_FAIL(data_table.is_gen_column(column_id, is_gen_col))) { LOG_WARN("get is gen column failed", K(ret), K(column_id)); } else if (is_hidden && !is_gen_col) { ret = OB_BAD_NULL_ERROR; LOG_WARN("Catch a defensive nullable error, " "maybe cause by add column not null default null ONLINE", K(ret), K(column_id), K(column_ids), K(new_row), K(data_table)); } else { ret = OB_ERR_DEFENSIVE_CHECK; ObString func_name = ObString::make_string("check_new_row_nullable_value"); LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); LOG_ERROR("Fatal Error!!! Catch a defensive error!", K(ret), K(column_id), K(column_ids), K(new_row), K(data_table)); } } else if (new_row.get_cell(i).is_number()) { number::ObNumber num; if (OB_FAIL(new_row.get_cell(i).get_number(num))) { LOG_WARN("get number value from object fail", K(ret), K(new_row.get_cell(i))); } else if (OB_FAIL(num.sanity_check())) { LOG_WARN("sanity check number failed", K(ret), K(new_row.get_cell(i))); } if (OB_SUCCESS != ret) { ret = OB_ERR_DEFENSIVE_CHECK; ObString func_name = ObString::make_string("check_new_row_nullable_value"); LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); LOG_ERROR("Fatal Error!!! Catch a defensive error!", K(ret), K(column_id), K(column_ids), K(new_row), K(data_table)); } } } return ret; } int ObLSTabletService::check_new_row_nullable_value(const ObIArray &col_descs, ObRelativeTable &relative_table, const ObNewRow &new_row) { int ret = OB_SUCCESS; if (OB_UNLIKELY(col_descs.count() > new_row.get_count())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("new row is invalid", K(ret), K(new_row.get_count()), K(col_descs.count())); } for (int64_t i = 0; OB_SUCC(ret) && i < col_descs.count(); ++i) { uint64_t column_id = col_descs.at(i).col_id_; bool is_nullable = false; if (OB_UNLIKELY(is_shadow_column(column_id))) { //the shadow pk is generated internally, //and the nullable attribute check for it is skipped } else if (OB_FAIL(relative_table.is_column_nullable_for_write(column_id, is_nullable))) { LOG_WARN("check is_column_nullable_for_write failed", K(ret), K(column_id)); } else if (new_row.get_cell(i).is_null() && !is_nullable) { bool is_hidden = false; bool is_gen_col = false; bool is_nullable_for_read = false; if (OB_FAIL(relative_table.is_column_nullable_for_read(column_id, is_nullable_for_read))) { LOG_WARN("check is nullable for read failed", K(ret)); } else if (is_nullable_for_read) { //this column is not null novalidate, maybe the null column come from the old data //so output trace log and ignore it LOG_TRACE("Catch a defensive nullable error, but this column is not null novalidate", K(column_id), K(col_descs), K(new_row), K(relative_table)); } else if (OB_FAIL(relative_table.is_hidden_column(column_id, is_hidden))) { LOG_WARN("get is hidden column failed", K(ret), K(column_id)); } else if (OB_FAIL(relative_table.is_gen_column(column_id, is_gen_col))) { LOG_WARN("get is gen column failed", K(ret), K(column_id)); } else if (is_hidden && !is_gen_col) { ret = OB_BAD_NULL_ERROR; LOG_WARN("Catch a defensive nullable error, " "maybe cause by add column not null default null ONLINE", K(ret), K(column_id), K(col_descs), K(new_row), K(relative_table)); } else { ret = OB_ERR_DEFENSIVE_CHECK; ObString func_name = ObString::make_string("check_new_row_nullable_value"); LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); LOG_ERROR("Fatal Error!!! Catch a defensive error!", K(ret), K(column_id), K(col_descs), K(new_row), K(relative_table)); } } else if (new_row.get_cell(i).is_number()) { number::ObNumber num; if (OB_FAIL(new_row.get_cell(i).get_number(num))) { LOG_WARN("get number value from object fail", K(ret), K(new_row.get_cell(i))); } else if (OB_FAIL(num.sanity_check())) { LOG_WARN("sanity check number failed", K(ret), K(new_row.get_cell(i))); } if (OB_SUCCESS != ret) { ret = OB_ERR_DEFENSIVE_CHECK; ObString func_name = ObString::make_string("check_new_row_nullable_value"); LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); LOG_ERROR("Fatal Error!!! Catch a defensive error!", K(ret), K(column_id), K(col_descs), K(new_row), K(relative_table)); } } } return ret; } int ObLSTabletService::check_new_row_shadow_pk( const ObIArray &column_ids, ObRelativeTable &data_table, const ObNewRow &new_row) { int ret = OB_SUCCESS; if (data_table.get_shadow_rowkey_column_num() > 0) { //check shadow pk int64_t rowkey_cnt = data_table.get_rowkey_column_num(); int64_t spk_cnt = data_table.get_shadow_rowkey_column_num(); int64_t index_col_cnt = rowkey_cnt - spk_cnt; bool need_spk = false; if (OB_UNLIKELY(index_col_cnt <= 0) || OB_UNLIKELY(column_ids.count() < rowkey_cnt)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("index column count is invalid", K(ret), K(index_col_cnt), K(rowkey_cnt), K(spk_cnt), K(column_ids.count())); } else if (lib::is_mysql_mode()) { // mysql兼容:只要unique index key中有null列,则需要填充shadow列 bool rowkey_has_null = false; for (int64_t i = 0; !rowkey_has_null && i < index_col_cnt; i++) { rowkey_has_null = new_row.get_cell(i).is_null(); } need_spk = rowkey_has_null; } else { // oracle兼容:只有unique index key全为null列时,才需要填充shadow列 bool is_rowkey_all_null = true; for (int64_t i = 0; is_rowkey_all_null && i < index_col_cnt; i++) { is_rowkey_all_null = new_row.get_cell(i).is_null(); } need_spk = is_rowkey_all_null; } for (int64_t i = index_col_cnt; OB_SUCC(ret) && i < rowkey_cnt; ++i) { uint64_t spk_column_id = column_ids.at(i); uint64_t real_pk_id = spk_column_id - OB_MIN_SHADOW_COLUMN_ID; const ObObj &spk_value = new_row.get_cell(i); int64_t pk_idx = OB_INVALID_INDEX; int cmp = 0; if (OB_LIKELY(!need_spk)) { if (!spk_value.is_null()) { ret = OB_ERR_DEFENSIVE_CHECK; ObString func_name = ObString::make_string("check_new_row_shadow_pk"); LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); LOG_ERROR("Fatal Error!!! Catch a defensive error!", K(ret), "column_id", column_ids, K(new_row), K(data_table), K(spk_value), K(i), K(spk_column_id), K(real_pk_id)); } } else if (OB_UNLIKELY(!has_exist_in_array(column_ids, real_pk_id, &pk_idx))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("real pk column not exists in column_ids", K(ret), K(column_ids), K(real_pk_id)); } else if (OB_FAIL(new_row.get_cell(pk_idx).compare(spk_value, cmp)) || 0 != cmp) { ret = OB_ERR_DEFENSIVE_CHECK; ObString func_name = ObString::make_string("check_new_row_shadow_pk"); LOG_USER_ERROR(OB_ERR_DEFENSIVE_CHECK, func_name.length(), func_name.ptr()); LOG_ERROR("Fatal Error!!! Catch a defensive error!", K(ret), "column_id", column_ids, K(new_row), K(data_table), K(spk_value), "pk_value", new_row.get_cell(pk_idx), K(pk_idx), K(i), K(spk_column_id), K(real_pk_id)); } } } return ret; } int ObLSTabletService::check_row_locked_by_myself( ObTabletHandle &tablet_handle, ObRelativeTable &relative_table, ObStoreCtx &store_ctx, const ObDatumRowkey &rowkey, bool &locked) { int ret = OB_SUCCESS; ObTablet *tablet = tablet_handle.get_obj(); if (OB_UNLIKELY(nullptr == tablet || !relative_table.is_valid() || !store_ctx.is_valid() || !rowkey.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(tablet_handle), K(relative_table), K(store_ctx), K(rowkey)); } else { ObStorageTableGuard guard(tablet, store_ctx, true); if (OB_FAIL(guard.refresh_and_protect_table(relative_table))) { LOG_WARN("fail to protect table", K(ret), K(tablet_handle)); } else { memtable::ObMemtable *write_memtable = nullptr; const uint64_t table_id = relative_table.get_table_id(); if (OB_FAIL(tablet->prepare_memtable(relative_table, store_ctx, write_memtable))) { LOG_WARN("prepare write memtable fail", K(ret), K(relative_table)); } else if (OB_FAIL(write_memtable->check_row_locked_by_myself( store_ctx, table_id, tablet->get_full_read_info(), rowkey, locked))) { LOG_WARN("failed to lock write memtable", K(ret), K(table_id), K(rowkey)); } } } return ret; } int ObLSTabletService::get_conflict_rows( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, const ObInsertFlag flag, const common::ObIArray &out_col_ids, const common::ObNewRow &row, common::ObNewRowIterator *&duplicated_rows) { TRANS_LOG(DEBUG, "get conflict rows", K(flag), K(row), K(lbt())); int ret = OB_SUCCESS; ObRelativeTable &data_table = run_ctx.relative_table_; ObArenaAllocator scan_allocator(ObModIds::OB_TABLE_SCAN_ITER); ObIAllocator *allocator = &scan_allocator; ObTablet *data_tablet = tablet_handle.get_obj(); ObDatumRowkeyHelper rowkey_helper(scan_allocator); ObDatumRowkey datum_rowkey; ObStoreRowkey rowkey; rowkey.assign(row.cells_, data_table.get_rowkey_column_num()); if (OB_ISNULL(data_tablet)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet is null", K(ret), K(tablet_handle)); } else { ObSingleRowGetter single_row_getter(*allocator, *data_tablet); if (OB_FAIL(init_single_row_getter(single_row_getter, run_ctx, out_col_ids, data_table))) { LOG_WARN("failed to init single row getter", K(ret)); } else if (OB_FAIL(rowkey_helper.convert_datum_rowkey(rowkey.get_rowkey(), datum_rowkey))) { STORAGE_LOG(WARN, "Failed to transfer datum rowkey", K(ret), K(rowkey)); } else if (OB_FAIL(single_get_row(single_row_getter, datum_rowkey, duplicated_rows))) { LOG_WARN("failed to single get row", K(ret)); } } if (OB_FAIL(ret)) { if (nullptr != duplicated_rows) { ObQueryIteratorFactory::free_insert_dup_iter(duplicated_rows); duplicated_rows = nullptr; } } return ret; } int ObLSTabletService::init_single_row_getter( ObSingleRowGetter &row_getter, ObDMLRunningCtx &run_ctx, const ObIArray &out_col_ids, ObRelativeTable &relative_table, bool skip_read_lob) { int ret = OB_SUCCESS; if (OB_FAIL(row_getter.init_dml_access_ctx(run_ctx.store_ctx_, run_ctx.dml_param_, skip_read_lob))) { LOG_WARN("init dml access ctx failed", K(ret)); } else if (OB_FAIL(row_getter.init_dml_access_param(relative_table, run_ctx.dml_param_, out_col_ids))) { LOG_WARN("init dml access param failed", K(ret)); } return ret; } int ObLSTabletService::single_get_row( ObSingleRowGetter &row_getter, const ObDatumRowkey &rowkey, ObNewRowIterator *&duplicated_rows) { int ret = OB_SUCCESS; ObNewRow *row = nullptr; if (OB_FAIL(row_getter.open(rowkey))) { LOG_WARN("init single row getter failed", K(ret)); } else if (OB_FAIL(row_getter.get_next_row(row))) { if (OB_ITER_END != ret) { LOG_WARN("get next row from single row getter failed", K(ret)); } else { ret = OB_SUCCESS; } } else if (NULL == duplicated_rows) { ObValueRowIterator *dup_iter = NULL; if (OB_ISNULL(dup_iter = ObQueryIteratorFactory::get_insert_dup_iter())) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("no memory to alloc ObValueRowIterator", K(ret)); } else { duplicated_rows = dup_iter; if (OB_FAIL(dup_iter->init(true))) { LOG_WARN("failed to initialize ObValueRowIterator", K(ret)); } } } if (OB_SUCC(ret) && row != nullptr) { ObValueRowIterator *dup_iter = static_cast(duplicated_rows); if (OB_FAIL(dup_iter->add_row(*row))) { LOG_WARN("failed to store conflict row", K(ret), K(*row)); } else { LOG_DEBUG("get conflict row", KPC(row)); } } return ret; } int ObLSTabletService::convert_row_to_rowkey( ObSingleRowGetter &index_row_getter, ObStoreRowkey &rowkey) { int ret = OB_SUCCESS; ObNewRow *row = nullptr; if (OB_FAIL(index_row_getter.get_next_row(row))) { if (OB_ITER_END != ret) { LOG_WARN("get next row from index row getter failed", K(ret)); } } else { rowkey.assign(row->cells_, row->count_); } return ret; } /* this func is an encapsulation of ObNewRowIterator->get_next_row. * 1. need_copy_cells is true, perform a cells copy, but not a deep copy. * memory for store_row.row_val.cells_ has already allocated before * this func is invoked, no need to alloc memory in this func. * 2. need_copy_cells is false, just perform an assignment, no any copy behavior, */ int ObLSTabletService::get_next_row_from_iter( ObNewRowIterator *row_iter, ObStoreRow &store_row, const bool need_copy_cells) { int ret = OB_SUCCESS; ObNewRow *row = nullptr; if (OB_ISNULL(row_iter)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(row_iter)); } else if (OB_FAIL(row_iter->get_next_row(row))) { if (OB_ITER_END != ret) { LOG_WARN("fail to iterate a row", K(ret), K(row)); } } else { if (need_copy_cells) { // in this situation, store_row.row_val has already hold mem for cells_, // no need to alloc mem here, we copy cells only. store_row.row_val_.count_ = row->count_; for (int64_t i = 0; i < row->count_; ++i) { store_row.row_val_.cells_[i] = row->cells_[i]; } } else { store_row.row_val_ = *row; } } return ret; } int ObLSTabletService::insert_row_to_tablet( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, ObStoreRow &tbl_row) { int ret = OB_SUCCESS; ObStoreCtx &store_ctx = run_ctx.store_ctx_; ObRelativeTable &relative_table = run_ctx.relative_table_; const ObColDescIArray &idx_col_descs = run_ctx.idx_col_descs_; if (OB_UNLIKELY(!store_ctx.is_valid() || !relative_table.is_valid() || nullptr == run_ctx.col_descs_ || run_ctx.col_descs_->count() <= 0 || !tbl_row.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(store_ctx), KP(run_ctx.col_descs_), K(tbl_row), K(ret)); } else if (GCONF.enable_defensive_check() && OB_FAIL(check_new_row_legitimacy(run_ctx, tbl_row.row_val_))) { LOG_WARN("check new row legitimacy failed", K(ret), K(tbl_row.row_val_)); } else if (OB_FAIL(insert_lob_tablet_row(tablet_handle, run_ctx, tbl_row))) { LOG_WARN("failed to write lob tablets rows", K(ret)); } else { const ObColDescIArray &col_descs = *run_ctx.col_descs_; if (OB_FAIL(tablet_handle.get_obj()->insert_row_without_rowkey_check( relative_table, store_ctx, col_descs, tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret) { LOG_WARN("failed to write table row", K(ret), "table id", relative_table.get_table_id(), K(col_descs), K(tbl_row)); } } } return ret; } int ObLSTabletService::process_old_row_lob_col( ObTabletHandle &data_tablet_handle, ObDMLRunningCtx &run_ctx, ObStoreRow &tbl_row) { int ret = OB_SUCCESS; run_ctx.is_old_row_valid_for_lob_ = false; bool has_lob_col = false; int64_t col_cnt = run_ctx.col_descs_->count(); if (tbl_row.row_val_.count_ != col_cnt) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[STORAGE_LOB]Invliad row col cnt", K(ret), K(col_cnt), K(tbl_row)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { const ObColDesc &column = run_ctx.col_descs_->at(i); if (column.col_type_.is_lob_v2()) { has_lob_col = true; break; } } } if (OB_SUCC(ret) && has_lob_col) { if (OB_FAIL(table_refresh_row(data_tablet_handle, run_ctx, tbl_row.row_val_))) { LOG_WARN("[STORAGE_LOB]re-read lob col failed", K(ret)); } } return ret; } int ObLSTabletService::table_refresh_row( ObTabletHandle &data_tablet_handle, ObDMLRunningCtx &run_ctx, ObNewRow &row) { int ret = OB_SUCCESS; ObArenaAllocator scan_allocator(ObModIds::OB_LOB_ACCESS_BUFFER); ObTablet *data_tablet = data_tablet_handle.get_obj(); ObRelativeTable &data_table = run_ctx.relative_table_; ObStoreRowkey rowkey; if (OB_FAIL(rowkey.assign(row.cells_, data_table.get_rowkey_column_num()))) { LOG_WARN("get rowkey col num failed", K(ret)); } int64_t col_cnt = run_ctx.col_descs_->count(); ObSEArray out_col_ids; for (int i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { if (OB_FAIL(out_col_ids.push_back(run_ctx.col_descs_->at(i).col_id_))) { LOG_WARN("push col id failed.", K(ret), K(i)); } } ObDatumRowkey datum_rowkey; ObDatumRowkeyHelper rowkey_helper(scan_allocator); ObSingleRowGetter single_row_getter(scan_allocator, *data_tablet); if (OB_FAIL(ret)) { } else if (OB_FAIL(init_single_row_getter(single_row_getter, run_ctx, out_col_ids, data_table, true))) { LOG_WARN("failed to init single row getter", K(ret)); } else if (OB_FAIL(rowkey_helper.convert_datum_rowkey(rowkey.get_rowkey(), datum_rowkey))) { LOG_WARN("Failed to transfer datum rowkey", K(ret), K(rowkey)); } else { ObNewRow *new_row = nullptr; if (OB_FAIL(single_row_getter.open(datum_rowkey))) { LOG_WARN("init single row getter failed", K(ret)); } else if (OB_FAIL(single_row_getter.get_next_row(new_row))) { if (ret == OB_ITER_END) { LOG_DEBUG("re-read old row not exist", K(ret), K(row)); ret = OB_SUCCESS; } else { LOG_WARN("get next row from single row getter failed", K(ret)); } } else if (OB_ISNULL(new_row)) { ret = OB_ERR_NULL_VALUE; LOG_WARN("get next row from single row null", K(ret)); } else if (new_row->get_count() != row.get_count()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get row from single row col count not equal.", K(ret), K(row.get_count()), K(new_row->get_count())); } else { LOG_DEBUG("get new row success.", K(row), KPC(new_row)); if (OB_FAIL(ob_write_row(run_ctx.lob_allocator_, *new_row, row))) { LOG_WARN("failed to deep copy new row", K(ret)); } else { run_ctx.is_old_row_valid_for_lob_ = true; } } } return ret; } int ObLSTabletService::delete_row_in_tablet( ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx, const ObNewRow &row) { int ret = OB_SUCCESS; const ObDMLBaseParam &dml_param = run_ctx.dml_param_; ObStoreCtx &ctx = run_ctx.store_ctx_; ObRelativeTable &relative_table = run_ctx.relative_table_; ObStoreRow &tbl_row = run_ctx.tbl_row_; ObStoreRow new_tbl_row; ObSEArray update_idx; // update_idx is a dummy param here tbl_row.flag_.set_flag(ObDmlFlag::DF_DELETE); tbl_row.row_val_ = row; if (OB_FAIL(check_old_row_legitimacy(tablet_handle, run_ctx, row))) { if (OB_ERR_DEFENSIVE_CHECK == ret) { dump_diag_info_for_old_row_loss(relative_table, ctx, tbl_row); } LOG_WARN("check old row legitimacy failed", K(row)); } else if (OB_FAIL(process_old_row_lob_col(tablet_handle, run_ctx, tbl_row))) { LOG_WARN("failed to process old row lob col", K(ret), K(tbl_row)); } else if (!dml_param.is_total_quantity_log_) { if (OB_FAIL(tablet_handle.get_obj()->insert_row_without_rowkey_check(relative_table, ctx, *run_ctx.col_descs_, tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("failed to set row", K(ret), K(*run_ctx.col_descs_), K(tbl_row)); } } } else if (OB_FAIL(delete_lob_tablet_rows(run_ctx, tablet_handle, tbl_row, row))) { LOG_WARN("failed to delete lob rows.", K(ret), K(tbl_row), K(row)); } else { update_idx.reset(); // update_idx is a dummy param here new_tbl_row.reset(); new_tbl_row.flag_.set_flag(ObDmlFlag::DF_DELETE); new_tbl_row.row_val_ = tbl_row.row_val_; tbl_row.flag_.set_flag(ObDmlFlag::DF_UPDATE); if (OB_FAIL(tablet_handle.get_obj()->update_row(relative_table, ctx, *run_ctx.col_descs_, update_idx, tbl_row, new_tbl_row))) { if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) { LOG_WARN("failed to set row", K(ret), K(*run_ctx.col_descs_), K(tbl_row), K(new_tbl_row)); } } else { LOG_DEBUG("succeeded to del main table row", K(tbl_row), K(new_tbl_row)); } } return ret; } int ObLSTabletService::delete_lob_col( ObDMLRunningCtx &run_ctx, const ObColDesc &column, ObObj &obj, const ObObj &sql_obj, ObLobCommon *&lob_common, ObLobAccessParam &lob_param) { int ret = OB_SUCCESS; ObLobManager *lob_mngr = MTL(ObLobManager*); if (OB_ISNULL(lob_mngr)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[STORAGE_LOB]get lob manager instance failed.", K(ret));; } else if (!column.col_type_.is_lob_v2() || obj.is_nop_value() || obj.is_null() || !run_ctx.is_old_row_valid_for_lob_) { // do nothing } else { ObString data = obj.get_string(); ObString sql_data = sql_obj.get_string(); if (data.length() < sizeof(ObLobCommon)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[STORAGE_LOB]Invalid Lob data.", K(ret), K(obj), K(data)); } else { void *buf = run_ctx.lob_allocator_.alloc(data.length()); if (OB_ISNULL(buf)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("failed to deep copy lob data.", K(ret), K(data)); } else { MEMCPY(buf, data.ptr(), data.length()); lob_common = reinterpret_cast(buf); lob_param.tx_desc_ = run_ctx.store_ctx_.mvcc_acc_ctx_.tx_desc_; lob_param.snapshot_ = run_ctx.dml_param_.snapshot_; lob_param.tx_id_ = lob_param.tx_desc_->get_tx_id(); lob_param.sql_mode_ = run_ctx.dml_param_.sql_mode_; lob_param.is_total_quantity_log_ = run_ctx.dml_param_.is_total_quantity_log_; lob_param.ls_id_ = run_ctx.store_ctx_.ls_id_; lob_param.tablet_id_ = run_ctx.relative_table_.get_tablet_id(); lob_param.coll_type_ = column.col_type_.get_collation_type(); lob_param.allocator_ = &run_ctx.lob_allocator_; lob_param.lob_common_ = lob_common; lob_param.handle_size_ = data.length(); lob_param.byte_size_ = lob_param.lob_common_->get_byte_size(data.length()); lob_param.timeout_ = run_ctx.dml_param_.timeout_; lob_param.scan_backward_ = false; lob_param.offset_ = 0; lob_param.len_ = ObCharset::strlen_char(lob_param.coll_type_, sql_data.ptr(), sql_data.length()); if (lob_param.byte_size_ < 0) { ret = OB_ERR_UNEXPECTED; LOG_WARN("calc byte size is negative.", K(ret), K(data), K(lob_param)); } else if (OB_FAIL(lob_mngr->erase(lob_param))) { LOG_WARN("[STORAGE_LOB]lob erase failed.", K(ret), K(lob_param)); } } } } return ret; } int ObLSTabletService::delete_lob_tablet_rows( ObDMLRunningCtx &run_ctx, ObTabletHandle &data_tablet, ObStoreRow &tbl_row, const ObNewRow &row) { int ret = OB_SUCCESS; int64_t col_cnt = run_ctx.col_descs_->count(); if (tbl_row.row_val_.count_ != col_cnt) { ret = OB_ERR_UNEXPECTED; LOG_WARN("[STORAGE_LOB]Invliad row col cnt", K(col_cnt), K(tbl_row)); } else { ObLobCommon *lob_common = nullptr; for (int64_t i = 0; OB_SUCC(ret) && i < col_cnt; ++i) { const ObColDesc &column = run_ctx.col_descs_->at(i); ObObj &obj = tbl_row.row_val_.get_cell(i); const ObObj &sql_obj = row.get_cell(i); ObLobAccessParam lob_param; if (OB_FAIL(delete_lob_col(run_ctx, column, obj, sql_obj, lob_common, lob_param))) { LOG_WARN("[STORAGE_LOB]failed to erase lob col.", K(ret), K(i), K(tbl_row)); } } } return ret; } int ObLSTabletService::prepare_scan_table_param( ObTableScanParam ¶m, share::schema::ObMultiVersionSchemaService &schema_service) { int ret = OB_SUCCESS; if (NULL == param.table_param_ || OB_INVALID_ID == param.table_param_->get_table_id()) { void *buf = NULL; ObTableParam *table_param = NULL; ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = NULL; const uint64_t tenant_id = MTL_ID(); const bool check_formal = param.index_id_ > OB_MAX_CORE_TABLE_ID; if (OB_FAIL(schema_service.get_tenant_schema_guard(tenant_id, schema_guard))) { LOG_WARN("failed to get schema manager", K(ret), K(tenant_id)); } else if (check_formal && OB_FAIL(schema_guard.check_formal_guard())) { LOG_WARN("Fail to check formal schema, ", K(param.index_id_), K(ret)); } else if (OB_FAIL(schema_guard.get_table_schema(tenant_id, param.index_id_, table_schema))) { LOG_WARN("Fail to get table schema", K(param.index_id_), K(ret)); } else if (NULL == table_schema) { ret = OB_TABLE_NOT_EXIST; LOG_WARN("table not exist", K(param.index_id_), K(ret)); } else { if (NULL == (buf = param.allocator_->alloc(sizeof(ObTableParam)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Fail to allocate memory, ", K(ret)); } else { //TODO table param should not generate twice!!!! table_param = new (buf) ObTableParam(*param.allocator_); if (OB_FAIL(table_param->convert(*table_schema, param.column_ids_))) { LOG_WARN("Fail to convert table param, ", K(ret)); } else { param.table_param_ = table_param; } } } } return ret; } void ObLSTabletService::dump_diag_info_for_old_row_loss( ObRelativeTable &data_table, ObStoreCtx &store_ctx, const ObStoreRow &tbl_row) { int ret = OB_SUCCESS; ObArenaAllocator allocator; ObTableAccessParam access_param; ObTableAccessContext access_ctx; ObSEArray out_col_pros; ObStoreRowkey rowkey; ObDatumRowkey datum_rowkey; ObDatumRowkeyHelper rowkey_helper(allocator); const int64_t schema_rowkey_cnt = data_table.get_rowkey_column_num(); ObTableStoreIterator &table_iter = data_table.tablet_iter_.table_iter_; const ObTableReadInfo &full_read_info = data_table.tablet_iter_.tablet_handle_.get_obj()->get_full_read_info(); ObQueryFlag query_flag(ObQueryFlag::Forward, false, /*is daily merge scan*/ false, /*is read multiple macro block*/ false, /*sys task scan, read one macro block in single io*/ false /*is full row scan?*/, false, false); common::ObVersionRange trans_version_rang; trans_version_rang.base_version_ = 0; trans_version_rang.multi_version_start_ = 0; trans_version_rang.snapshot_version_ = store_ctx.mvcc_acc_ctx_.get_snapshot_version(); for (int64_t i = 0; OB_SUCC(ret) && i < full_read_info.get_request_count(); i++) { if (OB_FAIL(out_col_pros.push_back(i))) { STORAGE_LOG(WARN, "Failed to push back col project", K(ret), K(i)); } } if (OB_FAIL(ret)) { } else if (OB_FAIL(rowkey.assign(tbl_row.row_val_.cells_, schema_rowkey_cnt))) { LOG_WARN("Failed to assign rowkey", K(ret)); } else if (OB_FAIL(rowkey_helper.convert_datum_rowkey(rowkey.get_rowkey(), datum_rowkey))) { STORAGE_LOG(WARN, "Failed to transfer datum rowkey", K(ret), K(rowkey)); } else if (OB_FAIL(access_ctx.init(query_flag, store_ctx, allocator, trans_version_rang))) { LOG_WARN("Fail to init access ctx", K(ret)); } else { access_param.is_inited_ = true; access_param.iter_param_.table_id_ = data_table.get_table_id(); access_param.iter_param_.tablet_id_ = data_table.tablet_iter_.tablet_handle_.get_obj()->get_tablet_meta().tablet_id_; access_param.iter_param_.read_info_ = &full_read_info; access_param.iter_param_.full_read_info_ = &full_read_info; access_param.iter_param_.out_cols_project_ = &out_col_pros;; access_param.iter_param_.tablet_handle_ = data_table.tablet_iter_.tablet_handle_; ObStoreRowIterator *getter = nullptr; ObITable *table = nullptr; const ObDatumRow *row = nullptr; FLOG_INFO("Try to find the specified rowkey within all the sstable", K(tbl_row), K(table_iter)); FLOG_INFO("Prepare the diag env to dump the rows", K(store_ctx), K(rowkey), K(datum_rowkey), K(access_ctx.trans_version_range_)); table_iter.resume(); while (OB_SUCC(ret)) { if (OB_FAIL(table_iter.get_next(table))) { if (OB_ITER_END != ret) { LOG_WARN("failed to get next tables", K(ret)); } } else if (OB_ISNULL(table)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("table must not be null", K(ret), K(table_iter)); } else if (OB_FAIL(table->get(access_param.iter_param_, access_ctx, datum_rowkey, getter))) { LOG_WARN("Failed to get param", K(ret), KPC(table)); } else if (OB_FAIL(getter->get_next_row(row))) { LOG_WARN("Failed to get next row", K(ret), KPC(table)); } else if (row->row_flag_.is_not_exist() || row->row_flag_.is_delete()){ FLOG_INFO("Cannot found rowkey in the table", KPC(row), KPC(table)); } else if (table->is_sstable()) { FLOG_INFO("Found rowkey in the sstable", KPC(row), KPC(reinterpret_cast(table))); } else { FLOG_INFO("Found rowkey in the memtable", KPC(row), KPC(reinterpret_cast(table))); } // ignore error in the loop if (OB_FAIL(ret) && OB_ITER_END != ret) { ret = OB_SUCCESS; } if (OB_NOT_NULL(getter)) { getter->~ObStoreRowIterator(); getter = nullptr; } } if (OB_ITER_END == ret) { ret = OB_SUCCESS; } if (OB_SUCC(ret)) { FLOG_INFO("prepare to use single merge to find row", K(rowkey), K(datum_rowkey), K(access_param)); ObSingleMerge *get_merge = nullptr; ObGetTableParam get_table_param; ObDatumRow *row = nullptr; get_table_param.tablet_iter_ = data_table.tablet_iter_; void *buf = nullptr; if (OB_ISNULL(buf = allocator.alloc(sizeof(ObSingleMerge)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Failed to alloc memory for single merge", K(ret)); } else if (FALSE_IT(get_merge = new(buf)ObSingleMerge())) { } else if (OB_FAIL(get_merge->init(access_param, access_ctx, get_table_param))) { LOG_WARN("Failed to init single get merge", K(ret)); } else if (OB_FAIL(get_merge->open(datum_rowkey))) { LOG_WARN("Failed to open single merge", K(ret)); } else if (FALSE_IT(get_merge->disable_fill_default())) { } else { while (OB_SUCC(get_merge->get_next_row(row))) { FLOG_INFO("Found one row for the rowkey", KPC(row)); } FLOG_INFO("Finish to find rowkey with single merge", K(ret), K(rowkey), K(datum_rowkey)); } if (OB_NOT_NULL(get_merge)) { get_merge->~ObSingleMerge(); get_merge = nullptr; } } #ifdef ENABLE_DEBUG_LOG // print single row check info if (store_ctx.mvcc_acc_ctx_.tx_id_.is_valid()) { transaction::ObTransService *trx = MTL(transaction::ObTransService *); if (OB_NOT_NULL(trx) && NULL != trx->get_defensive_check_mgr()) { (void)trx->get_defensive_check_mgr()->dump(store_ctx.mvcc_acc_ctx_.tx_id_); } } #endif } } int ObLSTabletService::prepare_dml_running_ctx( const common::ObIArray *column_ids, const common::ObIArray *upd_col_ids, ObTabletHandle &tablet_handle, ObDMLRunningCtx &run_ctx) { int ret = OB_SUCCESS; if (OB_FAIL(run_ctx.init( column_ids, upd_col_ids, MTL(ObTenantSchemaService*)->get_schema_service(), tablet_handle))) { LOG_WARN("failed to init run ctx", K(ret)); } return ret; } int ObLSTabletService::get_ls_min_end_log_ts_in_old_tablets(int64_t &end_log_ts) { int ret = OB_SUCCESS; const ObLSID &ls_id = ls_->get_ls_id(); ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_FAIL(t3m->get_min_end_log_ts_for_ls(ls_id, end_log_ts))) { LOG_WARN("fail to get ls min end log ts in all of old tablets", K(ret), K(ls_id)); } return ret; } int ObLSTabletService::get_multi_ranges_cost( const ObTabletID &tablet_id, const common::ObIArray &ranges, int64_t &total_size) { int ret = OB_SUCCESS; ObTabletTableIterator iter; const int64_t max_snapshot_version = INT64_MAX; AllowToReadMgr::AllowToReadInfo read_info; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (FALSE_IT(allow_to_read_mgr_.load_allow_to_read_info(read_info))) { } else if (!read_info.allow_to_read()) { ret = OB_REPLICA_NOT_READABLE; LOG_WARN("ls is not allow to read", K(ret), KPC(ls_)); } else if (OB_FAIL(get_read_tables(tablet_id, max_snapshot_version, iter))) { LOG_WARN("fail to get all read tables", K(ret), K(tablet_id), K(max_snapshot_version)); } else { ObPartitionMultiRangeSpliter spliter; if (OB_FAIL(spliter.get_multi_range_size( ranges, iter.tablet_handle_.get_obj()->get_index_read_info(), iter.table_iter_, total_size))) { LOG_WARN("fail to get multi ranges cost", K(ret), K(ranges)); } } return ret; } int ObLSTabletService::split_multi_ranges( const ObTabletID &tablet_id, const ObIArray &ranges, const int64_t expected_task_count, common::ObIAllocator &allocator, ObArrayArray &multi_range_split_array) { int ret = OB_SUCCESS; ObTabletTableIterator iter; const int64_t max_snapshot_version = INT64_MAX; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_FAIL(get_read_tables(tablet_id, max_snapshot_version, iter))) { LOG_WARN("fail to get all read tables", K(ret), K(tablet_id), K(max_snapshot_version)); } else { ObPartitionMultiRangeSpliter spliter; if (OB_FAIL(spliter.get_split_multi_ranges( ranges, expected_task_count, iter.tablet_handle_.get_obj()->get_index_read_info(), iter.table_iter_, allocator, multi_range_split_array))) { LOG_WARN("fail to get splitted ranges", K(ret), K(ranges), K(expected_task_count)); } } return ret; } int ObLSTabletService::estimate_row_count( const ObTableScanParam ¶m, const ObTableScanRange &scan_range, ObIArray &est_records, int64_t &logical_row_count, int64_t &physical_row_count) { int ret = OB_SUCCESS; ObPartitionEst batch_est; ObTabletTableIterator tablet_iter; common::ObSEArray tables; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!param.is_estimate_valid() || !scan_range.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(param), K(scan_range)); } else if (scan_range.is_empty()) { } else { const int64_t snapshot_version = -1 == param.frozen_version_ ? GET_BATCH_ROWS_READ_SNAPSHOT_VERSION : param.frozen_version_; if (OB_FAIL(get_read_tables(param.tablet_id_, snapshot_version, tablet_iter, false))) { LOG_WARN("failed to get tablet_iter", K(ret), K(snapshot_version), K(param)); } else { while(OB_SUCC(ret)) { ObITable *table = nullptr; if (OB_FAIL(tablet_iter.table_iter_.get_next(table))) { if (OB_ITER_END != ret) { LOG_WARN("failed to get next table", K(ret), K(tablet_iter.table_iter_)); } else { ret = OB_SUCCESS; break; } } else if (OB_ISNULL(table)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("table shoud not be null", K(ret), K(tablet_iter.table_iter_)); } else if (table->is_sstable() && static_cast(table)->get_meta().is_empty()) { LOG_DEBUG("cur sstable is empty", K(ret), K(*table)); continue; } else if (OB_FAIL(tables.push_back(table))) { LOG_WARN("failed to push back table", K(ret), K(tables)); } } } if (OB_SUCC(ret) && tables.count() > 0) { ObTableEstimateBaseInput base_input(param.scan_flag_, param.index_id_, tables, tablet_iter.tablet_handle_); if (scan_range.is_get()) { if (OB_FAIL(ObTableEstimator::estimate_row_count_for_get(base_input, scan_range.get_rowkeys(), batch_est))) { LOG_WARN("failed to estimate row count", K(ret), K(param), K(scan_range)); } } else if (OB_FAIL(ObTableEstimator::estimate_row_count_for_scan(base_input, scan_range.get_ranges(), batch_est, est_records))) { LOG_WARN("failed to estimate row count", K(ret), K(param), K(scan_range)); } } } if (OB_SUCC(ret)) { logical_row_count = batch_est.logical_row_count_; physical_row_count = batch_est.physical_row_count_; } LOG_DEBUG("estimate result", K(ret), K(batch_est), K(est_records)); return ret; } int ObLSTabletService::estimate_block_count( const common::ObTabletID &tablet_id, int64_t ¯o_block_count, int64_t µ_block_count) { int ret = OB_SUCCESS; macro_block_count = 0; micro_block_count = 0; ObTabletTableIterator tablet_iter; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_FAIL(get_read_tables(tablet_id, INT64_MAX, tablet_iter, false/*allow_no_ready_read*/))) { LOG_WARN("failed to get read tables", K(ret)); } ObITable *table = nullptr; ObSSTable *sstable = nullptr; int64_t total_sample_table_cnt = 2; int64_t sample_table_cnt = 0; int64_t sampled_table_row_cnt = 0; int64_t total_row_count = 0; while (OB_SUCC(ret)) { if (OB_FAIL(tablet_iter.table_iter_.get_next(table))) { if (OB_ITER_END != ret) { LOG_WARN("failed to get next tables", K(ret)); } else { ret = OB_SUCCESS; break; } } else if (OB_ISNULL(table)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null table", K(ret), K(tablet_iter.table_iter_)); } else if (!table->is_sstable()) { break; } else { sstable = static_cast(table); macro_block_count += sstable->get_meta().get_basic_meta().get_data_macro_block_count(); micro_block_count += sstable->get_meta().get_basic_meta().get_data_micro_block_count(); total_row_count += sstable->get_meta().get_row_count(); if (sample_table_cnt++ < total_sample_table_cnt) { sampled_table_row_cnt += sstable->get_meta().get_row_count(); } } } return ret; } int ObLSTabletService::get_tx_data_memtable_mgr(ObMemtableMgrHandle &mgr_handle) { mgr_handle.reset(); return mgr_handle.set_memtable_mgr(&tx_data_memtable_mgr_); } int ObLSTabletService::get_tx_ctx_memtable_mgr(ObMemtableMgrHandle &mgr_handle) { mgr_handle.reset(); return mgr_handle.set_memtable_mgr(&tx_ctx_memtable_mgr_); } int ObLSTabletService::get_lock_memtable_mgr(ObMemtableMgrHandle &mgr_handle) { mgr_handle.reset(); return mgr_handle.set_memtable_mgr(&lock_memtable_mgr_); } int ObLSTabletService::get_bf_optimal_prefix(int64_t &prefix) { int ret = OB_SUCCESS; prefix = 0; return ret; } int ObLSTabletService::on_prepare_create_tablets( const obrpc::ObBatchCreateTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.prepare_create_tablets(arg, trans_flags))) { LOG_WARN("failed to prepare create tablets", K(ret), K(arg), K(trans_flags)); } } return ret; } int ObLSTabletService::on_redo_create_tablets( const obrpc::ObBatchCreateTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.redo_create_tablets(arg, trans_flags))) { LOG_WARN("failed to redo create tablets", K(ret), K(arg), K(trans_flags)); } } return ret; } int ObLSTabletService::on_commit_create_tablets( const obrpc::ObBatchCreateTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { common::ObSArray tablet_id_array; ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.commit_create_tablets(arg, trans_flags, tablet_id_array))) { LOG_WARN("failed to commit create tablets", K(ret), K(trans_flags)); } else { report_tablet_to_rs(tablet_id_array); } } return ret; } int ObLSTabletService::on_tx_end_create_tablets( const obrpc::ObBatchCreateTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.tx_end_create_tablets(arg, trans_flags))) { LOG_WARN("failed to tx end create tablets", K(ret), K(trans_flags)); } } return ret; } int ObLSTabletService::on_abort_create_tablets( const obrpc::ObBatchCreateTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.abort_create_tablets(arg, trans_flags))) { LOG_WARN("failed to abort create tablets", K(ret), K(trans_flags)); } } return ret; } int ObLSTabletService::on_prepare_remove_tablets( const obrpc::ObBatchRemoveTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.prepare_remove_tablets(arg, trans_flags))) { LOG_WARN("failed to prepare remove tablets", K(ret), K(arg), K(trans_flags)); } } return ret; } int ObLSTabletService::on_redo_remove_tablets( const obrpc::ObBatchRemoveTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.redo_remove_tablets(arg, trans_flags))) { LOG_WARN("failed to redo remove tablets", K(ret), K(arg), K(trans_flags)); } } return ret; } int ObLSTabletService::on_commit_remove_tablets( const obrpc::ObBatchRemoveTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.commit_remove_tablets(arg, trans_flags))) { LOG_WARN("failed to commit remove tablets", K(ret), K(trans_flags)); } } return ret; } int ObLSTabletService::on_tx_end_remove_tablets( const obrpc::ObBatchRemoveTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.tx_end_remove_tablets(arg, trans_flags))) { LOG_WARN("failed to tx end remove tablets", K(ret), K(trans_flags)); } } return ret; } int ObLSTabletService::on_abort_remove_tablets( const obrpc::ObBatchRemoveTabletArg &arg, const transaction::ObMulSourceDataNotifyArg &trans_flags) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else { ObTabletCreateDeleteHelper helper(*ls_, tablet_id_set_); if (OB_FAIL(helper.abort_remove_tablets(arg, trans_flags))) { LOG_WARN("failed to abort remove tablets", K(ret), K(trans_flags)); } } return ret; } int ObLSTabletService::create_ls_inner_tablet( const share::ObLSID &ls_id, const common::ObTabletID &tablet_id, const int64_t memstore_version, const int64_t frozen_timestamp, const share::schema::ObTableSchema &table_schema, const lib::Worker::CompatMode &compat_mode, const int64_t create_scn) { int ret = OB_SUCCESS; const int64_t data_version = ObVersion(memstore_version - 1, 0); const int64_t snapshot_version = frozen_timestamp; const common::ObSArray empty_index_tablet_array; bool b_exist = false; ObTabletHandle tablet_handle; common::ObTabletID empty_tablet_id; ObMetaDiskAddr disk_addr; // no use if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!ls_id.is_valid()) || OB_UNLIKELY(!tablet_id.is_valid()) || OB_UNLIKELY(OB_INVALID_VERSION == memstore_version) || OB_UNLIKELY(OB_INVALID_TIMESTAMP == frozen_timestamp) || OB_UNLIKELY(!table_schema.is_valid()) || OB_UNLIKELY(lib::Worker::CompatMode::INVALID == compat_mode) /*|| OB_UNLIKELY(create_scn <= OB_INVALID_TIMESTAMP)*/) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ls_id), K(tablet_id), K(frozen_timestamp), K(table_schema), K(compat_mode), K(create_scn)); } else if (OB_UNLIKELY(ls_id != ls_->get_ls_id())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls id is unexpected", K(ret), "arg_ls_id", ls_id, "ls_id", ls_->get_ls_id()); } else if (OB_UNLIKELY(!tablet_id.is_ls_inner_tablet())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet id is not ls inner tablet", K(ret), K(tablet_id)); } else if (OB_FAIL(has_tablet(ls_id, tablet_id, b_exist))) { LOG_WARN("failed to check tablet existence", K(ret), K(ls_id), K(tablet_id)); } else if (OB_UNLIKELY(b_exist)) { ret = OB_TABLET_EXIST; LOG_WARN("tablet already exists", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(do_create_tablet(ls_id, tablet_id, tablet_id/*data_tablet_id*/, empty_index_tablet_array, create_scn, snapshot_version, table_schema, compat_mode, empty_tablet_id, empty_tablet_id, tablet_handle))) { LOG_WARN("failed to do create tablet", K(ret), K(ls_id), K(tablet_id), K(create_scn), K(snapshot_version), K(table_schema), K(compat_mode)); } else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(tablet_handle, disk_addr))) { LOG_WARN("failed to write create tablet slog", K(ret), K(tablet_handle)); } else if (OB_FAIL(tablet_id_set_.set(tablet_id))) { LOG_ERROR("fail to set tablet id set", K(ret), K(tablet_id), K(lbt())); ob_usleep(1000 * 1000); ob_abort(); } return ret; } int ObLSTabletService::remove_ls_inner_tablet( const share::ObLSID &ls_id, const common::ObTabletID &tablet_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); } else if (OB_UNLIKELY(!ls_id.is_valid()) || OB_UNLIKELY(!tablet_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(ls_id), K(tablet_id)); } else if (OB_UNLIKELY(ls_id != ls_->get_ls_id())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ls id is unexpected", K(ret), "arg_ls_id", ls_id, "ls_id", ls_->get_ls_id()); } else if (OB_FAIL(do_remove_tablet(ls_id, tablet_id))) { LOG_WARN("failed to remove tablet", K(ret), K(ls_id), K(tablet_id)); } return ret; } int ObLSTabletService::build_tablet_iter(ObLSTabletIterator &iter) { int ret = common::OB_SUCCESS; GetAllTabletIDOperator op(iter.tablet_ids_); iter.ls_tablet_service_ = this; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "not inited", K(ret), K_(is_inited)); } else if (OB_FAIL(tablet_id_set_.foreach(op))) { STORAGE_LOG(WARN, "fail to get all tablet ids from set", K(ret)); } else if (OB_UNLIKELY(!iter.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("iter is invalid", K(ret), K(iter)); } if (OB_FAIL(ret)) { iter.reset(); } return ret; } int ObLSTabletService::build_tablet_iter(ObHALSTabletIDIterator &iter) { int ret = common::OB_SUCCESS; GetAllTabletIDOperator op(iter.tablet_ids_); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "not inited", K(ret), K_(is_inited)); } else if (OB_FAIL(tablet_id_set_.foreach(op))) { STORAGE_LOG(WARN, "fail to get all tablet ids from set", K(ret)); } else if (OB_UNLIKELY(!iter.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("iter is invalid", K(ret), K(iter)); } if (OB_FAIL(ret)) { iter.reset(); } return ret; } int ObLSTabletService::set_allow_to_read_(ObLS *ls) { int ret = OB_SUCCESS; ObMigrationStatus migration_status = ObMigrationStatus::OB_MIGRATION_STATUS_MAX; share::ObLSRestoreStatus restore_status; if (OB_ISNULL(ls)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("set allow to read get invalid argument", K(ret), KP(ls)); } else { if (OB_FAIL(ls->get_migration_and_restore_status(migration_status, restore_status))) { LOG_WARN("failed to get ls migration and restore status", K(ret), KPC(ls)); } else if (ObMigrationStatus::OB_MIGRATION_STATUS_NONE != migration_status && ObLSRestoreStatus::RESTORE_MAJOR_DATA != restore_status && ObLSRestoreStatus::RESTORE_NONE != restore_status) { allow_to_read_mgr_.disable_to_read(); FLOG_INFO("set ls do not allow to read", KPC(ls), K(migration_status), K(restore_status)); } else { allow_to_read_mgr_.enable_to_read(); } } return ret; } void ObLSTabletService::enable_to_read() { allow_to_read_mgr_.enable_to_read(); } void ObLSTabletService::disable_to_read() { allow_to_read_mgr_.disable_to_read(); } ObLSTabletService::DeleteTabletInfo::DeleteTabletInfo() : delete_data_tablet_(false), old_data_tablet_handle_(), new_data_tablet_handle_(), delete_index_tablet_ids_() { } ObLSTabletService::DeleteTabletInfo::DeleteTabletInfo(const ObLSTabletService::DeleteTabletInfo &other) : delete_data_tablet_(other.delete_data_tablet_), old_data_tablet_handle_(other.old_data_tablet_handle_), new_data_tablet_handle_(other.new_data_tablet_handle_), delete_index_tablet_ids_(other.delete_index_tablet_ids_) { } ObLSTabletService::DeleteTabletInfo &ObLSTabletService::DeleteTabletInfo::operator=( const ObLSTabletService::DeleteTabletInfo &other) { if (this != &other) { delete_data_tablet_ = other.delete_data_tablet_; old_data_tablet_handle_ = other.old_data_tablet_handle_; new_data_tablet_handle_ = other.new_data_tablet_handle_; delete_index_tablet_ids_ = other.delete_index_tablet_ids_; } return *this; } ObLSTabletService::HashMapTabletDeleteFunctor::HashMapTabletDeleteFunctor(ObLS *ls) : ls_(ls), slog_params_() { } ObLSTabletService::HashMapTabletDeleteFunctor::~HashMapTabletDeleteFunctor() { destroy(); } void ObLSTabletService::HashMapTabletDeleteFunctor::destroy() { for (int64_t i = 0; i < slog_params_.count(); i++) { ObIBaseStorageLogEntry *data = slog_params_.at(i).data_; if (nullptr != data) { OB_DELETE(ObIBaseStorageLogEntry, "", data); data = nullptr; } } } bool ObLSTabletService::HashMapTabletDeleteFunctor::operator()( const common::ObTabletID &tablet_id, ObLSTabletService::DeleteTabletInfo &info) { bool b_ret = true; int ret = OB_SUCCESS; if (info.delete_data_tablet_ && OB_FAIL(handle_remove_data_tablet(info.old_data_tablet_handle_, info.delete_index_tablet_ids_))) { // delete data tablet and all index tablets LOG_WARN("failed to handle remove data tablet", K(ret)); } else if (!info.delete_data_tablet_) { ret = OB_INVALID_ARGUMENT; LOG_WARN("failed to handle alter index tablet", K(ret)); } if (OB_FAIL(ret)) { b_ret = false; } return b_ret; } int ObLSTabletService::HashMapTabletDeleteFunctor::handle_remove_data_tablet( ObTabletHandle &data_tablet_handle, const common::ObIArray &delete_index_tablet_ids) { int ret = OB_SUCCESS; const int32_t delete_tablet_cmd = ObIRedoModule::gen_cmd(ObRedoLogMainType::OB_REDO_LOG_TENANT_STORAGE, ObRedoLogSubType::OB_REDO_LOG_DELETE_TABLET); const share::ObLSID &ls_id = data_tablet_handle.get_obj()->get_tablet_meta().ls_id_; const common::ObTabletID &data_tablet_id = data_tablet_handle.get_obj()->get_tablet_meta().tablet_id_; // delete local index tablet for (int64_t i = 0; OB_SUCC(ret) && i < delete_index_tablet_ids.count(); ++i) { const common::ObTabletID &index_tablet_id = delete_index_tablet_ids.at(i); ObDeleteTabletLog *slog_entry = OB_NEW(ObDeleteTabletLog, "delete tablet", ls_id, index_tablet_id); ObStorageLogParam slog_param; slog_param.cmd_ = delete_tablet_cmd; slog_param.data_ = slog_entry; if (OB_FAIL(slog_params_.push_back(slog_param))) { LOG_WARN("failed to push back index tablet slog param", K(ret), K(slog_param)); } } // delete data tablet if (OB_SUCC(ret)) { ObStorageLogParam slog_param; slog_param.cmd_ = delete_tablet_cmd; slog_param.data_ = OB_NEW(ObDeleteTabletLog, "delete tablet", ls_id, data_tablet_id); if (OB_FAIL(slog_params_.push_back(slog_param))) { LOG_WARN("failed to push back data tablet slog param", K(ret), K(slog_param)); } } return ret; } bool ObLSTabletService::HashMapTabletGetFunctor::operator()( const common::ObTabletID &tablet_id, ObLSTabletService::DeleteTabletInfo &info) { bool b_ret = true; int ret = OB_SUCCESS; common::ObIArray &index_tablet_ids = info.delete_index_tablet_ids_; // always push back data tablet id if (OB_FAIL(tablet_id_hash_array_.push_back(tablet_id.hash()))) { LOG_WARN("failed to push back data tablet id", K(ret), K(tablet_id)); } for (int64_t i = 0; OB_SUCC(ret) && i < index_tablet_ids.count(); ++i) { const common::ObTabletID &index_tablet_id = index_tablet_ids.at(i); if (OB_FAIL(tablet_id_hash_array_.push_back(tablet_id.hash()))) { LOG_WARN("failed to push back index tablet id", K(ret), K(tablet_id)); } } if (OB_FAIL(ret)) { b_ret = false; } return b_ret; } int ObLSTabletService::GetAllTabletIDOperator::operator()(const common::ObTabletID &tablet_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!tablet_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tablet_id)); } else if (OB_FAIL(tablet_ids_.push_back(tablet_id))) { LOG_WARN("failed to push back tablet id", K(ret), K(tablet_id)); } return ret; } int ObLSTabletService::DestroyMemtableAndMemberOperator::operator()(const common::ObTabletID &tablet_id) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; ObTabletHandle handle; const uint64_t tenant_id = MTL_ID(); cur_tablet_id_ = tablet_id; if (OB_UNLIKELY(!tablet_id.is_valid()) || OB_ISNULL(tablet_svr_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), K(tablet_id), K(tablet_svr_)); } else if (OB_FAIL(tablet_svr_->get_tablet(tablet_id, handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { if (OB_TABLET_NOT_EXIST == ret) { LOG_WARN("failed to get tablet, skip clean memtable", K(ret), K(tablet_id)); ret = OB_SUCCESS; } else { LOG_ERROR("failed to get tablet", K(ret), K(tablet_id)); } } else if (OB_FAIL(handle.get_obj()->release_memtables())) { LOG_WARN("failed to release memtables", K(tenant_id), K(tablet_id)); } else if (!tablet_id.is_ls_inner_tablet() && OB_FAIL(handle.get_obj()->reset_storage_related_member())) { LOG_WARN("failed to destroy storage related member", K(ret), K(tenant_id), K(tablet_id)); } return ret; } void ObLSTabletService::AllowToReadMgr::disable_to_read() { AllowToReadInfo read_info; AllowToReadInfo next_read_info; do { LOAD128(read_info, &read_info_); if (!read_info.allow_to_read()) { break; } else { next_read_info.info_.allow_to_read_ = 0; next_read_info.info_.seq_ = read_info.info_.seq_ + 1; } } while (!CAS128(&read_info_, read_info, next_read_info)); } void ObLSTabletService::AllowToReadMgr::enable_to_read() { AllowToReadInfo read_info; AllowToReadInfo next_read_info; do { LOAD128(read_info, &read_info_); if (read_info.allow_to_read()) { break; } else { next_read_info.info_.allow_to_read_ = 1; next_read_info.info_.seq_ = read_info.info_.seq_; } } while (!CAS128(&read_info_, read_info, next_read_info)); } void ObLSTabletService::AllowToReadMgr::load_allow_to_read_info( AllowToReadInfo &read_info) { LOAD128(read_info, &read_info_); } void ObLSTabletService::AllowToReadMgr::check_read_info_same( const AllowToReadInfo &read_info, bool &is_same) { AllowToReadInfo current_read_info; LOAD128(current_read_info, &read_info_); is_same = read_info == current_read_info; } } // namespace storage } // namespace oceanbase