提交 61b6d056 编写于 作者: D dimstars 提交者: wangzelin.wzl

Support log compression & max retention time (#294)

上级 27b47046
......@@ -136,6 +136,7 @@ ob_set_subtarget(oblib_lib common
oblog/ob_log_module.ipp
oblog/ob_trace_log.cpp
oblog/ob_warning_buffer.cpp
oblog/ob_log_compressor.cpp
ob_name_id_def.cpp
ob_replica_define.cpp
profile/ob_atomic_event.cpp
......@@ -234,6 +235,7 @@ ob_lib_add_pchs(lib
oblog/ob_log.h
oblog/ob_base_log_writer.h
oblog/ob_async_log_struct.h
oblog/ob_log_compressor.h
hash/fnv_hash.h
coro/co_var.h
time/Time.h
......
......@@ -144,6 +144,7 @@ LABEL_ITEM_DEF(OB_BUFFER, Buffer)
LABEL_ITEM_DEF(OB_THREAD_STORE, ThreadStore)
LABEL_ITEM_DEF(OB_LOG_WRITER, LogWriter)
LABEL_ITEM_DEF(OB_LOG_READER, LogReader)
LABEL_ITEM_DEF(OB_LOG_COMPRESSOR, LogCompressor)
LABEL_ITEM_DEF(OB_REGEX, Regex)
LABEL_ITEM_DEF(OB_SLAB, Slab)
LABEL_ITEM_DEF(OB_SLAVE_MGR, SlaveMgr)
......
......@@ -63,10 +63,13 @@ int ObBaseLogWriter::init(const ObBaseLogWriterCfg& log_cfg)
LOG_STDERR("Fail to allocate memory, max_buffer_item_cnt=%lu.\n", log_cfg.max_buffer_item_cnt_);
} else if (0 != pthread_mutex_init(&log_mutex_, NULL)) {
ret = OB_ERR_SYS;
LOG_STDERR("Failed to pthread_mutex_init.\n");
} else if (0 != pthread_cond_init(&log_write_cond_, NULL)) {
ret = OB_ERR_SYS;
LOG_STDERR("Failed to pthread_cond_init.\n");
} else if (0 != pthread_cond_init(&log_flush_cond_, NULL)) {
ret = OB_ERR_SYS;
LOG_STDERR("Failed to pthread_cond_init.\n");
} else {
flush_tid_ = 0;
log_item_push_idx_ = 0;
......
......@@ -31,6 +31,8 @@
#include "lib/container/ob_vector.h"
#include "lib/coro/co.h"
#include "lib/allocator/ob_fifo_allocator.h"
#include "lib/oblog/ob_log_compressor.h"
#include "lib/string/ob_string.h"
using namespace oceanbase::lib;
......@@ -62,6 +64,7 @@ static const int64_t POP_COMPENSATED_TIME[5] = {0, 1, 2, 3, 4}; // for pop time
static int64_t last_check_file_ts = 0; // last file sample timestamps
static int64_t last_check_disk_ts = 0; // last disk sample timestamps
static const int64_t NORMAL_LOG_SIZE = 1 << 10;
static const int64_t FILE_TIME_STR_LEN = 14; // xxxx-xx-xx xx:xx:xx
#if defined TC_REACH_TIME_INTERVAL
#undef TC_REACH_TIME_INTERVAL
......@@ -127,6 +130,41 @@ int process_thread_log_id_level_map(const char* str, const int32_t str_length)
return ret;
}
inline void ob_log_unlink(const char *file_cstr)
{
unlink(file_cstr);
ObString file_name(file_cstr);
unlink(ObLogCompressor::get_compression_file_name(file_name).ptr());
}
time_t get_file_time(const ObString &file_name)
{
time_t ret_time = 0;
ObString time_str;
ObString remaining_string = file_name;
// Walk through the string from back to front until the timestamp found.
while (time_str.empty() && !remaining_string.empty()) {
const char *idx = remaining_string.reverse_find('.');
if (NULL != idx) {
ObString suffix_string = remaining_string.after(idx);
remaining_string = remaining_string.split_on(idx);
if (isdigit(suffix_string[0]) && suffix_string.length() == FILE_TIME_STR_LEN) {
time_str = suffix_string;
}
} else {
remaining_string.reset();
}
}
if (!time_str.empty()) {
struct tm tm;
strptime(time_str.ptr(), "%Y%m%d%H%M%S", &tm);
if (0 > (ret_time = mktime(&tm))) {
ret_time = 0;
}
}
return ret_time;
}
void ObLogIdLevelMap::set_level(const int8_t level)
{
non_mod_level_ = level;
......@@ -410,8 +448,11 @@ const char* const ObLogger::errstr_[] = {"ERROR", "USER_ERR", "WARN", "INFO", "T
ObLogger::ObLogger()
: ObBaseLogWriter(),
log_file_(),
log_compressor_(nullptr),
max_file_size_(DEFAULT_MAX_FILE_SIZE),
max_file_index_(0),
max_file_time_(0),
enable_file_compress_(false),
name_id_map_(),
id_level_map_(),
wf_level_(OB_LOG_LEVEL_WARN),
......@@ -866,6 +907,48 @@ void ObLogger::log_data(const char* mod_name, int32_t level, LogLocation locatio
}
}
void ObLogger::remove_outdated_file(std::deque<std::string> &file_list)
{
if (file_list.size() > 0 && max_file_time_ > 0) {
time_t min_time = time(NULL) - max_file_time_;
if (OB_LIKELY(0 == pthread_mutex_lock(&file_index_mutex_))) {
for (int i = 0; i < file_list.size(); i++) {
ObString file_name(file_list.front().c_str());
time_t file_time = get_file_time(file_name);
if (file_time < min_time) {
file_list.pop_front();
ob_log_unlink(file_name.ptr());
} else {
i = file_list.size();
}
}
(void)pthread_mutex_unlock(&file_index_mutex_);
}
}
}
void ObLogger::update_compression_file(std::deque<std::string> &file_list)
{
if (enable_file_compress_ && NULL != log_compressor_) {
if (OB_LIKELY(0 == pthread_mutex_lock(&file_index_mutex_))) {
for (auto iter = file_list.begin(); iter != file_list.end(); iter++) {
ObString file_name(iter->c_str());
if (isdigit(file_name[file_name.length() - 1])) {
ObString compression_file_name = ObLogCompressor::get_compression_file_name(file_name).ptr();
if (0 != access(file_name.ptr(), F_OK) && 0 == access(compression_file_name.ptr(), F_OK)) {
iter->clear();
iter->assign(compression_file_name.ptr());
} else {
log_compressor_->append_log(file_name);
}
}
}
(void)pthread_mutex_unlock(&file_index_mutex_);
}
}
}
void ObLogger::rotate_log(
const int64_t size, const bool redirect_flag, ObPLogFileStruct& log_struct, const ObPLogFDType fd_type)
{
......@@ -923,12 +1006,12 @@ void ObLogger::rotate_log(const char* filename, const ObPLogFDType fd_type, cons
tm.tm_min,
tm.tm_sec);
if (max_file_index_ > 0) {
if (max_file_index_ > 0 || max_file_time_ > 0) {
if (OB_LIKELY(0 == pthread_mutex_lock(&file_index_mutex_))) {
if (file_list.size() >= max_file_index_) {
if (max_file_index_ > 0 && file_list.size() >= max_file_index_) {
std::string oldFile = file_list.front();
file_list.pop_front();
unlink(oldFile.c_str());
ob_log_unlink(oldFile.c_str());
}
file_list.push_back(old_log_file);
(void)pthread_mutex_unlock(&file_index_mutex_);
......@@ -958,12 +1041,12 @@ void ObLogger::rotate_log(const char* filename, const ObPLogFDType fd_type, cons
}
if (open_wf_flag_ && enable_wf_flag_) {
if (max_file_index_ > 0) {
if (max_file_index_ > 0 || max_file_time_ > 0) {
if (OB_LIKELY(0 == pthread_mutex_lock(&file_index_mutex_))) {
if (wf_file_list.size() >= max_file_index_) {
if (max_file_index_ > 0 && wf_file_list.size() >= max_file_index_) {
std::string old_wf_file = wf_file_list.front();
wf_file_list.pop_front();
unlink(old_wf_file.c_str());
ob_log_unlink(old_wf_file.c_str());
}
wf_file_list.push_back(old_wf_log_file);
(void)pthread_mutex_unlock(&file_index_mutex_);
......@@ -980,6 +1063,20 @@ void ObLogger::rotate_log(const char* filename, const ObPLogFDType fd_type, cons
}
}
}
if (max_file_time_ > 0) {
remove_outdated_file(file_list);
if (open_wf_flag_ && enable_wf_flag_) {
remove_outdated_file(wf_file_list);
}
}
if (enable_file_compress_ && NULL != log_compressor_) {
log_compressor_->append_log(ObString::make_string(old_log_file));
if (open_wf_flag_ && enable_wf_flag_) {
log_compressor_->append_log(ObString::make_string(old_wf_log_file));
}
}
}
}
UNUSED(fd_type);
......@@ -1053,11 +1150,38 @@ int ObLogger::set_max_file_index(int64_t max_file_index)
return ret;
}
int ObLogger::set_max_file_time(int64_t max_file_time)
{
int ret = OB_SUCCESS;
if (max_file_time < 0) {
max_file_time = 0;
}
max_file_time_ = max_file_time / 1000000L; // usecond to second
if (max_file_time_ > 0 && rec_old_file_flag_) {
if (OB_FAIL(record_old_log_file())) {
LOG_WARN("Record old log file error", K(ret));
}
}
return ret;
}
int ObLogger::set_enable_file_compress(bool enable_file_compress)
{
int ret = OB_SUCCESS;
enable_file_compress_ = enable_file_compress;
if (rec_old_file_flag_ && enable_file_compress_) {
if (OB_FAIL(record_old_log_file())) {
LOG_WARN("Record old log file error", K(ret));
}
}
return ret;
}
int ObLogger::set_record_old_log_file(bool rec_old_file_flag)
{
int ret = OB_SUCCESS;
rec_old_file_flag_ = rec_old_file_flag;
if (rec_old_file_flag_ && max_file_index_ > 0) {
if (rec_old_file_flag_ && (max_file_index_ > 0 || max_file_time_ > 0)) {
if (OB_FAIL(record_old_log_file())) {
LOG_WARN("Record old log file error", K(ret));
}
......@@ -1065,6 +1189,17 @@ int ObLogger::set_record_old_log_file(bool rec_old_file_flag)
return ret;
}
int ObLogger::set_log_compressor(ObLogCompressor *log_compressor)
{
int ret = OB_SUCCESS;
if (NULL == log_compressor) {
ret = OB_INVALID_ARGUMENT;
} else {
log_compressor_ = log_compressor;
}
return ret;
}
//@brief string copy with dst's length and src's length checking and src trim.
int64_t str_copy_trim(char* dst, const int64_t dst_length, const char* src, const int64_t src_length)
{
......@@ -1371,7 +1506,7 @@ void ObLogger::log_user_error_line_column(const UserMsgLevel user_msg_level, con
int ObLogger::record_old_log_file()
{
int ret = OB_SUCCESS;
if (max_file_index_ <= 0 || !rec_old_file_flag_) {
if ((max_file_index_ <= 0 && max_file_time_ <= 0) || !rec_old_file_flag_) {
} else {
ObSEArray<FileName, 20> files;
ObSEArray<FileName, 20> wf_files;
......@@ -1509,25 +1644,38 @@ int ObLogger::add_files_to_list(
file_list.clear();
std::string oldFile;
for (int64_t i = 0; OB_SUCC(ret) && i < files_arr->count(); ++i) {
if (file_list.size() >= max_file_index_) {
if (max_file_index_ > 0 && file_list.size() >= max_file_index_) {
oldFile = file_list.front();
file_list.pop_front();
unlink(oldFile.c_str());
ob_log_unlink(oldFile.c_str());
}
file_list.push_back(files_arr->at(i).file_name_);
}
wf_file_list.clear();
std::string old_wf_file;
for (int64_t i = 0; OB_SUCC(ret) && i < wf_files_arr->count(); ++i) {
if (wf_file_list.size() >= max_file_index_) {
if (max_file_index_ > 0 && wf_file_list.size() >= max_file_index_) {
old_wf_file = wf_file_list.front();
wf_file_list.pop_front();
unlink(old_wf_file.c_str());
ob_log_unlink(old_wf_file.c_str());
}
wf_file_list.push_back(wf_files_arr->at(i).file_name_);
}
(void)pthread_mutex_unlock(&file_index_mutex_);
}
if (max_file_time_ > 0) {
remove_outdated_file(file_list);
if (open_wf_flag_ && enable_wf_flag_) {
remove_outdated_file(wf_file_list);
}
}
if (enable_file_compress_) {
update_compression_file(file_list);
if (open_wf_flag_ && enable_wf_flag_) {
update_compression_file(wf_file_list);
}
}
}
return ret;
}
......
......@@ -54,6 +54,8 @@ namespace oceanbase {
namespace common {
class ObFIFOAllocator;
class ObPLogItem;
class ObString;
class ObLogCompressor;
#define OB_LOGGER ::oceanbase::common::ObLogger::get_logger()
#define OB_LOG_NEED_TO_PRINT(level) (OB_UNLIKELY(OB_LOGGER.need_to_print(OB_LOG_LEVEL_##level)))
......@@ -866,10 +868,16 @@ public:
void set_max_file_size(int64_t max_file_size);
//@brief Set the max number of log-files. If max_file_index = 0, no limit.
int set_max_file_index(int64_t max_file_index = 0x0F);
//@brief Set the max retention time of log-files. If max_file_time = 0, no limit.
int set_max_file_time(int64_t max_file_time);
//@brief Set whether compress log-files. If this flag set, will compress all log files.
int set_enable_file_compress(bool enable_file_compress);
//@brief Set whether record old log file. If this flag and max_file_index set,
// will record log files in the directory for log file
int set_record_old_log_file(bool rec_old_file_flag = false);
int set_log_compressor(ObLogCompressor *log_compressor);
//@brief Get current time.
static struct timeval get_cur_tv();
......@@ -1012,6 +1020,9 @@ private:
int add_files_to_list(void* files /*ObIArray<FileName> * */, void* wf_files /*ObIArray<FileName> * */,
std::deque<std::string>& file_list, std::deque<std::string>& wf_file_list);
void remove_outdated_file(std::deque<std::string> &file_list);
void update_compression_file(std::deque<std::string> &file_list);
void rotate_log(
const int64_t size, const bool redirect_flag, ObPLogFileStruct& log_struct, const ObPLogFDType fd_type);
//@brief Rename the log to a filename with fmt. And open a new file with the old, then add old file to file_list.
......@@ -1061,9 +1072,12 @@ private:
static RLOCAL(bool, disable_logging_);
ObPLogFileStruct log_file_[MAX_FD_FILE];
ObLogCompressor *log_compressor_;
int64_t max_file_size_;
int64_t max_file_index_;
int64_t max_file_time_; // max retention time(second) of log-file
int32_t enable_file_compress_; // percentage of log-file to compress
pthread_mutex_t file_size_mutex_;
pthread_mutex_t file_index_mutex_;
......
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include "lib/oblog/ob_log_compressor.h"
#include "lib/ob_errno.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/oblog/ob_log_module.h"
#include "lib/compress/ob_compressor_pool.h"
#include "lib/string/ob_string.h"
#include "lib/ob_define.h"
#include "lib/thread/thread_pool.h"
#include "lib/lock/ob_thread_cond.h"
#include "lib/list/ob_list.h"
#include "lib/lock/ob_mutex.h"
#include "lib/allocator/ob_malloc.h"
#include "lib/oblog/ob_async_log_struct.h"
using namespace oceanbase::lib;
namespace oceanbase {
namespace common {
/* Log files are divided into blocks and then compressed. The default block size is (2M - 1K).*/
static const int32_t DEFAULT_COMPRESSION_BLOCK_SIZE = OB_MALLOC_BIG_BLOCK_SIZE;
/* To prevent extreme cases where the files become larger after compression,
* the size of the decompression buffer needs to be larger than the original data.
* Specific size can refer to the ZSTD code implementation. */
static const int32_t DEFAULT_COMPRESSION_BUFFER_SIZE =
DEFAULT_COMPRESSION_BLOCK_SIZE + DEFAULT_COMPRESSION_BLOCK_SIZE / 128 + 512 + 19;
static const int32_t DEFAULT_FILE_NAME_SIZE = ObPLogFileStruct::MAX_LOG_FILE_NAME_SIZE;
static const int32_t DEFAULT_LOG_QUEUE_DEPTH = 100000;
ObLogCompressor::ObLogCompressor() : is_inited_(false), has_stoped_(true), compressor_(NULL)
{}
ObLogCompressor::~ObLogCompressor()
{}
int ObLogCompressor::init()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_STDERR("The ObLogCompressor has been inited.\n");
} else if (OB_FAIL(file_list_.init(DEFAULT_LOG_QUEUE_DEPTH))) {
ret = OB_ERR_SYS;
LOG_STDERR("Failed to init file_list_.\n");
} else if (OB_FAIL(log_compress_cond_.init(ObWaitEventIds::DEFAULT_COND_WAIT))) {
ret = OB_ERR_SYS;
LOG_STDERR("Failed to init ObThreadCond.\n");
} else {
ObCompressor *ptr = NULL;
if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(ZSTD_1_3_8_COMPRESSOR, ptr))) {
LOG_STDERR("Fail to get_compressor, err_code=%d.\n", ret);
} else {
compressor_ = ptr;
has_stoped_ = false;
if (OB_FAIL(start())) {
ret = OB_ERR_SYS;
LOG_STDERR("Fail to create log compression thread.\n");
} else {
is_inited_ = true;
LOG_STDOUT("Success to create thread.\n");
}
}
if (ret) {
log_compress_cond_.destroy();
}
}
if (!is_inited_) {
destroy();
}
return ret;
}
void ObLogCompressor::destroy()
{
if (is_inited_) {
is_inited_ = false;
log_compress_cond_.lock();
file_list_.destroy();
has_stoped_ = true;
log_compress_cond_.signal();
log_compress_cond_.unlock();
wait();
log_compress_cond_.destroy();
}
}
ObString ObLogCompressor::get_compression_file_name(const ObString &file_name)
{
ObString compression_file_name;
ObString suffix_str = ".zst";
int size = file_name.length();
if (size && 0 == file_name[size - 1]) {
size -= 1;
}
if (size > 0 && size + 1 + suffix_str.length() <= DEFAULT_FILE_NAME_SIZE) {
const char *idx = NULL;
if (size > 4 && NULL != (idx = file_name.reverse_find('.')) && idx != file_name.ptr() &&
0 == file_name.after(--idx).compare(suffix_str)) {
} else {
char *buf = (char *)ob_malloc(size + 1 + suffix_str.length(), ObModIds::OB_LOG_COMPRESSOR);
if (buf) {
compression_file_name.assign_buffer(buf, DEFAULT_FILE_NAME_SIZE);
if (size != compression_file_name.write(file_name.ptr(), size)) {
ob_free(buf);
} else {
compression_file_name.write(".zst\0", 5);
}
}
}
}
return compression_file_name;
}
ObCompressor *ObLogCompressor::get_compressor()
{
return compressor_;
}
int ObLogCompressor::append_log(const ObString &file_name)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_STDERR("The ObLogCompressor has not been inited.\n");
} else if (file_name.empty()) {
ret = OB_INVALID_ARGUMENT;
} else {
char *buf = (char *)ob_malloc(file_name.length() + 1 + sizeof(ObString), ObModIds::OB_LOG_COMPRESSOR);
if (buf) {
ObString *file_name_ptr = (ObString *)buf;
file_name_ptr->assign_buffer(buf + sizeof(ObString), file_name.length() + 1);
if (file_name.length() != file_name_ptr->write(file_name.ptr(), file_name.length()) ||
(0 != file_name[file_name.length() - 1] && 1 != file_name_ptr->write("\0", 1))) {
ob_free(buf);
} else {
log_compress_cond_.lock();
file_list_.push(file_name_ptr);
log_compress_cond_.signal();
log_compress_cond_.unlock();
}
}
}
return ret;
}
int ObLogCompressor::log_compress_block(
char *dest, size_t dest_size, const char *src, size_t src_size, size_t &return_size)
{
int ret = OB_SUCCESS;
int64_t size = -1;
if (OB_FAIL(((ObCompressor *)compressor_)->compress(src, src_size, dest, dest_size, size))) {
LOG_STDERR("Failed to compress, err_code=%d.\n", ret);
} else {
return_size = size;
}
return ret;
}
void ObLogCompressor::log_compress()
{
int ret = OB_SUCCESS;
static int sleep_us = 100 * 1000; // 100ms
int src_size = DEFAULT_COMPRESSION_BLOCK_SIZE;
int dest_size = DEFAULT_COMPRESSION_BUFFER_SIZE;
char *src_buf = (char *)ob_malloc(src_size + dest_size, ObModIds::OB_LOG_COMPRESSOR);
char *dest_buf = src_buf + src_size;
if (!src_buf) {
LOG_STDERR("Failed to ob_malloc.\n");
} else {
while (!has_stoped_) {
ObString *file_name = NULL;
log_compress_cond_.lock();
while (0 >= file_list_.get_total() && !has_stoped_) {
log_compress_cond_.wait(0);
}
if (!has_stoped_) {
ret = file_list_.pop(file_name);
}
log_compress_cond_.unlock();
if (has_stoped_ || NULL == file_name || file_name->empty() || 0 != access(file_name->ptr(), F_OK)) {
} else {
ObString compression_file_name = get_compression_file_name(*file_name);
FILE *input_file = NULL;
FILE *output_file = NULL;
if (compression_file_name.empty()) {
LOG_STDERR("Failed to get_compression_file_name.\n");
} else if (NULL == (input_file = fopen(file_name->ptr(), "r"))) {
LOG_STDERR("Failed to fopen, err_code=%d.\n", errno);
} else if (NULL == (output_file = fopen(compression_file_name.ptr(), "w"))) {
fclose(input_file);
LOG_STDERR("Failed to fopen, err_code=%d.\n", errno);
} else {
size_t read_size = 0;
size_t write_size = 0;
while (OB_SUCC(ret) && !feof(input_file)) {
if ((read_size = fread(src_buf, 1, src_size, input_file)) > 0) {
if (OB_FAIL(log_compress_block(dest_buf, dest_size, src_buf, read_size, write_size))) {
LOG_STDERR("Failed to log_compress_block, err_code=%d.\n", ret);
} else if (write_size != fwrite(dest_buf, 1, write_size, output_file)) {
ret = OB_ERR_SYS;
LOG_STDERR("Failed to fwrite, err_code=%d.\n", errno);
}
}
usleep(sleep_us);
}
fclose(input_file);
fclose(output_file);
if (0 != access(file_name->ptr(), F_OK) || OB_SUCCESS != ret) {
unlink(compression_file_name.ptr());
} else {
unlink(file_name->ptr());
}
}
}
}
}
if (src_buf) {
ob_free(src_buf);
}
}
void ObLogCompressor::run1()
{
lib::set_thread_name("syslog_compress");
log_compress();
}
int ObLogCompressor::start()
{
ThreadPool::start();
return OB_SUCCESS;
}
void ObLogCompressor::stop()
{
ThreadPool::stop();
}
void ObLogCompressor::wait()
{
ThreadPool::wait();
}
} // namespace common
} // namespace oceanbase
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_LOG_COMPRESSOR_H_
#define OB_LOG_COMPRESSOR_H_
#include "lib/thread/thread_pool.h"
#include "lib/lock/ob_thread_cond.h"
#include "lib/list/ob_list.h"
#include "lib/queue/ob_fixed_queue.h"
namespace oceanbase {
namespace common {
class ObCompressor;
class ObString;
class ObMalloc;
class ObLogCompressor final : public lib::ThreadPool {
public:
ObLogCompressor();
virtual ~ObLogCompressor();
static ObString get_compression_file_name(const ObString &file_name);
int init();
void destroy();
int append_log(const ObString &file_name);
ObCompressor *get_compressor();
private:
int log_compress_block(char *dest, size_t dest_size, const char *src, size_t src_size, size_t &return_size);
void log_compress();
void run1() override;
int start() override;
void stop() override;
void wait() override;
private:
bool is_inited_;
bool has_stoped_;
ObFixedQueue<ObString> file_list_;
ObThreadCond log_compress_cond_;
ObCompressor *compressor_;
};
} // namespace common
} // namespace oceanbase
#endif /* OB_LOG_COMPRESSOR_H_ */
......@@ -5,7 +5,7 @@ target_link_libraries(oblib_testbase INTERFACE -lgmock -lgtest)
function(oblib_addtest mainfile)
get_filename_component(testname ${mainfile} NAME_WE)
add_executable(${testname} ${ARGV})
target_link_libraries(${testname} PRIVATE oblib oblib_testbase -static-libgcc -static-libstdc++)
target_link_libraries(${testname} PRIVATE easy aio -L${DEP_DIR}/lib/mariadb mariadb oblib oblib_testbase -static-libgcc -static-libstdc++)
endfunction()
add_subdirectory(lib)
......
......@@ -92,6 +92,7 @@ oblib_addtest(net/test_ob_addr.cpp)
oblib_addtest(number/test_number_v2.cpp)
oblib_addtest(oblog/test_base_log_buffer.cpp)
oblib_addtest(oblog/test_base_log_writer.cpp)
oblib_addtest(oblog/test_ob_log_compressor.cpp)
oblib_addtest(oblog/test_ob_log_obj.cpp)
oblib_addtest(oblog/test_ob_log_performance.cpp)
oblib_addtest(profile/test_ob_trace_id.cpp)
......
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include "lib/oblog/ob_log_compressor.h"
#include "lib/compress/ob_compressor_pool.h"
#include "lib/ob_errno.h"
#include "lib/string/ob_string.h"
using namespace oceanbase::lib;
namespace oceanbase {
namespace common {
TEST(ObLogCompressor, normal)
{
int ret = OB_SUCCESS;
int test_count = 1000;
int test_size = test_count * sizeof(int);
ObLogCompressor log_compressor;
ObCompressor *compressor;
// normal init
ret = log_compressor.init();
ASSERT_EQ(OB_SUCCESS, ret);
// repeat init
ret = log_compressor.init();
ASSERT_EQ(OB_INIT_TWICE, ret);
// prepare data
ObString file_name = "test_ob_log_compressor_file";
FILE *input_file = fopen(file_name.ptr(), "w");
ASSERT_EQ(true, NULL != input_file);
int data[test_count];
for (int i = 0; i < test_count; i++) {
data[i] = i;
}
ret = fwrite(data, 1, test_size, input_file);
ASSERT_EQ(test_size, ret);
fclose(input_file);
// normal append
ret = log_compressor.append_log(file_name);
ASSERT_EQ(OB_SUCCESS, ret);
// get compression result
sleep(2);
ObString compression_file_name = log_compressor.get_compression_file_name(file_name);
ASSERT_EQ(0, access(compression_file_name.ptr(), F_OK));
FILE *output_file = fopen(compression_file_name.ptr(), "r");
ASSERT_EQ(true, NULL != output_file);
int buf_size = test_size + 512;
int read_size = 0;
void *buf = malloc(buf_size);
ASSERT_EQ(true, NULL != buf);
read_size = fread(buf, 1, buf_size, output_file);
ASSERT_GT(read_size, 0);
fclose(output_file);
// check decompression result
int64_t decomp_size = 0;
int decomp_buf_size = buf_size;
int *decomp_buf = (int *)malloc(decomp_buf_size);
ASSERT_EQ(true, NULL != decomp_buf);
compressor = (ObCompressor *)log_compressor.get_compressor();
ASSERT_EQ(true, NULL != compressor);
ret = compressor->decompress((char *)buf, read_size, (char *)decomp_buf, decomp_buf_size, decomp_size);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(test_size, decomp_size);
for (int i = 0; i < test_count; i++) {
ASSERT_EQ(*(decomp_buf + i), i);
}
// clear environment
free(buf);
free(decomp_buf);
ASSERT_NE(0, access(file_name.ptr(), F_OK));
ASSERT_EQ(0, access(compression_file_name.ptr(), F_OK));
unlink(compression_file_name.ptr());
// destroy and init
log_compressor.destroy();
ret = log_compressor.init();
ASSERT_EQ(OB_SUCCESS, ret);
// repeat destroy
log_compressor.destroy();
log_compressor.destroy();
}
} // namespace common
} // namespace oceanbase
int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
......@@ -74,6 +74,7 @@
#include "observer/ob_server_memory_cutter.h"
#include "share/ob_bg_thread_monitor.h"
#include "observer/omt/ob_tenant_timezone_mgr.h"
#include "lib/oblog/ob_log_compressor.h"
//#include "share/ob_ofs.h"
using namespace oceanbase::lib;
......@@ -172,10 +173,19 @@ int ObServer::init(const ObServerOptions& opts, const ObPLogWriterCfg& log_cfg)
// set large page param
ObLargePageHelper::set_param(config_.use_large_pages);
if (OB_SUCC(ret)) {
if (OB_FAIL(log_compressor_.init())) {
LOG_ERROR("log compressor init error.", K(ret));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(OB_LOGGER.init(log_cfg))) {
LOG_ERROR("async log init error.", K(ret));
ret = OB_ELECTION_ASYNC_LOG_WARN_INIT;
} else if (OB_FAIL(OB_LOGGER.set_log_compressor(&log_compressor_))) {
LOG_ERROR("set log compressor error.", K(ret));
ret = OB_ELECTION_ASYNC_LOG_WARN_INIT;
}
}
......@@ -420,6 +430,8 @@ void ObServer::destroy()
LOG_WARN("memory dump destroyed");
tenant_timezone_mgr_.destroy();
LOG_WARN("tenant timezone manager destroyed");
log_compressor_.destroy();
LOG_WARN("log compressor destroyed");
LOG_WARN("destroy observer end");
has_destroy_ = true;
}
......@@ -951,10 +963,14 @@ int ObServer::init_pre_setting()
// oblog configuration
if (OB_SUCC(ret)) {
const int max_log_cnt = static_cast<int32_t>(config_.max_syslog_file_count);
const int64_t max_log_time = config_.max_syslog_file_time;
const bool enable_log_compress = config_.enable_syslog_file_compress;
const bool record_old_log_file = config_.enable_syslog_recycle;
const bool log_warn = config_.enable_syslog_wf;
const bool enable_async_syslog = config_.enable_async_syslog;
OB_LOGGER.set_max_file_index(max_log_cnt);
OB_LOGGER.set_max_file_time(max_log_time);
OB_LOGGER.set_enable_file_compress(enable_log_compress);
OB_LOGGER.set_record_old_log_file(record_old_log_file);
LOG_INFO("Whether record old log file", K(record_old_log_file));
OB_LOGGER.set_log_warn(log_warn);
......
......@@ -46,6 +46,7 @@
#include "observer/ob_service.h"
#include "observer/ob_server_reload_config.h"
#include "observer/ob_root_service_monitor.h"
#include "lib/oblog/ob_log_compressor.h"
namespace oceanbase {
namespace omt {
......@@ -426,6 +427,7 @@ private:
blocksstable::ObStorageEnv storage_env_;
share::ObSchemaStatusProxy schema_status_proxy_;
ObSignalWorker sig_worker_;
common::ObLogCompressor log_compressor_;
bool is_log_dir_empty_;
sql::ObConnectResourceMgr conn_res_mgr_;
......
......@@ -32,6 +32,10 @@ int ObReloadConfig::reload_ob_logger_set()
K(ret));
} else if (OB_FAIL(OB_LOGGER.set_max_file_index(static_cast<int32_t>(conf_->max_syslog_file_count)))) {
OB_LOG(ERROR, "fail to set_max_file_index", K(conf_->max_syslog_file_count.get()), K(ret));
} else if (OB_FAIL(OB_LOGGER.set_max_file_time(conf_->max_syslog_file_time))) {
OB_LOG(ERROR, "fail to set_max_file_time", K(conf_->max_syslog_file_time.get()), K(ret));
} else if (OB_FAIL(OB_LOGGER.set_enable_file_compress(conf_->enable_syslog_file_compress))) {
OB_LOG(ERROR, "fail to set_enable_file_compress", K(conf_->enable_syslog_file_compress.str()), K(ret));
} else if (OB_FAIL(OB_LOGGER.set_record_old_log_file(conf_->enable_syslog_recycle))) {
OB_LOG(ERROR, "fail to set_record_old_log_file", K(conf_->enable_syslog_recycle.str()), K(ret));
} else {
......
......@@ -203,6 +203,15 @@ DEF_INT(cluster_id, OB_CLUSTER_PARAMETER, "0", "[1,4294901759]", "ID of the clus
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_STR(obconfig_url, OB_CLUSTER_PARAMETER, "", "URL for OBConfig service",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(enable_syslog_file_compress, OB_CLUSTER_PARAMETER, "False",
"specifies whether to compress archive log files"
"Value: True:turned on; False: turned off",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(max_syslog_file_time, OB_CLUSTER_PARAMETER, "0s", "[0s, 3650d]",
"specifies the maximum retention time of the log files. "
"When this value is set to 0s, no log file will be removed due to time. "
"with default 0s. Range: [0s, 3650d]",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_LOG_LEVEL(syslog_level, OB_CLUSTER_PARAMETER, "INFO",
"specifies the current level of logging. There are DEBUG, TRACE, INFO, WARN, USER_ERR, ERROR, six different log "
"levels.",
......@@ -214,7 +223,7 @@ DEF_INT(max_syslog_file_count, OB_CLUSTER_PARAMETER, "0", "[0,]",
"specifies the maximum number of the log files "
"that can co-exist before the log file recycling kicks in. "
"Each log file can occupy at most 256MB disk space. "
"When this value is set to 0, no log file will be removed. Range: [0, +∞) in integer",
"When this value is set to 0, no log file will be removed due to the file count. Range: [0, +∞) in integer",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(enable_async_syslog, OB_CLUSTER_PARAMETER, "True",
"specifies whether use async log for observer.log, elec.log and rs.log",
......
......@@ -98,6 +98,7 @@ enable_separate_sys_clog
enable_smooth_leader_switch
enable_sql_audit
enable_sql_operator_dump
enable_syslog_file_compress
enable_syslog_recycle
enable_syslog_wf
enable_sys_table_ddl
......@@ -145,6 +146,7 @@ max_px_worker_count
max_stale_time_for_weak_consistency
max_string_print_length
max_syslog_file_count
max_syslog_file_time
memory_chunk_cache_size
memory_limit
memory_limit_percentage
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册