提交 5dcc0e59 编写于 作者: H hiddenbomb 提交者: wangzelin.wzl

add disk warning state in log file store for slog

上级 fdef86a6
......@@ -3174,12 +3174,17 @@ int ObCLogMgr::get_election_group_priority(const uint64_t tenant_id, election::O
if (OB_SUCCESS != (tmp_ret = ObIOManager::get_instance().is_disk_error(is_data_disk_error))) {
CLOG_LOG(WARN, "is_data_disk_error failed", K(tmp_ret));
}
const bool is_slog_disk_warning = SLOGGER.is_disk_warning();
if (is_clog_disk_hang) {
priority.set_system_clog_disk_hang();
}
if (is_data_disk_error) {
priority.set_system_data_disk_error();
}
if (is_slog_disk_warning) {
priority.set_system_slog_disk_warning();
}
if (!partition_service_->is_service_started()) {
priority.set_system_service_not_started();
}
......
......@@ -3920,6 +3920,7 @@ int ObPartitionLogService::on_get_election_priority(election::ObElectionPriority
bool is_data_disk_error = false;
bool is_disk_space_enough = log_engine_->is_disk_space_enough();
bool is_clog_disk_hang = log_engine_->is_clog_disk_hang();
const bool is_slog_disk_warning = SLOGGER.is_disk_warning();
const ObReplicaProperty replica_property = mm_.get_replica_property();
const uint64_t log_id = sw_.get_max_confirmed_log_id();
if (OB_SUCCESS != (tmp_ret = ObIOManager::get_instance().is_disk_error(is_data_disk_error))) {
......@@ -3953,6 +3954,9 @@ int ObPartitionLogService::on_get_election_priority(election::ObElectionPriority
if (is_data_disk_error) {
priority.set_system_data_disk_error();
}
if (is_slog_disk_warning) {
priority.set_system_slog_disk_warning();
}
if (is_tenant_out_of_mem) {
priority.set_system_tenant_out_of_memory();
}
......
......@@ -74,6 +74,12 @@ void ObElectionGroupPriority::set_system_data_disk_error()
system_score_ += SYSTEM_SCORE_DATA_DISK_ERROR * 100;
}
void ObElectionGroupPriority::set_system_slog_disk_warning()
{
// slog use the same disk with sstable(data disk)
system_score_ += SYSTEM_SCORE_DATA_DISK_ERROR * 100;
}
void ObElectionGroupPriority::set_system_service_not_started()
{
system_score_ += SYSTEM_SCORE_SERVICE_NOT_STARTED * 100;
......
......@@ -50,6 +50,7 @@ public:
}
void set_system_clog_disk_hang();
void set_system_data_disk_error();
void set_system_slog_disk_warning();
void set_system_service_not_started();
int compare(const ObElectionGroupPriority& priority) const;
......
......@@ -217,6 +217,12 @@ void ObElectionPriority::set_system_data_disk_error()
system_score_ += SYSTEM_SCORE_DATA_DISK_ERROR * 100;
}
void ObElectionPriority::set_system_slog_disk_warning()
{
// slog use the same disk with sstable(data disk)
system_score_ += SYSTEM_SCORE_DATA_DISK_ERROR * 100;
}
void ObElectionPriority::set_system_need_rebuild()
{
system_score_ += SYSTEM_SCORE_NEED_REBUILD * 100;
......
......@@ -80,6 +80,7 @@ public:
void set_system_clog_disk_hang();
void set_system_tenant_out_of_memory();
void set_system_data_disk_error();
void set_system_slog_disk_warning();
void set_system_need_rebuild();
void set_system_in_election_blacklist();
void set_system_service_not_started();
......
......@@ -116,15 +116,17 @@ int ObHeartBeatProcess::init_lease_request(ObLeaseRequest& lease_request)
(err_zone == lease_request.zone_ && enable_disk_error_test) ? LEASE_REQUEST_DATA_DISK_ERROR : 0;
#else
int tmp_ret = OB_SUCCESS;
bool is_disk_error = false;
if (OB_SUCCESS != (tmp_ret = ObIOManager::get_instance().is_disk_error_definite(is_disk_error))) {
bool is_data_disk_error = false;
const bool is_slog_disk_warning = SLOGGER.is_disk_warning();
if (OB_SUCCESS != (tmp_ret = ObIOManager::get_instance().is_disk_error_definite(is_data_disk_error))) {
CLOG_LOG(WARN, "is_disk_error_definite failed", K(tmp_ret));
} else if (OB_UNLIKELY(is_data_disk_error) || OB_UNLIKELY(is_slog_disk_warning)) {
const int64_t PRINT_LOG_INTERVAL_IN_US = 60 * 1000 * 1000; // 1min
if (REACH_TIME_INTERVAL(PRINT_LOG_INTERVAL_IN_US)) {
LOG_WARN("error occurs on data disk or slog disk", K(is_data_disk_error), K(is_slog_disk_warning));
}
}
bool is_slog_ok = true;
if (OB_SUCCESS != (tmp_ret = SLOGGER.is_logger_ok(is_slog_ok))) {
CLOG_LOG(WARN, "is_logger_ok failed", K(tmp_ret));
}
lease_request.server_status_ |= (is_disk_error || !is_slog_ok) ? LEASE_REQUEST_DATA_DISK_ERROR : 0;
lease_request.server_status_ |= (is_data_disk_error || is_slog_disk_warning) ? LEASE_REQUEST_DATA_DISK_ERROR : 0;
#endif
}
return ret;
......
......@@ -248,7 +248,8 @@ int ObILogFileStore::format_file_path(
return ret;
}
ObLogFileStore::ObLogFileStore() : is_inited_(false), disk_mgr_(NULL), write_fd_(), io_ctx_(NULL)
ObLogFileStore::ObLogFileStore()
: is_inited_(false), disk_mgr_(NULL), write_fd_(), io_ctx_(NULL), is_disk_warning_(false)
{
for (int32_t i = 0; i < MAX_DISK_COUNT; i++) {
memset(&io_reqs_[i], 0, sizeof(io_reqs_[i]));
......@@ -388,6 +389,7 @@ int ObLogFileStore::write(void* buf, int64_t count, int64_t offset)
} else if (OB_FAIL(prepare_write_info(buf, count, offset))) {
COMMON_LOG(ERROR, "prepare io info fail", K(ret));
} else {
const int64_t write_begin_ts = common::ObTimeUtility::fast_current_time();
while (need_retry) {
ret = OB_SUCCESS;
new_req_cnt = 0;
......@@ -398,6 +400,17 @@ int ObLogFileStore::write(void* buf, int64_t count, int64_t offset)
} else if (OB_FAIL(process_io_getevents(submitted, io_ctx_, io_events_))) {
COMMON_LOG(ERROR, "process get events fail", K(ret), K(new_req_cnt), K(submitted), K(retry_cnt), K_(write_fd));
}
if (OB_SUCC(ret)) {
if (is_disk_warning()) {
set_disk_warning(false);
}
} else if (!is_disk_warning()) {
const int64_t write_finish_ts = common::ObTimeUtility::fast_current_time();
const int64_t log_write_timeout_us = GCONF.data_storage_warning_tolerance_time;
if (write_finish_ts - write_begin_ts > log_write_timeout_us) {
set_disk_warning(true);
}
}
need_retry = process_retry(ret, retry_cnt);
}
......@@ -439,7 +452,7 @@ int ObLogFileStore::read(void* buf, int64_t count, int64_t offset, int64_t& read
int64_t rd_size = 0;
int64_t rd_offset = 0;
int64_t event_res = 0;
int retry = 0;
int64_t retry = 0;
struct timespec timeout;
for (int32_t i = 0; OB_SUCC(ret) && event_sz < count && i < write_fd_.count(); i++) {
......@@ -518,6 +531,7 @@ void ObLogFileStore::destroy()
ob_io_destroy(io_ctx_);
}
disk_mgr_ = NULL;
set_disk_warning(false);
is_inited_ = false;
}
......
......@@ -14,7 +14,8 @@
#define OB_LOG_FILE_STORE_H_
#include <sys/stat.h>
#include "ob_log_disk_manager.h"
#include "lib/atomic/ob_atomic.h"
#include "share/redolog/ob_log_disk_manager.h"
#include "storage/blocksstable/ob_block_sstable_struct.h"
namespace oceanbase {
......@@ -159,6 +160,8 @@ public:
virtual void try_recycle_file() = 0;
virtual int update_free_quota() = 0;
virtual bool free_quota_warn() const = 0;
virtual bool is_disk_warning() const = 0;
virtual void set_disk_warning(bool disk_warning) = 0;
virtual ObRedoLogType get_redo_log_type() const
{
return log_type_;
......@@ -182,7 +185,7 @@ protected:
static const int64_t CLOG_AIO_TIMEOUT_SECOND = 300;
static const int64_t AIO_RETRY_INTERVAL_US = 100 * 1000; // 100ms
static const int64_t MAX_DISK_COUNT = ObLogDiskManager::MAX_DISK_COUNT;
static const int MAX_IO_RETRY = 3;
static const int64_t MAX_IO_RETRY = LLONG_MAX;
ObRedoLogType log_type_;
};
......@@ -259,6 +262,16 @@ public:
return disk_mgr_->free_quota_warn();
}
virtual bool is_disk_warning() const
{
return ATOMIC_LOAD(&is_disk_warning_);
}
virtual void set_disk_warning(bool disk_warning)
{
ATOMIC_STORE(&is_disk_warning_, disk_warning);
}
private:
int inner_open(const int64_t file_id, const int8_t flag, ObLogFileDescriptor& log_fd);
int inner_close(ObLogFileDescriptor& log_fd);
......@@ -285,6 +298,9 @@ private:
struct iocb* io_req_ptrs_[MAX_DISK_COUNT];
ObLogFileIOInfo pending_wr_[MAX_DISK_COUNT];
// io hang state
bool is_disk_warning_;
DISALLOW_COPY_AND_ASSIGN(ObLogFileStore);
};
} // namespace common
......
......@@ -315,20 +315,15 @@ int ObBaseStorageLogger::abort()
return ret;
}
int ObBaseStorageLogger::is_logger_ok(bool& is_ok)
bool ObBaseStorageLogger::is_disk_warning() const
{
int ret = OB_SUCCESS;
is_ok = true;
if (!is_inited_) {
ret = OB_NOT_INIT;
STORAGE_REDO_LOG(WARN, "The ObBaseStorageLogger has not been inited.", K(ret));
bool b_ret = false;
if (OB_UNLIKELY(!is_inited_)) {
b_ret = false;
} else {
is_ok = log_writer_.is_ok();
if (!is_ok) {
STORAGE_REDO_LOG(WARN, "storage log writer is not ok");
}
b_ret = log_writer_.is_disk_warning();
}
return ret;
return b_ret;
}
int ObBaseStorageLogger::get_active_cursor(ObLogCursor& log_cursor)
......
......@@ -57,7 +57,7 @@ public:
// Thread safe.
// abort a transaction
virtual int abort();
int is_logger_ok(bool& is_ok);
bool is_disk_warning() const;
// Thread safe
virtual int get_active_cursor(common::ObLogCursor& log_cursor) override;
......
......@@ -237,6 +237,7 @@ int ObStorageLogWriter::ObSerializableBuffer::serialize(char* buf, int64_t limit
ObStorageLogWriter::ObStorageLogWriter()
: is_inited_(false),
is_started_(false),
log_buffers_(),
log_buffer_size_(0),
log_item_allocator_(),
......@@ -252,8 +253,6 @@ ObStorageLogWriter::ObStorageLogWriter()
cur_file_id_(-1),
write_cursor_(),
flush_cursor_(),
is_ok_(false),
write_failed_times_(0),
file_store_(nullptr),
batch_write_buf_(nullptr),
batch_write_len_(0),
......@@ -317,9 +316,9 @@ int ObStorageLogWriter::init(
write_cursor_.reset();
flush_cursor_.reset();
set_ok(false);
write_failed_times_ = 0;
is_inited_ = true;
LOG_INFO(
"storage log writer init finished", K(ret), K(log_dir), K(log_file_size), K(max_log_size), K(max_trans_cnt));
}
}
return ret;
......@@ -339,8 +338,6 @@ void ObStorageLogWriter::destroy()
cur_file_id_ = -1;
write_cursor_.reset();
flush_cursor_.reset();
set_ok(false);
write_failed_times_ = 0;
ObLogStoreFactory::destroy(file_store_);
log_item_allocator_.reset();
ObBaseLogWriter::destroy();
......@@ -349,6 +346,7 @@ void ObStorageLogWriter::destroy()
batch_write_buf_ = nullptr;
}
batch_write_len_ = 0;
is_started_ = false;
is_inited_ = false;
}
......@@ -367,8 +365,7 @@ int ObStorageLogWriter::start_log(const ObLogCursor& start_cursor)
build_cursor_ = start_cursor;
write_cursor_ = start_cursor;
flush_cursor_ = start_cursor;
set_ok(true);
write_failed_times_ = 0;
is_started_ = true;
LOG_INFO("start log", K(start_cursor));
}
return ret;
......@@ -380,9 +377,9 @@ int ObStorageLogWriter::flush_log(
int ret = OB_SUCCESS;
ObStorageLogItem* log_item = NULL;
start_cursor.reset();
if (IS_NOT_INIT) {
if (OB_UNLIKELY(!is_started_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
LOG_WARN("log writer not started yet", K(ret), K_(is_started), K_(is_inited));
} else if (log_buffer.is_empty()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(log_buffer));
......@@ -842,44 +839,10 @@ int ObStorageLogWriter::write_logs_local(common::ObIBaseLogItem** items, const i
const int64_t offset = flush_cursor_.offset_;
const int64_t start_ts = ObTimeUtility::fast_current_time();
// retry until success
while (OB_SUCC(ret)) {
ret = file_store_->write(write_buf, write_len, offset);
if (OB_SUCC(ret)) {
if (--write_failed_times_ <= 0) {
write_failed_times_ = 0;
set_ok(true);
}
break;
} else {
if (++write_failed_times_ >= 10) {
write_failed_times_ = 10;
set_ok(false);
}
if (REACH_TIME_INTERVAL(30 * 1000 * 1000)) {
LOG_ERROR("log write failed",
K(ret),
K(write_len),
K(flush_cursor_),
K(cur_file_id_),
K(item_cnt),
K(sync_idx),
K(cur_idx));
} else {
LOG_WARN("log write failed",
K(ret),
K(write_len),
K(flush_cursor_),
K(cur_file_id_),
K(item_cnt),
K(sync_idx),
K(cur_idx));
}
ret = OB_SUCCESS;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(file_store_->write(write_buf, write_len, offset))) {
// should never happen
LOG_ERROR("failed to write slog", K(ret));
} else {
batch_write_len_ = 0;
const int64_t duration = ObTimeUtility::fast_current_time() - start_ts;
if (duration > 10000) {
......@@ -1004,7 +967,7 @@ int ObStorageLogWriter::write_and_sync_logs(
int ObStorageLogWriter::sync_log(common::ObIBaseLogItem** items, int64_t& sync_index, const int64_t write_index)
{
int ret = OB_SUCCESS;
if (NULL == items || write_index < 0 || sync_index < -1) {
if (OB_UNLIKELY(NULL == items || write_index < 0 || sync_index < -1)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguments", K(ret), KP(items), K(write_index), K(sync_index));
} else {
......@@ -1030,5 +993,10 @@ int ObStorageLogWriter::sync_log(common::ObIBaseLogItem** items, int64_t& sync_i
}
return ret;
}
bool ObStorageLogWriter::is_disk_warning() const
{
return is_started_ ? file_store_->is_disk_warning() : false;
}
} // end namespace blocksstable
} // end namespace oceanbase
......@@ -94,16 +94,8 @@ public:
}
int delete_log_file(int64_t file_id);
int get_using_disk_space(int64_t& using_space) const;
OB_INLINE bool is_ok() const
{
return ATOMIC_LOAD(&is_ok_);
}
OB_INLINE void set_ok(bool ok)
{
ATOMIC_STORE(&is_ok_, ok);
}
int get_using_disk_space(int64_t &using_space) const;
bool is_disk_warning() const;
private:
static const int64_t FLUSH_THREAD_IDLE_INTERVAL_US = 10 * 1000; // 10ms
......@@ -201,6 +193,7 @@ private:
private:
bool is_inited_;
bool is_started_;
common::ObFixedQueue<void> log_buffers_;
int64_t log_buffer_size_;
common::ObArenaAllocator log_item_allocator_;
......@@ -221,12 +214,8 @@ private:
common::ObLogCursor write_cursor_;
common::ObLogCursor flush_cursor_;
// indicate whether slog writer is ok for outside checking
bool is_ok_;
int64_t write_failed_times_;
common::ObILogFileStore* file_store_;
char* batch_write_buf_;
common::ObILogFileStore *file_store_;
char *batch_write_buf_;
int64_t batch_write_len_;
int64_t batch_limit_size_;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册