提交 9b1f5bb0 编写于 作者: O obdev 提交者: OB-robot

Split logstream_freeze and tablet_freeze

上级 4a31e464
......@@ -296,7 +296,7 @@ LATCH_DEF(TENANT_MEM_USAGE_LOCK, 285, "tenant memory usage lock" , LATCH_FIFO, 2
LATCH_DEF(TX_TABLE_LOCK, 286, "tx table lock", LATCH_FIFO, 2000, 0, TX_TABLE_LOCK_WAIT, "tx table lock")
LATCH_DEF(MEMTABLE_STAT_LOCK, 287, "metmable stat lock", LATCH_FIFO, 2000, 0, MEMTABLE_STAT_LOCK_WAIT, "memtable stat lock")
LATCH_DEF(DEADLOCK_DETECT_LOCK, 288, "deadlock detect lock", LATCH_FIFO, 2000, 0, DEADLOCK_DETECT_LOCK_WAIT, "deadlock detect lock")
LATCH_DEF(FREEZE_THREAD_POOL_LOCK, 289, "freeze thread pool lock", LATCH_FIFO, 2000, 0, FREEZE_THREAD_POOL_WAIT, "freeze thread pool lock")
LATCH_DEF(LATCH_END, 99999, "latch end", LATCH_FIFO, 2000, 0, WAIT_EVENT_END, "latch end")
#endif
......
......@@ -345,6 +345,7 @@ WAIT_EVENT_DEF(REPLAY_STATUS_WAIT, 16051, "replay status lock wait", "", "", "",
WAIT_EVENT_DEF(REPLAY_STATUS_TASK_WAIT, 16052, "replay status task lock wait", "", "", "", CONCURRENCY, "REPLAY_STATUS_TASK_WAIT", true)
WAIT_EVENT_DEF(MAX_APPLY_SCN_WAIT, 16053, "max apply scn lock wait", "", "", "", CONCURRENCY, "MAX_APPLY_SCN_WAIT", true)
WAIT_EVENT_DEF(GC_HANDLER_WAIT, 16054, "gc handler lock wait", "", "", "", CONCURRENCY, "GC_HANDLER_WAIT", true)
WAIT_EVENT_DEF(FREEZE_THREAD_POOL_WAIT, 16055, "freeze thread pool wait", "", "", "", CONCURRENCY, "FREEZE_THREAD_POOL_WAIT", true)
//replication group
WAIT_EVENT_DEF(RG_TRANSFER_LOCK_WAIT, 17000, "transfer lock wait", "src_rg", "dst_rg", "transfer_pkey", CONCURRENCY, "transfer lock wait", false)
......
......@@ -276,6 +276,17 @@ bool ObDataCheckpoint::is_flushing() const
return !ls_freeze_finished_;
}
bool ObDataCheckpoint::is_empty()
{
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObSpinLockGuard guard(lock_);
return new_create_list_.is_empty() &&
active_list_.is_empty() &&
prepare_list_.is_empty() &&
ls_frozen_list_.is_empty();
}
static inline bool task_reach_time_interval(int64_t i, int64_t &last_time)
{
bool bret = false;
......@@ -355,35 +366,51 @@ void ObDataCheckpoint::ls_frozen_to_active_(int64_t &last_time)
bool ls_frozen_list_is_empty = false;
do {
{
// traversal list once
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObCheckpointIterator iterator;
ls_frozen_list_.get_iterator(iterator);
while (iterator.has_next()) {
int ret = OB_SUCCESS;
auto ob_freeze_checkpoint = iterator.get_next();
if (ob_freeze_checkpoint->is_active_checkpoint()) {
ObSpinLockGuard guard(lock_);
// avoid new active ob_freeze_checkpoint block minor merge
// push back to new_create_list and wait next freeze
if(OB_FAIL(transfer_from_ls_frozen_to_new_created_(ob_freeze_checkpoint))) {
STORAGE_LOG(WARN, "ob_freeze_checkpoint move to new_created_list failed",
K(ret), K(*ob_freeze_checkpoint));
}
} else {
ObSpinLockGuard guard(lock_);
if (OB_FAIL(ob_freeze_checkpoint->check_can_move_to_active(true))) {
STORAGE_LOG(WARN, "check can freeze failed", K(ret), K(*ob_freeze_checkpoint));
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock);
if (OB_UNLIKELY(ls_->is_stopped_)) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_->ls_meta));
} else if (OB_UNLIKELY(!(ls_->get_log_handler()->is_replay_enabled()))) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta));
} else {
// traversal list once
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObCheckpointIterator iterator;
ls_frozen_list_.get_iterator(iterator);
while (iterator.has_next()) {
int ret = OB_SUCCESS;
auto ob_freeze_checkpoint = iterator.get_next();
if (ob_freeze_checkpoint->is_active_checkpoint()) {
ObSpinLockGuard guard(lock_);
// avoid new active ob_freeze_checkpoint block minor merge
// push back to new_create_list and wait next freeze
if(OB_FAIL(transfer_from_ls_frozen_to_new_created_(ob_freeze_checkpoint))) {
STORAGE_LOG(WARN, "ob_freeze_checkpoint move to new_created_list failed",
K(ret), K(*ob_freeze_checkpoint));
}
} else {
ObSpinLockGuard guard(lock_);
if (OB_FAIL(ob_freeze_checkpoint->check_can_move_to_active(true))) {
STORAGE_LOG(WARN, "check can freeze failed", K(ret), K(*ob_freeze_checkpoint));
}
}
}
ls_frozen_list_is_empty = ls_frozen_list_.is_empty();
}
if (OB_NOT_RUNNING == ret && is_empty()) {
ls_frozen_list_is_empty = true;
}
ls_frozen_list_is_empty = ls_frozen_list_.is_empty();
}
if (!ls_frozen_list_is_empty) {
ob_usleep(LOOP_TRAVERSAL_INTERVAL_US);
if (task_reach_time_interval(3 * 1000 * 1000, last_time)) {
STORAGE_LOG(WARN, "cost too much time in ls_frozen_list_", K(ls_->get_ls_id()));
STORAGE_LOG(WARN, "cost too much time in ls_frozen_list_", K(ret), K(ls_->get_ls_id()));
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
print_list_(ls_frozen_list_);
}
......@@ -401,28 +428,44 @@ void ObDataCheckpoint::ls_frozen_to_prepare_(int64_t &last_time)
bool ls_frozen_list_is_empty = false;
do {
{
// traversal list once
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObCheckpointIterator iterator;
ls_frozen_list_.get_iterator(iterator);
while (iterator.has_next()) {
int tmp_ret = OB_SUCCESS;
auto ob_freeze_checkpoint = iterator.get_next();
if (ob_freeze_checkpoint->ready_for_flush()) {
if (OB_FAIL(ob_freeze_checkpoint->finish_freeze())) {
STORAGE_LOG(WARN, "finish freeze failed", K(ret));
}
} else if (ob_freeze_checkpoint->is_active_checkpoint()) {
// avoid active ob_freeze_checkpoint block minor merge
// push back to active_list and wait next freeze
ObSpinLockGuard guard(lock_);
if(OB_SUCCESS != (tmp_ret = (transfer_from_ls_frozen_to_active_(ob_freeze_checkpoint)))) {
STORAGE_LOG(WARN, "active ob_freeze_checkpoint move to active_list failed",
K(tmp_ret), K(*ob_freeze_checkpoint));
int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA;
int64_t write_lock = 0;
ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock);
if (OB_UNLIKELY(ls_->is_stopped_)) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_->ls_meta));
} else if (OB_UNLIKELY(!(ls_->get_log_handler()->is_replay_enabled()))) {
ret = OB_NOT_RUNNING;
STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta));
} else {
// traversal list once
ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_);
ObCheckpointIterator iterator;
ls_frozen_list_.get_iterator(iterator);
while (iterator.has_next()) {
int tmp_ret = OB_SUCCESS;
auto ob_freeze_checkpoint = iterator.get_next();
if (ob_freeze_checkpoint->ready_for_flush()) {
if (OB_FAIL(ob_freeze_checkpoint->finish_freeze())) {
STORAGE_LOG(WARN, "finish freeze failed", K(ret));
}
} else if (ob_freeze_checkpoint->is_active_checkpoint()) {
// avoid active ob_freeze_checkpoint block minor merge
// push back to active_list and wait next freeze
ObSpinLockGuard guard(lock_);
if(OB_SUCCESS != (tmp_ret = (transfer_from_ls_frozen_to_active_(ob_freeze_checkpoint)))) {
STORAGE_LOG(WARN, "active ob_freeze_checkpoint move to active_list failed",
K(tmp_ret), K(*ob_freeze_checkpoint));
}
}
}
ls_frozen_list_is_empty = ls_frozen_list_.is_empty();
}
if (OB_NOT_RUNNING == ret && is_empty()) {
ls_frozen_list_is_empty = true;
}
ls_frozen_list_is_empty = ls_frozen_list_.is_empty();
}
if (!ls_frozen_list_is_empty) {
......
......@@ -118,6 +118,8 @@ public:
bool has_prepared_flush_checkpoint();
bool is_empty();
private:
// traversal prepare_list to flush memtable
// case1: some memtable flush failed when ls freeze
......
此差异已折叠。
......@@ -22,6 +22,7 @@
#include "storage/checkpoint/ob_freeze_checkpoint.h"
#include "logservice/ob_log_handler.h"
#include "lib/container/ob_array_serialization.h"
#include "share/ob_occam_thread_pool.h"
namespace oceanbase
{
......@@ -183,27 +184,10 @@ public:
public:
ObFreezer();
ObFreezer(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
logservice::ObILogHandler *ob_loghandler,
const share::ObLSID &ls_id);
ObFreezer(ObLS *ls);
~ObFreezer();
int init(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
logservice::ObILogHandler *ob_loghandler,
const share::ObLSID &ls_id);
void set(ObLSWRSHandler *ls_loop_worker,
ObLSTxService *ls_tx_svr,
ObLSTabletService *ls_tablet_svr,
checkpoint::ObDataCheckpoint *data_checkpoint,
logservice::ObILogHandler *ob_loghandler,
const share::ObLSID &ls_id,
uint32_t freeze_flag = 0);
int init(ObLS *ls);
void reset();
void offline() { enable_ = false; }
void online() { enable_ = true; }
......@@ -222,10 +206,12 @@ public:
uint32_t get_freeze_clock() { return ATOMIC_LOAD(&freeze_flag_) & (~(1 << 31)); }
/* ls info */
share::ObLSID &get_ls_id() { return ls_id_; }
checkpoint::ObDataCheckpoint *get_data_checkpoint() { return data_checkpoint_; }
ObLSTxService *get_ls_tx_svr() { return ls_tx_svr_; }
ObLSTabletService *get_ls_tablet_svr() { return ls_tablet_svr_; }
share::ObLSID get_ls_id();
checkpoint::ObDataCheckpoint *get_ls_data_checkpoint();
ObLSTxService *get_ls_tx_svr();
ObLSTabletService *get_ls_tablet_svr();
logservice::ObILogHandler *get_ls_log_handler();
ObLSWRSHandler *get_ls_wrs_handler();
/* freeze_snapshot_version */
share::SCN get_freeze_snapshot_version() { return freeze_snapshot_version_; }
......@@ -285,15 +271,19 @@ private:
/* inner subfunctions for freeze process */
int inner_logstream_freeze();
int submit_log_for_freeze();
void ls_freeze_task();
int tablet_freeze_task(memtable::ObIMemtable *imemtable);
int submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable = nullptr);
void wait_memtable_ready_for_flush(memtable::ObMemtable *memtable);
int wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable);
int handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable);
int create_memtable_if_no_active_memtable(ObTablet *tablet);
int try_set_tablet_freeze_begin_();
void set_tablet_freeze_begin_();
void set_tablet_freeze_end_();
void set_ls_freeze_begin_();
void set_ls_freeze_end_();
int check_ls_state(); // must be used under the protection of ls_lock
private:
// flag whether the logsteram is freezing
// the first bit: 1, freeze; 0, not freeze
......@@ -307,14 +297,8 @@ private:
// log ts before which will be smaller than the log ts in the latter memtables
share::SCN max_decided_scn_;
ObLSWRSHandler *ls_wrs_handler_;
ObLSTxService *ls_tx_svr_;
ObLSTabletService *ls_tablet_svr_;
checkpoint::ObDataCheckpoint *data_checkpoint_;
logservice::ObILogHandler *loghandler_;
share::ObLSID ls_id_;
ObLS *ls_;
ObFreezerStat stat_;
int64_t empty_memtable_cnt_;
// make sure ls freeze has higher priority than tablet freeze
......
......@@ -59,7 +59,7 @@ const uint64_t ObLS::INNER_TABLET_ID_LIST[TOTAL_INNER_TABLET_NUM] = {
ObLS::ObLS()
: ls_tx_svr_(this),
replay_handler_(this),
ls_freezer_(&ls_wrs_handler_, &ls_tx_svr_, &ls_tablet_svr_, &data_checkpoint_, &log_handler_, ls_meta_.ls_id_),
ls_freezer_(this),
ls_sync_tablet_seq_handler_(),
ls_ddl_log_handler_(),
is_inited_(false),
......@@ -113,7 +113,7 @@ int ObLS::init(const share::ObLSID &ls_id,
LOG_WARN("failed to init ls meta", K(ret), K(tenant_id), K(ls_id), K(replica_type));
} else {
rs_reporter_ = reporter;
ls_freezer_.init(&ls_wrs_handler_, &ls_tx_svr_, &ls_tablet_svr_, &data_checkpoint_, &log_handler_, ls_meta_.ls_id_);
ls_freezer_.init(this);
transaction::ObTxPalfParam tx_palf_param(get_log_handler());
// tx_table_.init() should after ls_table_svr.init()
......
......@@ -118,6 +118,8 @@ class ObLS : public common::ObLink
{
public:
friend ObLSLockGuard;
friend class ObFreezer;
friend class checkpoint::ObDataCheckpoint;
public:
static constexpr int64_t TOTAL_INNER_TABLET_NUM = 3;
static const uint64_t INNER_TABLET_ID_LIST[TOTAL_INNER_TABLET_NUM];
......
......@@ -241,7 +241,7 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
if (OB_FAIL(add_memtable_(memtable_handle))) {
LOG_WARN("failed to add memtable", K(ret), K(ls_id), K(tablet_id_), K(memtable_handle));
} else if (FALSE_IT(time_guard.click("add memtable"))) {
} else if (OB_FAIL(memtable->add_to_data_checkpoint(freezer_->get_data_checkpoint()))) {
} else if (OB_FAIL(memtable->add_to_data_checkpoint(freezer_->get_ls_data_checkpoint()))) {
LOG_WARN("add to data_checkpoint failed", K(ret), K(ls_id), KPC(memtable));
clean_tail_memtable_();
} else if (FALSE_IT(time_guard.click("add to data_checkpoint"))) {
......
......@@ -41,6 +41,8 @@ ObTenantFreezer::ObTenantFreezer()
rs_mgr_(nullptr),
config_(nullptr),
allocator_mgr_(nullptr),
freeze_thread_pool_(),
freeze_thread_pool_lock_(common::ObLatchIds::FREEZE_THREAD_POOL_LOCK),
exist_ls_freezing_(false),
last_update_ts_(0)
{}
......@@ -87,6 +89,8 @@ int ObTenantFreezer::init()
K(GCONF.self_addr_));
} else if (OB_FAIL(freeze_trigger_pool_.init_and_start(FREEZE_TRIGGER_THREAD_NUM))) {
LOG_WARN("[TenantFreezer] fail to initialize freeze trigger pool", KR(ret));
} else if (OB_FAIL(freeze_thread_pool_.init_and_start(FREEZE_THREAD_NUM))) {
LOG_WARN("[TenantFreezer] fail to initialize freeze thread pool", KR(ret));
} else if (OB_FAIL(freeze_trigger_timer_.init_and_start(freeze_trigger_pool_,
TIME_WHEEL_PRECISION,
"FrzTrigger"))) {
......
......@@ -37,9 +37,11 @@ class ObTenantTxDataFreezeGuard;
class ObTenantFreezer
{
friend ObTenantTxDataFreezeGuard;
friend class ObFreezer;
const static int64_t TIME_WHEEL_PRECISION = 100_ms;
const static int64_t SLOW_FREEZE_INTERVAL = 30_s;
const static int FREEZE_TRIGGER_THREAD_NUM= 1;
const static int FREEZE_THREAD_NUM= 5;
const static int64_t FREEZE_TRIGGER_INTERVAL = 2_s;
const static int64_t UPDATE_INTERVAL = 100_ms;
// replay use 1G/s
......@@ -172,6 +174,8 @@ private:
common::ObOccamThreadPool freeze_trigger_pool_;
common::ObOccamTimer freeze_trigger_timer_;
common::ObOccamTimerTaskRAIIHandle timer_handle_;
common::ObOccamThreadPool freeze_thread_pool_;
ObSpinLock freeze_thread_pool_lock_;
bool exist_ls_freezing_;
int64_t last_update_ts_;
};
......
......@@ -29,6 +29,7 @@
#include "storage/tx/ob_trans_define_v4.h"
#include "storage/memtable/mvcc/ob_mvcc_row.h"
#include "share/scn.h"
#include "storage/ls/ob_ls.h"
namespace oceanbase
{
......@@ -98,7 +99,7 @@ int ObMvccRow::check_double_insert_(const share::SCN ,
class TestMemtable : public testing::Test
{
public:
TestMemtable() : tenant_base_(1001),tablet_id_(1000),rowkey_cnt_(1) {}
TestMemtable() : tenant_base_(1001),tablet_id_(1000),rowkey_cnt_(1) { freezer_.init(&ls_); }
void SetUp() override {
share::ObTenantEnv::set_tenant(&tenant_base_);
// mock columns
......@@ -177,6 +178,7 @@ public:
read_info_.reset();
}
public:
ObLS ls_;
share::ObTenantBase tenant_base_;
storage::ObFreezer freezer_;
storage::ObTabletMemtableMgr memtable_mgr_;
......
......@@ -302,7 +302,7 @@ int TestCompactionPolicy::mock_memtable(
LOG_WARN("failed to init memtable", K(ret));
} else if (OB_FAIL(mt_mgr->add_memtable_(table_handle))) {
LOG_WARN("failed to add memtable to mgr", K(ret));
} else if (OB_FAIL(memtable->add_to_data_checkpoint(mt_mgr->freezer_->get_data_checkpoint()))) {
} else if (OB_FAIL(memtable->add_to_data_checkpoint(mt_mgr->freezer_->get_ls_data_checkpoint()))) {
LOG_WARN("add to data_checkpoint failed", K(ret), KPC(memtable));
mt_mgr->clean_tail_memtable_();
} else if (OB_MAX_SCN_TS_NS != end_border) { // frozen memtable
......
......@@ -65,12 +65,7 @@ public:
tablet_id_(LS_TX_DATA_TABLET),
ls_id_(1),
tenant_id_(1001),
freezer_((ObLSWRSHandler *)(0x1),
(ObLSTxService *)(0x1),
(ObLSTabletService *)(0x1),
(checkpoint::ObDataCheckpoint *)(0x1),
(logservice::ObILogHandler *)(0x1),
ls_id_),
freezer_(&ls_),
t3m_(common::OB_SERVER_TENANT_ID),
mt_mgr_(nullptr),
ctx_mt_mgr_(nullptr),
......@@ -95,7 +90,7 @@ protected:
virtual void SetUp() override
{
ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01));
freezer_.init(&ls_loop_worker_, &ls_tx_service_, &ls_tablet_service_, &ls_data_checkpoint_, &log_handler_, ls_id_);
freezer_.init(&ls_);
EXPECT_EQ(OB_SUCCESS, t3m_.init());
EXPECT_EQ(OB_SUCCESS,
ls_tx_ctx_mgr_.init(tenant_id_, /*tenant_id*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册