未验证 提交 a2e86378 编写于 作者: H hiddenbomb 提交者: GitHub

adapt to 4k align size for slog and clog (#94)

上级 a474a9cf
......@@ -255,16 +255,20 @@ int ObCLogBaseFileWriter::append_trailer_entry(const uint32_t info_block_offset)
ObLogFileTrailer trailer;
int64_t pos = 0;
const file_id_t phy_file_id = file_id_ + 1;
char* buf = shm_data_buf_;
// build the trailer in the last CLOG_TRAILER_SIZE (512) bytes of the 4K-aligned buffer, i.e. at offset 4096 - 512
int64_t trailer_pos = CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE;
char *buf = shm_data_buf_ + trailer_pos;
reset_buf();
if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret));
CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret), K_(file_offset),
LITERAL_K(CLOG_TRAILER_OFFSET));
} else if (OB_FAIL(trailer.build_serialized_trailer(buf, CLOG_TRAILER_SIZE, info_block_offset, phy_file_id, pos))) {
CLOG_LOG(WARN, "build_serialized_trailer fail", K(ret), K(info_block_offset), K_(file_id), K(phy_file_id));
CLOG_LOG(WARN, "build_serialized_trailer fail", K(ret), LITERAL_K(CLOG_DIO_ALIGN_SIZE),
K(info_block_offset), K_(file_id), K(phy_file_id));
} else {
buf_write_pos_ += (uint32_t)CLOG_TRAILER_SIZE;
buf_write_pos_ += (uint32_t)CLOG_DIO_ALIGN_SIZE;
}
return ret;
......@@ -275,12 +279,13 @@ int ObCLogBaseFileWriter::flush_trailer_entry()
int ret = OB_SUCCESS;
if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "file offset mismatch", K_(file_offset), "CLOG_TRAILER_OFFSET", CLOG_TRAILER_OFFSET);
} else if (CLOG_TRAILER_SIZE != buf_write_pos_) {
CLOG_LOG(WARN, "file offset mismatch", K_(file_offset), LITERAL_K(CLOG_TRAILER_OFFSET));
} else if (CLOG_DIO_ALIGN_SIZE != buf_write_pos_) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "buf write position mismatch", K_(buf_write_pos), "CLOG_TRAILER_SIZE", CLOG_TRAILER_SIZE);
} else if (OB_FAIL(store_->write(shm_data_buf_, buf_write_pos_, file_offset_))) {
CLOG_LOG(ERROR, "write fail", K(ret), K(buf_write_pos_), K_(file_offset), K(errno));
CLOG_LOG(WARN, "buf write position mismatch", K_(buf_write_pos), LITERAL_K(CLOG_DIO_ALIGN_SIZE));
} else if (OB_FAIL(store_->write(shm_data_buf_, buf_write_pos_, CLOG_TRAILER_ALIGN_WRITE_OFFSET))) {
CLOG_LOG(ERROR, "write fail", K(ret), K(buf_write_pos_), K_(file_offset),
LITERAL_K(CLOG_TRAILER_ALIGN_WRITE_OFFSET), K(errno));
}
return ret;
}
......@@ -381,16 +386,18 @@ int ObCLogBaseFileWriter::append_padding_entry(const uint32_t padding_size)
return ret;
}
int ObCLogBaseFileWriter::cache_buf(ObLogCache* log_cache)
int ObCLogBaseFileWriter::cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len)
{
int ret = OB_SUCCESS;
char* buf = shm_data_buf_;
if (buf_write_pos_ > 0) {
if (OB_ISNULL(buf) || 0 == buf_len) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid args", K(ret), KP(buf), K(buf_len));
} else {
const common::ObAddr addr = GCTX.self_addr_;
if (OB_FAIL(log_cache->append_data(addr, buf, file_id_, file_offset_, buf_write_pos_))) {
CLOG_LOG(WARN, "fail to cache buf, ", K(ret), K_(file_id), K_(file_offset), K_(buf_write_pos));
if (OB_FAIL(log_cache->append_data(addr, buf, file_id_, file_offset_, buf_len))) {
CLOG_LOG(WARN, "fail to cache buf, ", K(ret), K_(file_id), K_(file_offset), K(buf_len));
} else {
file_offset_ += buf_write_pos_;
file_offset_ += buf_len;
}
}
return ret;
......@@ -659,7 +666,7 @@ int ObCLogLocalFileWriter::end_current_file(ObIInfoBlockHandler* info_getter, Ob
CLOG_LOG(WARN, "fail to add info block", K(ret), K(info_getter));
} else if (OB_FAIL(flush_buf())) {
CLOG_LOG(WARN, "fail to flush info block", K(ret));
} else if (OB_FAIL(cache_buf(log_cache))) {
} else if (OB_FAIL(cache_buf(log_cache, shm_data_buf_, buf_write_pos_))) {
CLOG_LOG(WARN, "fail to cache info block", K(ret));
}
}
......@@ -673,16 +680,17 @@ int ObCLogLocalFileWriter::end_current_file(ObIInfoBlockHandler* info_getter, Ob
// - Flush trailer entry to log file
// - Cache trailer entry to log cache
char *trailer_buf = shm_data_buf_ + CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE;
if (OB_SUCC(ret)) {
if (OB_FAIL(append_trailer_entry(info_block_offset))) {
CLOG_LOG(WARN, "fail to add trailer", K(ret));
} else if (OB_FAIL(flush_trailer_entry())) {
CLOG_LOG(WARN, "fail to flush trailer", K(ret));
} else if (OB_FAIL(cache_buf(log_cache))) {
CLOG_LOG(WARN, "fail to cache trailer", K(ret));
} else if (OB_FAIL(cache_buf(log_cache, trailer_buf, CLOG_TRAILER_SIZE))) {
CLOG_LOG(WARN, "fail to cache trailer", K(ret), KP(trailer_buf), LITERAL_K(CLOG_TRAILER_SIZE));
} else if (CLOG_FILE_SIZE != file_offset_) { // Defense code
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(WARN, "file_offset_ mismatch file size", K(ret));
CLOG_LOG(WARN, "file_offset_ mismatch file size", K(ret), K_(file_offset));
} else {
tail->advance(file_id_ + 1, 0);
reset_buf();
......@@ -713,7 +721,7 @@ int ObCLogLocalFileWriter::cache_last_padding_entry(ObLogCache* log_cache)
padding_size = ObPaddingEntry::get_padding_size(file_offset_, align_size_);
if (OB_FAIL(append_padding_entry(padding_size))) {
CLOG_LOG(WARN, "inner add padding entry error", K(ret), K(padding_size));
} else if (OB_FAIL(cache_buf(log_cache))) {
} else if (OB_FAIL(cache_buf(log_cache, shm_data_buf_, buf_write_pos_))) {
CLOG_LOG(WARN, "fail to cache last padding", K(ret));
}
}
......
......@@ -96,7 +96,7 @@ class ObCLogBaseFileWriter {
int append_trailer_entry(const uint32_t info_block_offset);
int flush_trailer_entry();
// append all data in buffer to log cache
int cache_buf(ObLogCache* log_cache);
int cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len);
OB_INLINE bool need_align() const
{
......
......@@ -29,19 +29,22 @@
namespace oceanbase {
namespace clog {
#define CLOG_DIO_ALIGN_SIZE 4096
#define TMP_SUFFIX ".tmp"
typedef uint32_t file_id_t;
typedef int32_t offset_t;
const int64_t CLOG_RPC_TIMEOUT = 3000 * 1000 - 100 * 1000;
const int64_t CLOG_TRAILER_SIZE = 512;
const int64_t CLOG_TRAILER_OFFSET = CLOG_FILE_SIZE - CLOG_TRAILER_SIZE; // 512B for the trailer block
const int64_t CLOG_TRAILER_ALIGN_WRITE_OFFSET = CLOG_FILE_SIZE -
CLOG_DIO_ALIGN_SIZE; // 4k aligned write
const int64_t CLOG_MAX_DATA_OFFSET = CLOG_TRAILER_OFFSET - common::OB_MAX_LOG_BUFFER_SIZE;
const int64_t CLOG_CACHE_SIZE = 64 * 1024;
const int64_t CLOG_REPLAY_CHECKSUM_WINDOW_SIZE = 1 << 9;
const int64_t CLOG_INFO_BLOCK_SIZE_LIMIT = 1 << 22;
const offset_t OB_INVALID_OFFSET = -1;
#define CLOG_DIO_ALIGN_SIZE 4096
#define TMP_SUFFIX ".tmp"
inline bool is_valid_log_id(const uint64_t log_id)
{
......
......@@ -1073,10 +1073,13 @@ int ObLogDirectReader::read_trailer(
ObReadRes res;
ObReadParam trailer_param;
trailer_param.file_id_ = param.file_id_;
trailer_param.offset_ = CLOG_TRAILER_OFFSET;
trailer_param.read_len_ = CLOG_TRAILER_SIZE;
trailer_param.offset_ = CLOG_TRAILER_ALIGN_WRITE_OFFSET; // 4k aligned write, but data is in last 512bytes
trailer_param.read_len_ = CLOG_DIO_ALIGN_SIZE;
trailer_param.timeout_ = param.timeout_;
const char *trailer_buf = NULL;
int64_t trailer_len = 0;
// always read the trailer from disk; error codes are handled specially
if (OB_SUCCESS != (ret = read_data_direct_impl(trailer_param, rbuf, res, cost))) {
if (OB_READ_NOTHING == ret) {
......@@ -1084,8 +1087,15 @@ int ObLogDirectReader::read_trailer(
} else {
CLOG_LOG(WARN, "read trailer data error", K(ret), K(trailer_param));
}
} else if (OB_FAIL(trailer.deserialize(res.buf_, res.data_len_, pos))) {
CLOG_LOG(WARN, "trailer deserialize fail", K(ret), K(res), K(pos));
} else {
trailer_buf = res.buf_ + (CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE);
trailer_len = CLOG_TRAILER_SIZE;
}
if (OB_FAIL(ret)) {
CLOG_LOG(WARN, "fail to read trailer data", K(ret));
} else if (OB_FAIL(trailer.deserialize(trailer_buf, trailer_len, pos))) {
CLOG_LOG(WARN, "trailer deserialize fail", K(ret), KP(trailer_buf), K(trailer_len), K(res), K(pos));
} else if (OB_UNLIKELY(trailer.get_file_id() != trailer_param.file_id_ + 1)) {
ret = OB_INVALID_DATA;
CLOG_LOG(WARN,
......
......@@ -54,8 +54,7 @@ int ObStorageLogReader::init(const char* log_dir, const uint64_t log_file_id_sta
if (OB_SUCC(ret)) {
if (NULL == log_buffer_.get_data()) {
ObMemAttr attr(OB_SERVER_TENANT_ID, ObModIds::OB_LOG_READER);
char* buf =
static_cast<char*>(ob_malloc_align(OB_DIRECT_IO_ALIGN, ObStorageLogWriter::LOG_ITEM_MAX_LENGTH, attr));
char *buf = static_cast<char*>(ob_malloc_align(DIO_READ_ALIGN_SIZE, ObStorageLogWriter::LOG_ITEM_MAX_LENGTH, attr));
if (OB_ISNULL(buf)) {
ret = OB_ERROR;
STORAGE_REDO_LOG(WARN, "ob_malloc for log_buffer_ failed", K(ret));
......@@ -391,10 +390,18 @@ int ObStorageLogReader::get_next_cursor(common::ObLogCursor& cursor) const
int ObStorageLogReader::load_buf()
{
int ret = OB_SUCCESS;
if ((0 != log_buffer_.get_capacity() % DIO_ALIGN_SIZE) || (0 != pread_pos_ % DIO_ALIGN_SIZE) ||
(log_buffer_.get_remain_data_len() < 0) || (log_buffer_.get_remain_data_len() > pread_pos_)) { // Defense code
if ((0 != log_buffer_.get_capacity() % DIO_READ_ALIGN_SIZE)
|| (log_buffer_.get_remain_data_len() < 0)
|| (log_buffer_.get_remain_data_len() > pread_pos_)) { // Defense code
ret = OB_LOG_NOT_ALIGN;
STORAGE_REDO_LOG(WARN, "buf or read pos are not aligned", K(ret), K_(log_buffer), K_(pread_pos));
} else if (0 != pread_pos_ % DIO_READ_ALIGN_SIZE) {
// pread_pos_ is normally 4k aligned because the read size returned by the file handler is always 4k aligned.
// If pread_pos_ is not aligned, the end of the file has been reached and the file size is not 4k aligned,
// so there is no need to load the buffer again.
ret = OB_READ_NOTHING;
STORAGE_REDO_LOG(INFO, "pread_pos_ reaches the end of file, and file size is not 4k aligned",
K(ret), K_(pread_pos));
} else if (log_buffer_.get_remain_data_len() == log_buffer_.get_capacity()) {
// do nothing if buf hasn't been consumed
STORAGE_REDO_LOG(WARN, "buf remains same", K(ret), K_(log_buffer), K_(pread_pos));
......@@ -404,9 +411,10 @@ int ObStorageLogReader::load_buf()
// Move the next log entry to the beginning of the buffer; this requires adjusting pread_pos_
// backwards so that the DIO read stays aligned.
pread_pos_ = lower_align(pread_pos_ - remain_size, OB_DIRECT_IO_ALIGN);
pread_pos_ = lower_align(pread_pos_ - remain_size, DIO_READ_ALIGN_SIZE);
log_buffer_.get_limit() = 0;
log_buffer_.get_position() = (0 == remain_size) ? 0 : upper_align(remain_size, OB_DIRECT_IO_ALIGN) - remain_size;
log_buffer_.get_position() = (0 == remain_size) ? 0
: upper_align(remain_size, DIO_READ_ALIGN_SIZE) - remain_size;
if (OB_FAIL(file_store_->read(log_buffer_.get_data(), log_buffer_.get_capacity(), pread_pos_, read_size))) {
STORAGE_REDO_LOG(ERROR,
......
......@@ -75,9 +75,9 @@ class ObStorageLogItem : public common::ObIBaseLogItem {
class ObStorageLogWriter : public common::ObBaseLogWriter {
public:
static const int64_t LOG_FILE_ALIGN_SIZE = 1 << common::OB_DIRECT_IO_ALIGN_BITS;
static const int64_t LOG_FILE_ALIGN_SIZE = 4 * 1024; // 4KB
static const int64_t LOG_BUF_RESERVED_SIZE = 3 * LOG_FILE_ALIGN_SIZE; // NOP + switch_log
static const int64_t LOG_ITEM_MAX_LENGTH = 32 << 20; // 32MB
static const int64_t LOG_ITEM_MAX_LENGTH = 32 << 20; // 32MB
ObStorageLogWriter();
virtual ~ObStorageLogWriter();
......@@ -194,7 +194,6 @@ class ObStorageLogWriter : public common::ObBaseLogWriter {
common::ObIBaseLogItem** items, const int64_t item_cnt, int64_t& sync_idx, const int64_t cur_idx);
int aggregate_logs_to_buffer(common::ObIBaseLogItem** items, const int64_t item_cnt, const int64_t sync_idx,
const int64_t cur_idx, char*& write_buf, int64_t& write_len);
;
int advance_log_items(common::ObIBaseLogItem** items, const int64_t item_cnt, const int64_t cur_idx);
int advance_single_item(const int64_t cur_file_id, ObStorageLogItem& log_item);
......
......@@ -18,3 +18,4 @@ storage_unittest(test_bloom_filter_data)
storage_unittest(test_micro_block_index_cache)
storage_unittest(test_ref_cnt)
storage_unittest(test_macro_block_id)
storage_unittest(test_storage_log_reader_writer slog/test_storage_log_reader_writer.cpp)
......@@ -368,7 +368,7 @@ TEST_F(TestStorageLogReaderWriter, large_item_batch_write)
{
int ret = OB_SUCCESS;
const char LOG_DIR[512] = "./test_storage_log_rw";
const int64_t LOG_FILE_SIZE = 4 << 10; // 4K
const int64_t LOG_FILE_SIZE = 12 * 1024; // 12K
const int64_t CONCURRENT_TRANS_CNT = 128;
const int64_t LOG_BUFFER_SIZE = 512 * 1024; // 512K
......@@ -451,12 +451,13 @@ TEST_F(TestStorageLogReaderWriter, revise)
start_cursor.log_id_ = 1;
start_cursor.offset_ = 0;
char write_data[800];
MEMSET(write_data, 1, 800);
const int data_size = 5000;
char write_data[data_size];
MEMSET(write_data, 1, data_size);
ObBaseStorageLogBuffer log_buf;
ret = log_buf.assign(write_data, 800);
ret = log_buf.assign(write_data, data_size);
ASSERT_EQ(OB_SUCCESS, ret);
ret = log_buf.set_pos(800);
ret = log_buf.set_pos(data_size);
ASSERT_EQ(OB_SUCCESS, ret);
ObStorageLogWriter writer;
......@@ -465,7 +466,7 @@ TEST_F(TestStorageLogReaderWriter, revise)
ret = writer.start_log(start_cursor);
ASSERT_EQ(OB_SUCCESS, ret);
// write 3 logs so that valid data length is 3K
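// write 3 logs of data_size (5000) bytes each; judging by the truncate (20480) and revised (16384) sizes
// checked below, each log appears to occupy 8K on disk after 4K alignment, i.e. 8K * 3 = 24576 in total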
for (int64_t i = 0; i < 3; ++i) {
start_cursor.reset();
ret = writer.flush_log(LogCommand::OB_LOG_DUMMY_LOG, log_buf, start_cursor);
......@@ -476,7 +477,7 @@ TEST_F(TestStorageLogReaderWriter, revise)
}
// truncate the file so that last log is incomplete
ASSERT_TRUE(0 == ::truncate("./test_storage_log_rw/1", 2560));
ASSERT_TRUE(0 == ::truncate("./test_storage_log_rw/1", 20480));
// revise log
ObStorageLogReader reader;
......@@ -489,7 +490,7 @@ TEST_F(TestStorageLogReaderWriter, revise)
int64_t revise_size = 0;
ret = FileDirectoryUtils::get_file_size("./test_storage_log_rw/1", revise_size);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(2048, revise_size);
ASSERT_EQ(16384, revise_size);
}
// the last log file has switch file entry at the end
......@@ -498,7 +499,7 @@ TEST_F(TestStorageLogReaderWriter, switch_file_revise)
{
int ret = OB_SUCCESS;
const char LOG_DIR[512] = "./test_storage_log_rw";
const int64_t LOG_FILE_SIZE = 2048; // 2KB
const int64_t LOG_FILE_SIZE = 16 * 1024; // 16KB
const int64_t CONCURRENT_TRANS_CNT = 8;
const int64_t LOG_BUFFER_SIZE = 1966080L; // 1.875MB
......@@ -545,14 +546,14 @@ TEST_F(TestStorageLogReaderWriter, switch_file_revise)
int64_t revise_size = 0;
ret = FileDirectoryUtils::get_file_size("./test_storage_log_rw/1", revise_size);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(2048, revise_size);
ASSERT_EQ(3 * 4096, revise_size); // truncate last 4k
}
TEST_F(TestStorageLogReaderWriter, errsim_io_hung)
{
int ret = OB_SUCCESS;
const char LOG_DIR[512] = "./test_storage_log_rw";
const int64_t LOG_FILE_SIZE = 2048; // 2KB
const int64_t LOG_FILE_SIZE = 16 * 1024; // 16KB
const int64_t CONCURRENT_TRANS_CNT = 8;
const int64_t LOG_BUFFER_SIZE = 1966080L; // 1.875MB
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册