You need to sign in or sign up before continuing.
提交 ef2b6227 编写于 作者: O obdev 提交者: wangzelin.wzl

Decouple clog writer and shared memory file

上级 e76ba6bf
此差异已折叠。
......@@ -37,8 +37,8 @@ public:
ObCLogBaseFileWriter();
virtual ~ObCLogBaseFileWriter();
virtual int init(
const char* log_dir, const char* shm_path, const uint32_t align_size, const common::ObILogFileStore* file_store);
virtual int init(const char *log_dir,
const uint32_t align_size, const common::ObILogFileStore *file_store);
virtual void destroy();
// When log engine start, need to flush remaining content in shared memory buffer to log file
......@@ -96,7 +96,7 @@ protected:
int append_trailer_entry(const uint32_t info_block_offset);
int flush_trailer_entry();
// append all data in buffer to log cache
int cache_buf(ObLogCache* log_cache, const char* buf, const uint32_t buf_len);
int cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len);
OB_INLINE bool need_align() const
{
......@@ -109,9 +109,7 @@ protected:
protected:
bool is_inited_;
common::ObBaseLogBufferCtrl* log_ctrl_;
common::ObBaseLogBuffer* shm_buf_;
char* shm_data_buf_;
char *aligned_data_buf_;
uint32_t buf_write_pos_;
uint32_t file_offset_;
// the last aligned part padding size of the buffer
......@@ -134,8 +132,8 @@ public:
destroy();
}
virtual int init(const char* log_dir, const char* shm_path, const uint32_t align_size,
const common::ObILogFileStore* file_store) override;
virtual int init(const char *log_dir,
const uint32_t align_size, const common::ObILogFileStore *file_store) override;
virtual void destroy();
virtual int load_file(uint32_t& file_id, uint32_t& offset, bool enable_pre_creation = false) override;
......
......@@ -179,8 +179,7 @@ void ObCLogWriter::set_clog_writer_thread_name()
void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64_t item_cnt, int64_t& finish_cnt)
{
int ret = OB_SUCCESS;
ObICLogItem* item = NULL;
int64_t sync_mode = ObServerConfig::get_instance().flush_log_at_trx_commit;
ObICLogItem *item = NULL;
int64_t cur_time = 0;
int64_t io_time = 0;
int64_t flush_time = 0;
......@@ -205,13 +204,7 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64
} else {
const bool is_idempotent = false;
const uint64_t write_len = block_meta_len + item->get_data_len();
ObCLogDiskErrorCB* cb = NULL;
if (CLOG_DISK_SYNC != sync_mode && CLOG_MEM_SYNC != sync_mode) {
if (REACH_TIME_INTERVAL(1000 * 1000)) {
CLOG_LOG(WARN, "Not supported sync mode, ", K(sync_mode));
}
sync_mode = CLOG_DISK_SYNC;
}
ObCLogDiskErrorCB *cb = NULL;
lib::ObMutexGuard guard(file_mutex_);
......@@ -241,10 +234,6 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64
CLOG_LOG(ERROR, "fail to add log item to buf, ", K(ret));
}
}
// invoke callback when memory sync
if (OB_SUCC(ret) && CLOG_MEM_SYNC == sync_mode) {
after_flush(item, block_meta_len, ret, file_writer_->get_cur_file_len(), finish_cnt);
}
int64_t flush_start_offset = -1;
if (OB_SUCC(ret)) {
......@@ -268,10 +257,8 @@ void ObCLogWriter::process_log_items(common::ObIBaseLogItem** items, const int64
if (OB_SUCC(ret)) {
io_time = ObTimeUtility::current_time() - cur_time;
// log flush succeed, invoke callback when disk sync
if (CLOG_DISK_SYNC == sync_mode) {
after_flush(item, block_meta_len, ret, flush_start_offset, finish_cnt);
}
//log flush succeed, invoke callback when disk sync
after_flush(item, block_meta_len, ret, flush_start_offset, finish_cnt);
flush_time = ObTimeUtility::current_time() - cur_time - io_time;
if (flush_time + io_time > 100 * 1000) {
......
......@@ -51,8 +51,10 @@ void ObIlogAccessor::destroy()
inited_ = false;
}
int ObIlogAccessor::init(const char* dir_name, const char* shm_path, const int64_t server_seq,
const common::ObAddr& addr, ObLogCache* log_cache)
int ObIlogAccessor::init(const char *dir_name,
const int64_t server_seq,
const common::ObAddr &addr,
ObLogCache *log_cache)
{
int ret = OB_SUCCESS;
const bool use_log_cache = true;
......@@ -69,8 +71,9 @@ int ObIlogAccessor::init(const char* dir_name, const char* shm_path, const int64
CSR_LOG(ERROR, "file_store_ init failed", K(ret));
} else if (OB_FAIL(file_id_cache_.init(server_seq, addr, this))) {
CSR_LOG(ERROR, "file_id_cache_ init failed", K(ret));
} else if (OB_FAIL(direct_reader_.init(
dir_name, shm_path, use_log_cache, log_cache, &log_tail_, ObLogWritePoolType::ILOG_WRITE_POOL))) {
} else if (OB_FAIL(direct_reader_.init(dir_name, nullptr/*no shared memory*/, use_log_cache,
log_cache, &log_tail_,
ObLogWritePoolType::ILOG_WRITE_POOL))) {
CSR_LOG(ERROR, "direct_reader_ init failed", K(ret));
} else if (OB_FAIL(buffer_.init(OB_MAX_LOG_BUFFER_SIZE, CLOG_DIO_ALIGN_SIZE, ObModIds::OB_CLOG_INFO_BLK_HNDLR))) {
CSR_LOG(ERROR, "buffer init failed", K(ret));
......@@ -717,9 +720,12 @@ ObIlogStorage::~ObIlogStorage()
destroy();
}
int ObIlogStorage::init(const char* dir_name, const char* shm_path, const int64_t server_seq,
const common::ObAddr& addr, ObLogCache* log_cache, ObPartitionService* partition_service,
ObCommitLogEnv* commit_log_env)
int ObIlogStorage::init(const char *dir_name,
const int64_t server_seq,
const common::ObAddr &addr,
ObLogCache *log_cache,
ObPartitionService *partition_service,
ObCommitLogEnv *commit_log_env)
{
int ret = OB_SUCCESS;
......@@ -733,16 +739,9 @@ int ObIlogStorage::init(const char* dir_name, const char* shm_path, const int64_
} else if (OB_ISNULL(dir_name) || OB_ISNULL(log_cache) || OB_ISNULL(partition_service) || OB_ISNULL(commit_log_env) ||
OB_UNLIKELY(server_seq < 0 || !addr.is_valid())) {
ret = OB_INVALID_ARGUMENT;
CSR_LOG(ERROR,
"invalid argument",
KR(ret),
KP(dir_name),
KP(log_cache),
KP(partition_service),
KP(commit_log_env),
K(server_seq),
K(addr));
} else if (OB_FAIL(ObIlogAccessor::init(dir_name, shm_path, server_seq, addr, log_cache))) {
CSR_LOG(ERROR, "invalid argument", KR(ret), KP(dir_name), KP(log_cache), KP(partition_service),
KP(commit_log_env), K(server_seq), K(addr));
} else if (OB_FAIL(ObIlogAccessor::init(dir_name, server_seq, addr, log_cache))) {
CSR_LOG(ERROR, "failed to init ObIlogAccessor", K(ret));
} else if (OB_FAIL(init_next_ilog_file_id_(next_ilog_file_id))) {
CSR_LOG(ERROR, "get_next_ilog_file_id failed", K(ret));
......
......@@ -40,8 +40,10 @@ public:
virtual void destroy();
public:
int init(const char* dir_name, const char* shm_path, const int64_t server_seq, const common::ObAddr& addr,
ObLogCache* log_cache);
int init(const char *dir_name,
const int64_t server_seq,
const common::ObAddr &addr,
ObLogCache *log_cache);
int add_partition_needed_to_file_id_cache(
const common::ObPartitionKey& partition_key, const uint64_t last_replay_log_id);
......@@ -100,8 +102,12 @@ public:
~ObIlogStorage();
public:
int init(const char* dir_name, const char* shm_path, const int64_t server_seq, const common::ObAddr& addr,
ObLogCache* log_cache, storage::ObPartitionService* partition_service, ObCommitLogEnv* commit_log_env);
int init(const char *dir_name,
const int64_t server_seq,
const common::ObAddr &addr,
ObLogCache *log_cache,
storage::ObPartitionService *partition_service,
ObCommitLogEnv *commit_log_env);
void destroy();
int start();
void stop();
......
......@@ -44,6 +44,7 @@ const int64_t CLOG_CACHE_SIZE = 64 * 1024;
const int64_t CLOG_REPLAY_CHECKSUM_WINDOW_SIZE = 1 << 9;
const int64_t CLOG_INFO_BLOCK_SIZE_LIMIT = 1 << 22;
const offset_t OB_INVALID_OFFSET = -1;
const int64_t CLOG_MAX_WRITE_BUFFER_SIZE = 2 << 20;
inline bool is_valid_log_id(const uint64_t log_id)
{
......
......@@ -94,10 +94,14 @@ int ObLogEnv::init(const Config& cfg, const ObAddr& self_addr, ObIInfoBlockHandl
} else if (NULL == (file_store_ = ObLogStoreFactory::create(cfg.log_dir_, cfg.file_size_, write_pool_type))) {
ret = OB_INIT_FAIL;
CLOG_LOG(WARN, "create file store failed.", K(ret));
} else if (OB_FAIL(direct_reader_.init(
cfg.log_dir_, cfg.log_shm_path_, use_log_cache, &log_cache_, &log_tail_, write_pool_type))) {
} else if (OB_FAIL(direct_reader_.init(cfg.log_dir_,
nullptr/*no shared memory*/,
use_log_cache,
&log_cache_,
&log_tail_,
write_pool_type))) {
CLOG_LOG(WARN, "direct reader init error", K(ret), K(enable_log_cache), K(write_pool_type));
} else if (OB_FAIL(init_log_file_writer(cfg.log_dir_, cfg.log_shm_path_, file_store_))) {
} else if (OB_FAIL(init_log_file_writer(cfg.log_dir_, file_store_))) {
CLOG_LOG(WARN, "Fail to init log file writer ", K(ret));
} else {
// do nothing
......@@ -334,14 +338,14 @@ bool ObLogEnv::cluster_version_before_2000_() const
return GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_2000;
}
int ObLogEnv::init_log_file_writer(const char* log_dir, const char* shm_path, const ObILogFileStore* file_store)
int ObLogEnv::init_log_file_writer(const char *log_dir, const ObILogFileStore *file_store)
{
int ret = OB_SUCCESS;
if (nullptr ==
(log_file_writer_ = static_cast<ObCLogBaseFileWriter*>(OB_NEW(ObCLogLocalFileWriter, ObModIds::OB_LOG_WRITER)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
CLOG_LOG(WARN, "alloc file writer failed, ", K(ret));
} else if (OB_FAIL(log_file_writer_->init(log_dir, shm_path, CLOG_DIO_ALIGN_SIZE, file_store))) {
} else if (OB_FAIL(log_file_writer_->init(log_dir, CLOG_DIO_ALIGN_SIZE, file_store))) {
CLOG_LOG(WARN, "Fail to init file writer, ", K(ret));
}
......@@ -758,13 +762,13 @@ int ObLogEngine::init(const ObLogEnv::Config& cfg, const ObAddr& self_addr, obrp
} else if (OB_FAIL(ilog_log_cache_.init(
self_addr, cfg.index_cache_name_, cfg.index_cache_priority_, ilog_hot_cache_size))) {
CLOG_LOG(WARN, "failed to init ilog_log_cache", K(ret));
} else if (OB_FAIL(ilog_storage_.init(cfg.index_log_dir_,
cfg.index_log_shm_path_,
server_seq,
self_addr,
&ilog_log_cache_,
partition_service,
&clog_env_))) {
} else if (OB_FAIL(ilog_storage_.init(
cfg.index_log_dir_,
server_seq,
self_addr,
&ilog_log_cache_,
partition_service,
&clog_env_))) {
CLOG_LOG(WARN, "ilog_storage_ init failed", K(ret));
} else {
batch_rpc_ = batch_rpc;
......
......@@ -143,7 +143,7 @@ protected:
// the clog_writer is returned to be busy
static const int64_t BUFFER_ITEM_CONGESTED_PERCENTAGE = 50;
bool cluster_version_before_2000_() const;
int init_log_file_writer(const char* log_dir, const char* shm_path, const ObILogFileStore* file_store);
int init_log_file_writer(const char *log_dir, const ObILogFileStore *file_store);
bool is_inited_;
Config config_;
......
......@@ -167,7 +167,7 @@ DEFINE_DESERIALIZE(ObLogEntry)
CLOG_LOG(TRACE, "header deserialize error", K(ret), K(data_len), K(new_pos));
} else if (data_len - new_pos < header_.get_data_len()) {
ret = OB_DESERIALIZE_ERROR;
CLOG_LOG(TRACE, "buf is not enough to deserialize clog entry buf", K(ret));
CLOG_LOG(TRACE, "buf is not enough to deserialize clog entry buf", K(ret), K(header_), K(data_len), K(new_pos));
} else if (header_.get_data_len() < 0) {
ret = OB_INVALID_DATA;
CLOG_LOG(WARN, "get invalid data len", K(ret), "data_len", header_.get_data_len());
......
......@@ -26,6 +26,7 @@
#include "share/cache/ob_kv_storecache.h"
#include "share/ob_tenant_mgr.h"
#include "observer/ob_server_struct.h"
#include "share/redolog/ob_log_file_reader.h"
#include <libaio.h>
#include <gtest/gtest.h>
......@@ -146,7 +147,6 @@ public:
protected:
char log_path_[1024];
char shm_path_[1024];
char* log_buf_;
ObLogDir log_dir_;
ObLogWriteFilePool write_pool_;
......@@ -163,8 +163,6 @@ TestCLogWriter::TestCLogWriter()
{
getcwd(log_path_, 1024);
strcat(log_path_, "/test_clog");
getcwd(shm_path_, 1024);
strcat(shm_path_, "/test_clog/shm_buf");
clog_cfg_.log_file_writer_ = &log_file_writer_;
clog_cfg_.log_cache_ = &log_cache_;
clog_cfg_.tail_ptr_ = &tail_cursor_;
......@@ -189,6 +187,7 @@ void TestCLogWriter::SetUp()
common::ObAddr addr(ObAddr::VER::IPV4, "100.81.152.48", 2828);
GCTX.self_addr_ = addr;
system("rm -rf ./test_clog");
system("mkdir test_clog");
const int64_t hot_cache_size = 1L << 27;
ret = log_cache_.init(addr, "clog_cache", 1, hot_cache_size);
......@@ -200,10 +199,11 @@ void TestCLogWriter::SetUp()
ASSERT_EQ(OB_SUCCESS, ret);
ret = file_store_.init(log_path_, CLOG_FILE_SIZE, clog_cfg_.type_);
ASSERT_EQ(OB_SUCCESS, ret);
ret = log_file_writer_.init(log_path_, shm_path_, CLOG_DIO_ALIGN_SIZE, &file_store_);
ret = log_file_writer_.init(log_path_, CLOG_DIO_ALIGN_SIZE, &file_store_);
ASSERT_EQ(OB_SUCCESS, ret);
ret = clog_writer_.init(clog_cfg_);
ASSERT_EQ(OB_SUCCESS, ret);
ret = OB_LOG_FILE_READER.init();
}
void TestCLogWriter::TearDown()
......@@ -290,16 +290,38 @@ TEST_F(TestCLogWriter, border)
ret = clog_writer_.start(file_id, offset);
ASSERT_EQ(OB_SUCCESS, ret);
const ObPartitionKey partition_key(1, 3001, 1);
// normal write
while (true) {
if (offset > CLOG_MAX_DATA_OFFSET - 2 * 1024 * 1024) {
log_item.data_len_ = 1024 * 1024 + 512 * 1024;
} else {
log_item.data_len_ = ObRandom::rand(1, 1024 * 1024);
log_item.data_len_ = ObRandom::rand(1024, 1024 * 1024);
}
log_item.is_flushed_ = false;
memset(log_item.buf_, (uint8_t)ObRandom::rand(100, 132), log_item.data_len_);
ObLogEntryHeader header;
common::ObProposalID rts;
rts.ts_ = ObTimeUtility::fast_current_time();
int64_t pos = 0;
const int64_t header_size = header.get_serialize_size();
ret = header.generate_header(OB_LOG_SUBMIT, partition_key,
1, log_item.buf_ + header_size, log_item.data_len_ - header_size,
ObTimeUtility::fast_current_time(),
ObTimeUtility::fast_current_time(),
rts,
ObTimeUtility::fast_current_time(),
ObVersion(0), true);
ASSERT_EQ(common::OB_SUCCESS, ret);
ret = header.serialize(log_item.buf_, log_item.data_len_, pos);
ASSERT_EQ(common::OB_SUCCESS, ret);
ASSERT_EQ(header_size, pos);
CLOG_LOG(INFO, "flush one item start", K(file_id), K(offset), K(log_item.data_len_));
ret = clog_writer_.append_log(log_item);
ASSERT_EQ(OB_SUCCESS, ret);
log_item.wait();
......@@ -316,25 +338,43 @@ TEST_F(TestCLogWriter, border)
offset = (uint32_t)log_item.data_len_ + static_cast<offset_t>(block_meta_size);
}
CLOG_LOG(INFO, "flush one item, ", K(offset), K(log_item.offset_), K(log_item.data_len_), K(file_id));
CLOG_LOG(INFO, "flush one item end", K(offset), K(log_item.offset_), K(log_item.data_len_), K(file_id));
if (file_id > 3) {
break;
}
}
// start with earlier pos
ObBaseLogBufferCtrl* log_ctrl = NULL;
ret = ObBaseLogBufferMgr::get_instance().get_buffer(shm_path_, log_ctrl);
ASSERT_EQ(OB_SUCCESS, ret);
clog_writer_.destroy();
log_file_writer_.reset();
log_ctrl->base_buf_->file_flush_pos_.file_offset_ = 0;
int64_t test_lower = lower_align(4096, 4096);
int64_t test_upper = upper_align(4096, 4096);
CLOG_LOG(INFO, "cooper", K(test_lower), K(test_upper));
ASSERT_EQ(4096, test_lower);
ASSERT_EQ(4096, test_upper);
//try locate last time write position
ObLogDirectReader log_reader;
ObLogCache log_cache;
ObTailCursor tail_cursor;
file_id_t restart_file_id = 0;
offset_t restart_file_id_offset = 0;
ret = log_reader.init(log_path_, nullptr, true, &log_cache_, &tail_cursor, ObLogWritePoolType::CLOG_WRITE_POOL);
ASSERT_EQ(OB_SUCCESS, ret);
ret = locate_clog_tail(1000000000, &file_store_, &log_reader, restart_file_id, restart_file_id_offset);
ASSERT_EQ(OB_SUCCESS, ret);
offset = upper_align(offset, CLOG_DIO_ALIGN_SIZE);
ASSERT_EQ(file_id, restart_file_id);
ASSERT_EQ(offset, restart_file_id_offset);
// continue write clog
ret = clog_writer_.init(clog_cfg_);
ASSERT_EQ(OB_SUCCESS, ret);
offset_t new_offset = 0;
ret = clog_writer_.start(file_id, new_offset);
ret = clog_writer_.start(restart_file_id, restart_file_id_offset);
ASSERT_EQ(OB_SUCCESS, ret);
log_item.data_len_ = ObRandom::rand(1, 1024 * 1024);
log_item.is_flushed_ = false;
......@@ -344,6 +384,9 @@ TEST_F(TestCLogWriter, border)
log_item.wait();
ASSERT_EQ(OB_SUCCESS, log_item.err_code_);
ASSERT_EQ(offset + static_cast<offset_t>(block_meta_size), log_item.offset_);
log_reader.destroy();
log_cache.destroy();
}
TEST_F(TestCLogWriter, errsim_aio_timeout)
......@@ -393,75 +436,12 @@ TEST_F(TestCLogWriter, errsim_aio_timeout)
usleep(100 * 1000);
#endif
}
TEST_F(TestCLogWriter, crash)
{
int ret = OB_SUCCESS;
int fd = 0;
ObBaseLogBufferCtrl* log_ctrl = NULL;
ObBaseLogBuffer* shm_buf = NULL;
file_id_t file_id = 1;
offset_t offset = 0;
ObAtomicFilePos file_pos;
int64_t write_len = 4096 * 8;
ret = clog_writer_.start(file_id, offset);
ASSERT_EQ(OB_SUCCESS, ret);
ret = ObBaseLogBufferMgr::get_instance().get_buffer(shm_path_, log_ctrl);
ASSERT_EQ(OB_SUCCESS, ret);
shm_buf = log_ctrl->base_buf_;
// set first 32K data
memset(log_ctrl->data_buf_, (uint8_t)ObRandom::rand(100, 132), write_len);
file_pos.file_id_ = file_id;
file_pos.file_offset_ = (uint32_t)write_len;
ATOMIC_STORE(&shm_buf->file_write_pos_.atomic_, file_pos.atomic_);
clog_writer_.destroy();
log_file_writer_.reset();
ret = clog_writer_.init(clog_cfg_);
ASSERT_EQ(OB_SUCCESS, ret);
file_id = file_pos.file_id_;
offset = file_pos.file_offset_;
ret = clog_writer_.start(file_id, offset);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(shm_buf->file_flush_pos_.file_id_, file_id);
ASSERT_EQ(shm_buf->file_flush_pos_.file_offset_, write_len);
ret = write_pool_.get_fd(file_pos.file_id_, fd);
ASSERT_EQ(OB_SUCCESS, ret);
char* read_buf = (char*)ob_malloc_align(4096, write_len);
ob_pread(fd, read_buf, write_len, 0);
ASSERT_TRUE(0 == memcmp(log_ctrl->data_buf_, read_buf, write_len));
// continue next 32K data and let writer catch up
memset(log_ctrl->data_buf_, (uint8_t)ObRandom::rand(132, 164), write_len);
file_pos.file_id_ = file_id;
file_pos.file_offset_ += (uint32_t)write_len;
ATOMIC_STORE(&shm_buf->file_write_pos_.atomic_, file_pos.atomic_);
clog_writer_.destroy();
log_file_writer_.reset();
ret = clog_writer_.init(clog_cfg_);
ASSERT_EQ(OB_SUCCESS, ret);
file_id = file_pos.file_id_;
offset = file_pos.file_offset_;
ret = clog_writer_.start(file_id, offset);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(shm_buf->file_flush_pos_.file_id_, file_id);
ASSERT_EQ(shm_buf->file_flush_pos_.file_offset_, write_len + write_len);
ob_pread(fd, read_buf, write_len, write_len);
ASSERT_TRUE(0 == memcmp(log_ctrl->data_buf_, read_buf, write_len));
ob_free_align(read_buf);
}
} // end namespace unittest
} // end namespace oceanbase
} // end namespace unittest
} // end namespace oceanbase
int main(int argc, char** argv)
{
system("rm -f test_clog_writer.log*");
OB_LOGGER.set_file_name("test_clog_writer.log", true);
OB_LOGGER.set_log_level("DEBUG");
......
......@@ -401,9 +401,9 @@ TEST_F(TestObRawLogIterator, test_read_data)
int64_t header_len = meta.get_serialize_size() + header_offset;
ObLogDir log_dir;
ObTailCursor* tail = new ObTailCursor();
ObTailCursor tail;
EXPECT_EQ(OB_SUCCESS, log_dir.init(path));
EXPECT_EQ(OB_SUCCESS, reader.init(path, shm_path, true, &cache, tail, ObLogWritePoolType::CLOG_WRITE_POOL));
EXPECT_EQ(OB_SUCCESS, reader.init(path, shm_path, true, &cache, &tail, ObLogWritePoolType::CLOG_WRITE_POOL));
EXPECT_EQ(OB_SUCCESS, write_file());
// Test init failed
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册