提交 ddb6728b 编写于 作者: H hiddenbomb 提交者: wangzelin.wzl

[CP] freeze active memtable before create new version tablet

上级 5697e0f2
......@@ -1019,7 +1019,7 @@ int ObLSTabletService::migrate_create_tablet(
int tmp_ret = OB_SUCCESS;
ObTenantMetaMemMgr *t3m = MTL(ObTenantMetaMemMgr*);
if (OB_TMP_FAIL(tablet_id_set_.erase(tablet_id))) {
if (OB_HASH_NOT_EXIST != ret) {
if (OB_HASH_NOT_EXIST != tmp_ret) {
LOG_ERROR("fail to erase tablet id from set", K(tmp_ret), K(tablet_id));
}
}
......@@ -2907,7 +2907,35 @@ int ObLSTabletService::build_ha_tablet_new_table_store(
//it is necessary to make the left side of the newly created memtable start from clog_checkpinoit_ts
//the new memtable can be stuck during the creation of the tablet, it is safe here
if (OB_FAIL(old_tablet->get_tx_data(tx_data))) {
// try tablet freeze
if (!tablet_id.is_ls_inner_tablet()) {
if (OB_FAIL(old_tablet->set_memtable_clog_checkpoint_ts(param.tablet_meta_))) {
LOG_WARN("failed to set memtable clog checkpoint ts", K(ret), KPC(old_tablet), K(param));
} else if (nullptr != param.tablet_meta_
&& old_tablet->get_clog_checkpoint_ts() < param.tablet_meta_->clog_checkpoint_ts_) {
if (OB_FAIL(freezer->tablet_freeze_for_replace_tablet_meta(tablet_id, imemtable))) {
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to freeze tablet", K(ret), K(tablet_id), KPC(old_tablet));
}
} else {
is_tablet_freeze = true;
}
}
if (OB_FAIL(ret)) {
} else if (!is_tablet_freeze) {
} else if (nullptr != imemtable) {
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(imemtable);
if (OB_FAIL(memtable->resolve_right_boundary_for_migration())) {
LOG_WARN("failed to resolve right boundary", K(ret), K(tablet_id));
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(old_tablet->get_tx_data(tx_data))) {
LOG_WARN("failed to get tx data from old tablet", K(ret), K(tablet_id));
} else if (OB_FAIL(old_tablet->get_ddl_data(ddl_data))) {
LOG_WARN("failed to get tx data from old tablet", K(ret), K(tablet_id));
......@@ -2918,23 +2946,31 @@ int ObLSTabletService::build_ha_tablet_new_table_store(
} else if (FALSE_IT(new_tablet = new_tablet_handle.get_obj())) {
} else if (OB_FAIL(new_tablet->init(param, *old_tablet, tx_data, ddl_data, autoinc_seq))) {
LOG_WARN("failed to init tablet", K(ret), KPC(old_tablet));
} else if (tablet_id.is_ls_inner_tablet()) {
//do nothing
} else if (old_tablet->get_tablet_meta().clog_checkpoint_ts_ < new_tablet->get_tablet_meta().clog_checkpoint_ts_) {
if (OB_FAIL(freezer->tablet_freeze_for_replace_tablet_meta(tablet_id, imemtable))) {
if (OB_ENTRY_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to freeze tablet", K(ret), K(tablet_id), KPC(old_tablet), KPC(new_tablet));
}
} else {
common::ObSArray<ObITable*> memtables;
const int64_t clog_checkpoint_ts = new_tablet->get_clog_checkpoint_ts();
const int64_t snapshot_version = new_tablet->get_snapshot_version();
if (OB_FAIL(new_tablet->get_memtables(memtables, true/*need_active*/))) {
LOG_WARN("failed to get memtables", K(ret), K(key));
} else {
is_tablet_freeze = true;
for (int64_t i = 0; OB_SUCC(ret) && i < memtables.count(); ++i) {
const memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(memtables.at(i));
if (OB_UNLIKELY(memtable->get_end_log_ts() < clog_checkpoint_ts)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("memtable end log ts is smaller than clog checkpoint ts", K(ret),
"end_log_ts", memtable->get_end_log_ts(), K(clog_checkpoint_ts));
} else if (memtable->get_end_log_ts() == clog_checkpoint_ts) {
if (memtable->get_snapshot_version() <= snapshot_version) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("memtable snapshot version is no bigger than tablet snapshot version", K(ret),
"memtable_snapshot_version", memtable->get_snapshot_version(), K(snapshot_version));
}
}
}
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(new_tablet->set_memtable_clog_checkpoint_ts(param.tablet_meta_))) {
LOG_WARN("failed to set memtable clog checkpoint ts", K(ret), KPC(old_tablet), KPC(new_tablet), K(param));
} else if (OB_FAIL(ObTabletSlogHelper::write_create_tablet_slog(new_tablet_handle, disk_addr))) {
LOG_WARN("fail to write update tablet slog", K(ret), K(new_tablet_handle), K(disk_addr));
} else if (OB_FAIL(t3m->compare_and_swap_tablet(key, disk_addr, old_tablet_handle, new_tablet_handle))) {
......
......@@ -545,6 +545,7 @@ private:
const ObTabletTxMultiSourceDataUnit *&tx_data,
const ObTabletBindingInfo *&binding_info,
const share::ObTabletAutoincSeq *&auto_inc_seq);
static int build_create_sstable_param_for_migration(
const blocksstable::ObMigrationSSTableParam &migrate_sstable_param,
ObTabletCreateSSTableParam &create_sstable_param);
......
......@@ -1830,6 +1830,38 @@ int ObMemtable::resolve_right_boundary()
return ret;
}
int ObMemtable::resolve_right_boundary_for_migration()
{
bool bool_ret = false;
int ret = OB_SUCCESS;
share::ObLSID ls_id = freezer_->get_ls_id();
int64_t start_time = ObTimeUtility::current_time();
do {
bool_ret = is_frozen_memtable() && 0 == get_write_ref();
if (bool_ret) {
if (OB_FAIL(resolve_snapshot_version_())) {
TRANS_LOG(WARN, "fail to resolve snapshot version", K(ret), KPC(this), K(ls_id));
} else if (OB_FAIL(resolve_max_end_log_ts_())) {
TRANS_LOG(WARN, "fail to resolve snapshot version", K(ret), KPC(this), K(ls_id));
} else {
resolve_right_boundary();
TRANS_LOG(INFO, "resolve_right_boundary_for_migration", K(ls_id), KPC(this));
}
} else {
const int64_t cost_time = ObTimeUtility::current_time() - start_time;
if (cost_time > 5 * 1000 * 1000) {
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) {
TRANS_LOG(WARN, "cannot resolve_right_boundary_for_migration", K(ret), KPC(this), K(ls_id));
}
}
ob_usleep(100);
}
} while (!bool_ret);
return ret;
}
void ObMemtable::resolve_left_boundary(int64_t end_log_ts)
{
set_start_log_ts(end_log_ts);
......
......@@ -434,6 +434,7 @@ public:
int get_tx_table_guard(storage::ObTxTableGuard &tx_table_guard);
int set_migration_clog_checkpoint_ts(const int64_t clog_checkpoint_ts);
int64_t get_migration_clog_checkpoint_ts() { return ATOMIC_LOAD(&migration_clog_checkpoint_ts_); }
int resolve_right_boundary_for_migration();
/* multi source data operations */
virtual int get_multi_source_data_unit(ObIMultiSourceDataUnit *multi_source_data_unit, ObIAllocator *allocator);
......@@ -453,7 +454,7 @@ public:
K_(freeze_clock), K_(max_schema_version), K_(write_ref_cnt), K_(local_allocator),
K_(unsubmitted_cnt), K_(unsynced_cnt),
K_(logging_blocked), K_(unset_active_memtable_logging_blocked), K_(resolve_active_memtable_left_boundary),
K_(contain_hotspot_row), K_(max_end_log_ts), K_(rec_log_ts), K_(snapshot_version),
K_(contain_hotspot_row), K_(max_end_log_ts), K_(rec_log_ts), K_(snapshot_version), K_(migration_clog_checkpoint_ts),
K_(is_tablet_freeze), K_(is_force_freeze), K_(contain_hotspot_row),
K_(read_barrier), K_(is_flushed), K_(freeze_state));
private:
......@@ -588,7 +589,7 @@ int ObMemtable::save_multi_source_data_unit(const T *const multi_source_data_uni
if (OB_FAIL(ret)) {
}
// skip updating max_end_log_ts of frozen memtable for commit/abort when replay clog.
else if ((ObLogTsRange::MAX_TS == get_end_log_ts() || !for_replay || !is_callback)
else if ((!for_replay || !is_callback)
&& OB_FAIL(set_max_end_log_ts(log_ts))) {
TRANS_LOG(WARN, "failed to set max_end_log_ts", K(ret), K(log_ts), KPC(this));
} else if (OB_FAIL(set_rec_log_ts(log_ts))) {
......
......@@ -341,7 +341,7 @@ int ObTablet::init(
// this interface for migration to batch update table store
// use old tablet clog_checkpoint_ts to avoid lose tx data
// use max schema to makesure sstable and schema match
))) {
))) {
LOG_WARN("failed to init tablet meta", K(ret), K(old_tablet), K(param), K(tx_data), K(ddl_data), K(autoinc_seq));
} else if (OB_FAIL(table_store_.build_ha_new_table_store(*allocator_, this, param, old_tablet.table_store_))) {
LOG_WARN("failed to init table store", K(ret), K(old_tablet));
......@@ -2820,34 +2820,36 @@ int ObTablet::set_memtable_clog_checkpoint_ts(
ObTableHandleV2 handle;
memtable::ObMemtable *memtable = nullptr;
if (OB_ISNULL(tablet_meta)) {
//no need to set memtable clog checkpoint ts
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not inited", K(ret), K_(is_inited));
} else if (OB_ISNULL(tablet_meta)) {
// no need to set memtable clog checkpoint ts
} else if (tablet_meta->clog_checkpoint_ts_ <= tablet_meta_.clog_checkpoint_ts_) {
//do nothing
} else if (OB_FAIL(get_memtable_mgr(memtable_mgr))) {
LOG_WARN("failed to get memtable mgr", K(ret));
} else if (tablet_meta_.tablet_id_.is_ls_inner_tablet()) {
if (memtable_mgr->has_memtable()) {
// do nothing
} else if (is_ls_inner_tablet()) {
if (OB_UNLIKELY(memtable_mgr->has_memtable())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ls inner tablet should not has memtable", K(ret), KPC(tablet_meta));
LOG_WARN("ls inner tablet should not have memtable", K(ret), KPC(tablet_meta));
}
} else if (OB_FAIL(memtable_mgr->get_active_memtable(handle))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_WARN("failed to get active memtable for tablet", K(ret), KPC(this), KPC(tablet_meta));
} else {
} else if (OB_FAIL(get_memtable_mgr(memtable_mgr))) {
LOG_WARN("failed to get memtable mgr", K(ret));
} else if (OB_FAIL(memtable_mgr->get_boundary_memtable(handle))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
} else {
LOG_WARN("failed to get boundary memtable for tablet", K(ret), KPC(this), KPC(tablet_meta));
}
} else if (OB_FAIL(handle.get_data_memtable(memtable))) {
LOG_WARN("failed to get memtalbe", K(ret), K(handle));
LOG_WARN("failed to get memtable", K(ret), K(handle));
} else if (OB_ISNULL(memtable)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null memtable", K(ret), KPC(memtable));
} else if (OB_FAIL(memtable->set_migration_clog_checkpoint_ts(tablet_meta->clog_checkpoint_ts_))) {
LOG_WARN("failed to set migration clog checkpoint ts", K(ret), K(handle), KPC(this));
}
return ret;
}
} // namespace storage
} // namespace oceanbase
......@@ -457,6 +457,7 @@ private:
int set_tx_data_in_tablet_pointer(const ObTabletTxMultiSourceDataUnit &tx_data);
int get_max_sync_storage_schema_version(int64_t &max_schema_version) const;
int check_max_sync_schema_version() const;
template<class T>
int dec_unsynced_cnt_for_if_need(
T &multi_source_data_unit,
......@@ -775,7 +776,15 @@ int ObTablet::save_multi_source_data_unit(
} else if (is_callback) {
memtable::ObMemtable *memtable = nullptr;
if (OB_FAIL(memtable_mgr_->get_memtable_for_multi_source_data_unit(memtable, msd->type()))) {
TRANS_LOG(WARN, "failed to get multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_log_ts));
if (OB_ENTRY_NOT_EXIST == ret && for_replay) {
TRANS_LOG(INFO, "clog_checkpoint_ts of ls is bigger than the commit_info log_ts of this multi-trans in replay, failed to get multi source data unit",
K(ret), K(ls_id), K(tablet_id), K(memtable_log_ts));
if (OB_FAIL(save_multi_source_data_unit(msd, memtable_log_ts, for_replay, ref_op, false))) {
TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_log_ts), K(ref_op));
}
} else {
TRANS_LOG(WARN, "failed to get multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_log_ts));
}
} else if (OB_FAIL(memtable->save_multi_source_data_unit(msd, memtable_log_ts, for_replay, ref_op, is_callback))) {
TRANS_LOG(WARN, "failed to save multi source data unit", K(ret), K(ls_id), K(tablet_id), K(memtable_log_ts), K(ref_op));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册