提交 5987bcac 编写于 作者: O obdev 提交者: ob-robot

Restore log suspend when log disk full

上级 6b0c6d40
......@@ -172,6 +172,7 @@ ob_set_subtarget(ob_logservice restoreservice
restoreservice/ob_log_restore_allocator.cpp
restoreservice/ob_remote_fetch_context.cpp
restoreservice/ob_log_restore_scheduler.cpp
restoreservice/ob_log_restore_controller.cpp
)
......
......@@ -216,6 +216,12 @@ int PalfHandle::get_begin_scn(SCN &scn) const
return palf_handle_impl_->get_begin_scn(scn);
}
int PalfHandle::get_base_lsn(LSN &lsn) const
{
CHECK_VALID;
return palf_handle_impl_->get_base_lsn(lsn);
}
int PalfHandle::get_base_info(const LSN &lsn,
PalfBaseInfo &palf_base_info)
{
......
......@@ -150,6 +150,9 @@ public:
int get_begin_lsn(LSN &lsn) const;
int get_begin_scn(share::SCN &scn) const;
// return the max recyclable point of Palf
int get_base_lsn(LSN &lsn) const;
// PalfBaseInfo include the 'base_lsn' and the 'prev_log_info' of sliding window.
// @param[in] const LSN&, base_lsn of ls.
// @param[out] PalfBaseInfo&, palf_base_info
......
......@@ -305,6 +305,18 @@ int PalfHandleImpl::get_begin_scn(SCN &scn)
return ret;
}
int PalfHandleImpl::get_base_lsn(LSN &lsn) const
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(WARN, "PalfHandleImpl not init", K(ret), KPC(this));
} else {
lsn = get_base_lsn_used_for_block_gc();
}
return ret;
}
int PalfHandleImpl::get_base_info(const LSN &base_lsn, PalfBaseInfo &base_info)
{
int ret = OB_SUCCESS;
......
......@@ -417,6 +417,7 @@ public:
virtual int locate_by_lsn_coarsely(const LSN &lsn, share::SCN &result_scn) = 0;
virtual int get_begin_lsn(LSN &lsn) const = 0;
virtual int get_begin_scn(share::SCN &scn) = 0;
virtual int get_base_lsn(LSN &lsn) const = 0;
virtual int get_base_info(const LSN &base_lsn, PalfBaseInfo &base_info) = 0;
virtual int get_min_block_info_for_gc(block_id_t &min_block_id, share::SCN &max_scn) = 0;
......@@ -728,6 +729,7 @@ public:
public:
int get_begin_lsn(LSN &lsn) const override final;
int get_begin_scn(share::SCN &scn) override final;
int get_base_lsn(LSN &lsn) const override final;
int get_base_info(const LSN &base_lsn, PalfBaseInfo &base_info) override final;
int get_min_block_info_for_gc(block_id_t &min_block_id, share::SCN &max_scn) override final;
// return the block length which the previous data was committed
......
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX CLOG
#include <algorithm>
#include "lib/oblog/ob_log_module.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include "lib/atomic/ob_atomic.h"
#include "lib/time/ob_time_utility.h"
#include "share/ob_debug_sync.h" // DEBUG
#include "logservice/ob_log_service.h"
#include "ob_log_restore_controller.h"
namespace oceanbase
{
namespace logservice
{
ObLogRestoreController::ObLogRestoreController() :
inited_(false),
tenant_id_(OB_INVALID_TENANT_ID),
log_service_(NULL),
available_capacity_(0),
last_refresh_ts_(common::OB_INVALID_TIMESTAMP)
{}
ObLogRestoreController::~ObLogRestoreController()
{
destroy();
}
int ObLogRestoreController::init(const uint64_t tenant_id, ObLogService *log_service)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("ObLogRestoreController init twice", K(ret));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)
|| OB_ISNULL(log_service)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(tenant_id), K(log_service));
} else {
tenant_id_ = tenant_id;
log_service_ = log_service;
inited_ = true;
}
return ret;
}
void ObLogRestoreController::destroy()
{
inited_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
log_service_ = NULL;
available_capacity_ = 0;
last_refresh_ts_ = OB_INVALID_TIMESTAMP;
}
int ObLogRestoreController::get_quota(const int64_t size, bool &succ)
{
int ret = OB_SUCCESS;
succ = false;
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
} else if (size > ATOMIC_LOAD(&available_capacity_)) {
succ = false;
} else {
ATOMIC_SAF(&available_capacity_, size);
succ = true;
}
return ret;
}
int ObLogRestoreController::update_quota()
{
int ret = OB_SUCCESS;
int64_t used_size = 0;
int64_t total_size = 0;
palf::PalfOptions palf_opt;
auto get_palf_used_size_func = [&](const palf::PalfHandle &palf_handle) -> int {
int ret = OB_SUCCESS;
int64_t palf_id = -1;
palf::LSN base_lsn;
palf::LSN end_lsn;
palf_handle.get_palf_id(palf_id);
if (OB_FAIL(palf_handle.get_base_lsn(base_lsn))) {
CLOG_LOG(WARN, "get palf base_lsn failed", K(palf_id));
} else if (OB_FAIL(palf_handle.get_end_lsn(end_lsn))) {
CLOG_LOG(WARN, "get palf end_lsn failed", K(palf_id));
} else if (OB_UNLIKELY(base_lsn > end_lsn)) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "unexpected base_lsn or end_lsn", K(palf_id), K(base_lsn), K(end_lsn));
} else {
used_size += static_cast<int64_t>(end_lsn - base_lsn);
}
return ret;
};
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("ObLogRestoreController not init", K(tenant_id_));
} else if (! need_update_()) {
} else if (OB_FAIL(log_service_->get_palf_options(palf_opt))) {
LOG_WARN("get palf option failed", K(tenant_id_));
} else if (OB_FAIL(log_service_->iterate_palf(get_palf_used_size_func))) {
LOG_WARN("fail to get palf_options", K(tenant_id_));
} else {
total_size = palf_opt.disk_options_.log_disk_usage_limit_size_;
const int64_t capacity = std::max(total_size / 100 * palf_opt.disk_options_.log_disk_utilization_threshold_ - used_size, 0L);
ATOMIC_SET(&available_capacity_, capacity);
last_refresh_ts_ = common::ObTimeUtility::fast_current_time();
LOG_TRACE("update log restore quota succ", K(tenant_id_), K(used_size), K(total_size), K(capacity), K(last_refresh_ts_));
}
return ret;
}
bool ObLogRestoreController::need_update_() const
{
return common::ObTimeUtility::fast_current_time() - last_refresh_ts_ >= LOG_RESTORE_CONTROL_REFRESH_INTERVAL;
}
} // namespace logservice
} // namespace oceanbase
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LOGSERVICE_OB_LOG_RESTORE_CONTROLLER_H_
#define OCEANBASE_LOGSERVICE_OB_LOG_RESTORE_CONTROLLER_H_
#include "lib/utility/ob_macro_utils.h"
#include <cstdint>
namespace oceanbase
{
namespace logservice
{
class ObLogService;
// Only support single thread get and update log restore quota
class ObLogRestoreController
{
const int64_t LOG_RESTORE_CONTROL_REFRESH_INTERVAL = 1000 * 1000L; // 1s
public:
ObLogRestoreController();
~ObLogRestoreController();
public:
int init(const uint64_t tenant_id, ObLogService *log_service);
void destroy();
int update_quota();
int get_quota(const int64_t size, bool &succ);
private:
bool need_update_() const;
private:
bool inited_;
uint64_t tenant_id_;
ObLogService *log_service_;
int64_t available_capacity_;
int64_t last_refresh_ts_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogRestoreController);
};
} // namespace logservice
} // namespace oceanbase
#endif
......@@ -61,11 +61,13 @@ int ObLogRestoreService::init(rpc::frame::ObReqTransport *transport,
LOG_WARN("invalid argument", K(ret), K(transport), K(ls_svr), K(log_service));
} else if (OB_FAIL(proxy_.init(transport))) {
LOG_WARN("proxy_ init failed", K(ret));
} else if (OB_FAIL(restore_controller_.init(MTL_ID(), log_service))) {
LOG_WARN("restore_controller_ init failed");
} else if (OB_FAIL(location_adaptor_.init(MTL_ID(), ls_svr))) {
LOG_WARN("location_adaptor_ init failed", K(ret));
} else if (OB_FAIL(fetch_log_impl_.init(MTL_ID(), ls_svr, log_service, &fetch_log_worker_))) {
LOG_WARN("fetch_log_impl_ init failed", K(ret));
} else if (OB_FAIL(fetch_log_worker_.init(MTL_ID(), &allocator_, this, ls_svr))) {
} else if (OB_FAIL(fetch_log_worker_.init(MTL_ID(), &allocator_, &restore_controller_, this, ls_svr))) {
LOG_WARN("fetch_log_worker_ init failed", K(ret));
} else if (OB_FAIL(error_reporter_.init(MTL_ID(), ls_svr))) {
LOG_WARN("error_reporter_ init failed", K(ret));
......@@ -87,6 +89,7 @@ void ObLogRestoreService::destroy()
fetch_log_worker_.destroy();
stop();
wait();
restore_controller_.destroy();
location_adaptor_.destroy();
fetch_log_impl_.destroy();
error_reporter_.destroy();
......@@ -158,6 +161,8 @@ void ObLogRestoreService::run1()
void ObLogRestoreService::do_thread_task_()
{
if (is_user_tenant(MTL_ID())) {
update_restore_quota_();
update_upstream_();
schedule_fetch_log_();
......@@ -168,6 +173,11 @@ void ObLogRestoreService::do_thread_task_()
}
}
void ObLogRestoreService::update_restore_quota_()
{
(void)restore_controller_.update_quota();
}
void ObLogRestoreService::update_upstream_()
{
(void)location_adaptor_.update_upstream();
......
......@@ -23,6 +23,7 @@
#include "ob_remote_error_reporter.h" // ObRemoteErrorReporter
#include "ob_log_restore_allocator.h" // ObLogRestoreAllocator
#include "ob_log_restore_scheduler.h" // ObLogRestoreScheduler
#include "ob_log_restore_controller.h" // ObLogRestoreController
namespace oceanbase
{
......@@ -66,6 +67,7 @@ public:
private:
void run1();
void do_thread_task_();
void update_restore_quota_();
void update_upstream_();
void schedule_fetch_log_();
void schedule_resource_();
......@@ -75,6 +77,7 @@ private:
bool inited_;
ObLSService *ls_svr_;
ObLogResSvrRpc proxy_;
ObLogRestoreController restore_controller_;
ObRemoteLocationAdaptor location_adaptor_;
ObRemoteFetchLogImpl fetch_log_impl_;
ObRemoteFetchWorker fetch_log_worker_;
......
......@@ -28,6 +28,7 @@
#include "ob_fetch_log_task.h" // ObFetchLogTask
#include "ob_log_restore_handler.h" // ObLogRestoreHandler
#include "ob_log_restore_allocator.h" // ObLogRestoreAllocator
#include "ob_log_restore_controller.h"
#include "storage/tx_storage/ob_ls_handle.h" // ObLSHandle
#include "logservice/archiveservice/ob_archive_define.h" // archive
#include "storage/tx_storage/ob_ls_map.h" // ObLSIterator
......@@ -59,6 +60,7 @@ using namespace share;
ObRemoteFetchWorker::ObRemoteFetchWorker() :
inited_(false),
tenant_id_(OB_INVALID_TENANT_ID),
restore_controller_(NULL),
restore_service_(NULL),
ls_svr_(NULL),
task_queue_(),
......@@ -73,6 +75,7 @@ ObRemoteFetchWorker::~ObRemoteFetchWorker()
int ObRemoteFetchWorker::init(const uint64_t tenant_id,
ObLogRestoreAllocator *allocator,
ObLogRestoreController *restore_controller,
ObLogRestoreService *restore_service,
ObLSService *ls_svr)
{
......@@ -84,15 +87,18 @@ int ObRemoteFetchWorker::init(const uint64_t tenant_id,
LOG_ERROR("ObRemoteFetchWorker has been initialized", K(ret));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id)
|| OB_ISNULL(allocator)
|| OB_ISNULL(restore_controller)
|| OB_ISNULL(restore_service)
|| OB_ISNULL(ls_svr)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(allocator), K(restore_service), K(ls_svr));
LOG_WARN("invalid argument", K(tenant_id), K(restore_controller),
K(allocator), K(restore_service), K(ls_svr));
} else if (OB_FAIL(task_queue_.init(FETCH_LOG_TASK_LIMIT, "RFLTaskQueue", MTL_ID()))) {
LOG_WARN("task_queue_ init failed", K(ret));
} else {
tenant_id_ = tenant_id;
allocator_ = allocator;
restore_controller_ = restore_controller;
restore_service_ = restore_service;
ls_svr_ = ls_svr;
inited_ = true;
......@@ -122,6 +128,7 @@ void ObRemoteFetchWorker::destroy()
restore_service_ = NULL;
ls_svr_ = NULL;
allocator_ = NULL;
restore_controller_ = NULL;
inited_ = false;
}
}
......@@ -334,6 +341,7 @@ int ObRemoteFetchWorker::submit_entries_(const ObLSID &id,
int64_t size = 0;
LSN lsn;
while (OB_SUCC(ret) && ! has_set_stop()) {
bool quota_done = false;
if (OB_FAIL(iter.next(entry, lsn, buf, size))) {
if (OB_ITER_END != ret) {
LOG_WARN("ObRemoteLogIterator next failed", K(ret), K(iter));
......@@ -345,6 +353,10 @@ int ObRemoteFetchWorker::submit_entries_(const ObLSID &id,
LOG_WARN("entry is invalid", K(ret), K(entry), K(lsn), K(iter));
} else if (base_lsn > lsn) {
LOG_INFO("repeated log, just skip", K(ret), K(id), K(lsn), K(base_lsn), K(entry));
} else if (OB_FAIL(wait_restore_quota_(entry.get_serialize_size(), quota_done))) {
LOG_WARN("wait restore quota failed", K(ret), K(entry));
} else if (! quota_done) {
break;
} else if (OB_FAIL(submit_log_(id, proposal_id, lsn,
entry.get_scn(), buf, entry.get_serialize_size()))) {
LOG_WARN("submit log failed", K(ret), K(iter), K(buf), K(entry), K(lsn));
......@@ -356,6 +368,25 @@ int ObRemoteFetchWorker::submit_entries_(const ObLSID &id,
return ret;
}
int ObRemoteFetchWorker::wait_restore_quota_(const int64_t size, bool &done)
{
int ret = OB_SUCCESS;
done = false;
while (OB_SUCC(ret) && ! done && ! has_set_stop()) {
if (OB_FAIL(restore_controller_->get_quota(size, done))) {
LOG_WARN("get quota failed");
} else if (! done) {
if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) {
LOG_INFO("clog disk is not enough, just wait", K(size));
} else {
LOG_TRACE("get quota succ", K(size));
}
usleep(100 * 1000L); // if get quota not done, sleep 100ms
}
}
return ret;
}
int ObRemoteFetchWorker::submit_log_(const ObLSID &id,
const int64_t proposal_id,
const LSN &lsn,
......
......@@ -44,6 +44,7 @@ class ObRemoteSourceGuard;
class ObRemoteLogParent;
class ObLogRestoreService;
class ObLogRestoreAllocator;
class ObLogRestoreController;
using oceanbase::share::ObLSID;
using oceanbase::palf::LSN;
// Remote fetch log worker
......@@ -53,8 +54,11 @@ public:
ObRemoteFetchWorker();
~ObRemoteFetchWorker();
int init(const uint64_t tenant_id, ObLogRestoreAllocator *allocator,
ObLogRestoreService *restore_service, storage::ObLSService *ls_svr);
int init(const uint64_t tenant_id,
ObLogRestoreAllocator *allocator,
ObLogRestoreController *restore_controller,
ObLogRestoreService *restore_service,
storage::ObLSService *ls_svr);
void destroy();
int start();
void stop();
......@@ -78,6 +82,7 @@ private:
ObRemoteLogGroupEntryIterator &iter);
int submit_log_(const ObLSID &id, const int64_t proposal_id, const LSN &lsn,
const share::SCN &scn, const char *buf, const int64_t buf_size);
int wait_restore_quota_(const int64_t size, bool &done);
void mark_if_to_end_(ObFetchLogTask &task, const share::SCN &upper_limit_scn, const share::SCN &scn);
int try_retire_(ObFetchLogTask *&task);
void try_update_location_info_(const ObFetchLogTask &task, ObRemoteLogGroupEntryIterator &iter);
......@@ -94,6 +99,7 @@ private:
private:
bool inited_;
uint64_t tenant_id_;
ObLogRestoreController *restore_controller_;
ObLogRestoreService *restore_service_;
storage::ObLSService *ls_svr_;
common::ObLightyQueue task_queue_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册