See the Mulan PSL v2 for more details. */ // // Created by huhaosheng.hhs on 2022 // #include #include #include "common/log/log.h" #include "storage/clog/clog.h" #include "global_context.h" #include "storage/trx/trx.h" #include "common/io/io.h" using namespace std; using namespace common; /** * @brief 当前的日志使用固定的文件名,而且就这一个文件 */ const char *CLOG_FILE_NAME = "clog"; const char *clog_type_name(CLogType type) { #define DEFINE_CLOG_TYPE(name) case CLogType::name: return #name; switch (type) { DEFINE_CLOG_TYPE_ENUM; default: return "unknown clog type"; } #undef DEFINE_CLOG_TYPE } int32_t clog_type_to_integer(CLogType type) { return static_cast(type); } CLogType clog_type_from_integer(int32_t value) { return static_cast(value); } //////////////////////////////////////////////////////////////////////////////// string CLogRecordHeader::to_string() const { stringstream ss; ss << "lsn:" << lsn_ << ", trx_id:" << trx_id_ << ", type:" << clog_type_name(clog_type_from_integer(type_)) << "(" << type_ << ")" << ", len:" << logrec_len_; return ss.str(); } //////////////////////////////////////////////////////////////////////////////// string CLogRecordCommitData::to_string() const { stringstream ss; ss << "commit_xid:" << commit_xid_; return ss.str(); } //////////////////////////////////////////////////////////////////////////////// const int32_t CLogRecordData::HEADER_SIZE = sizeof(CLogRecordData) - sizeof(CLogRecordData::data_); CLogRecordData::~CLogRecordData() { if (data_ == nullptr) { delete[] data_; } } string CLogRecordData::to_string() const { stringstream ss; ss << "table_id:" << table_id_ << ", rid:{" << rid_.to_string() << "}" << ", len:" << data_len_ << ", offset:" << data_offset_; return ss.str(); } //////////////////////////////////////////////////////////////////////////////// int _align8(int size) { return size / 8 * 8 + ((size % 8 == 0) ? 0 : 8); } CLogRecord *CLogRecord::build_mtr_record(CLogType type, int32_t trx_id) { CLogRecord *log_record = new CLogRecord(); CLogRecordHeader &header = log_record->header_; header.trx_id_ = trx_id; header.type_ = clog_type_to_integer(type); return log_record; } CLogRecord *CLogRecord::build_commit_record(int32_t trx_id, int32_t commit_xid) { CLogRecord *log_record = new CLogRecord(); CLogRecordHeader &header = log_record->header_; header.type_ = clog_type_to_integer(CLogType::MTR_COMMIT); header.trx_id_ = trx_id; header.logrec_len_ = sizeof(CLogRecordCommitData); CLogRecordCommitData &commit_record = log_record->commit_record(); commit_record.commit_xid_ = commit_xid; return log_record; } CLogRecord *CLogRecord::build_data_record(CLogType type, int32_t trx_id, int32_t table_id, const RID &rid, int32_t data_len, int32_t data_offset, const char *data) { CLogRecord *log_record = new CLogRecord(); CLogRecordHeader &header = log_record->header_; header.trx_id_ = trx_id; header.type_ = clog_type_to_integer(type); header.logrec_len_ = CLogRecordData::HEADER_SIZE + data_len; CLogRecordData &data_record = log_record->data_record(); data_record.table_id_ = table_id; data_record.rid_ = rid; data_record.data_len_ = data_len; data_record.data_offset_ = data_offset; if (data_len > 0) { data_record.data_ = new char[data_len]; if (nullptr == data_record.data_) { delete log_record; LOG_WARN("failed to allocate memory while creating clog record. memory size=%d", data_len); return nullptr; } memcpy(data_record.data_, data, data_len); } return log_record; } CLogRecord *CLogRecord::build(const CLogRecordHeader &header, char *data) { CLogRecord *log_record = new CLogRecord(); log_record->header_ = header; if (header.logrec_len_ <= 0) { return log_record; } if (header.type_ == clog_type_to_integer(CLogType::MTR_COMMIT)) { ASSERT(header.logrec_len_ == sizeof(CLogRecordCommitData), "invalid length of mtr commit. expect %d, got %d", sizeof(CLogRecordCommitData), header.logrec_len_); CLogRecordCommitData &commit_record = log_record->commit_record(); memcpy(reinterpret_cast(&commit_record), data, sizeof(CLogRecordCommitData)); LOG_DEBUG("got an commit record %s", log_record->to_string().c_str()); } else { /// 当前日志拥有数据,但是不是COMMIT,就认为是普通的修改数据的日志,简单粗暴 CLogRecordData &data_record = log_record->data_record(); memcpy(reinterpret_cast(&data_record), data, CLogRecordData::HEADER_SIZE); if (header.logrec_len_ > CLogRecordData::HEADER_SIZE) { int data_len = header.logrec_len_ - CLogRecordData::HEADER_SIZE; data_record.data_ = new char[data_len]; memcpy(data_record.data_, data + CLogRecordData::HEADER_SIZE, data_len); } } return log_record; } CLogRecord::~CLogRecord() { } string CLogRecord::to_string() const { if (header_.logrec_len_ <= 0) { return header_.to_string(); } else if (header_.type_ == clog_type_to_integer(CLogType::MTR_COMMIT)) { return header_.to_string() + ", " + commit_record().to_string(); } else { return header_.to_string() + ", " + data_record().to_string(); } } //////////////////////////////////////////////////////////////////////////////// static const int CLOG_BUFFER_SIZE = 4 * 1024 * 1024; CLogBuffer::CLogBuffer() { } CLogBuffer::~CLogBuffer() {} RC CLogBuffer::append_log_record(CLogRecord *log_record) { if (nullptr == log_record) { return RC::INVALID_ARGUMENT; } /// total_size_ 的计算没有考虑日志头 if (total_size_ + log_record->logrec_len() >= CLOG_BUFFER_SIZE) { return RC::LOGBUF_FULL; } lock_guard lock_guard(lock_); log_records_.emplace_back(log_record); total_size_ += log_record->logrec_len(); LOG_DEBUG("append log. log_record={%s}", log_record->to_string().c_str()); return RC::SUCCESS; } RC CLogBuffer::flush_buffer(CLogFile &log_file) { RC rc = RC::SUCCESS; int count = 0; while (!log_records_.empty()) { lock_.lock(); if (log_records_.empty()) { lock_.unlock(); return RC::SUCCESS; } // log buffer 需要支持并发,所以要考虑加锁 // 从队列中取出日志记录然后写入到文件中 unique_ptr log_record = move(log_records_.front()); log_records_.pop_front(); rc = write_log_record(log_file, log_record.get()); // 当前无法处理日志写不完整的情况,所以直接粗暴退出 ASSERT(rc == RC::SUCCESS, "failed to write log record. log_record=%s, rc=%s", log_record->to_string().c_str(), strrc(rc)); lock_.unlock(); total_size_ -= log_record->logrec_len(); count++; } LOG_WARN("flush log buffer done. write log record number=%d", count); return log_file.sync(); } RC CLogBuffer::write_log_record(CLogFile &log_file, CLogRecord *log_record) { // TODO 看起来每种类型的日志自己实现 serialize 接口更好一点 const CLogRecordHeader &header = log_record->header(); RC rc = log_file.write(reinterpret_cast(&header), sizeof(header)); if (rc != RC::SUCCESS) { LOG_WARN("failed to write log record header. size=%d, rc=%s", sizeof(header), strrc(rc)); return rc; } switch (log_record->log_type()) { case CLogType::MTR_BEGIN: case CLogType::MTR_ROLLBACK: { // do nothing } break; case CLogType::MTR_COMMIT: { rc = log_file.write(reinterpret_cast(&log_record->commit_record()), log_record->header().logrec_len_); } break; default: { rc = log_file.write(reinterpret_cast(&log_record->data_record()), CLogRecordData::HEADER_SIZE); if (OB_FAIL(rc)) { LOG_WARN("failed to write data record header. size=%d, rc=%s", CLogRecordData::HEADER_SIZE, strrc(rc)); return rc; } rc = log_file.write(log_record->data_record().data_, log_record->data_record().data_len_); if (OB_FAIL(rc)) { LOG_WARN("failed to write log data. size=%d, rc=%s", log_record->data_record().data_len_, strrc(rc)); return rc; } } break; } return rc; } //////////////////////////////////////////////////////////////////////////////// RC CLogFile::init(const char *path) { RC rc = RC::SUCCESS; std::string clog_file_path = std::string(path) + common::FILE_PATH_SPLIT_STR + CLOG_FILE_NAME; int fd = ::open(clog_file_path.c_str(), O_RDWR | O_APPEND | O_CREAT, S_IRUSR | S_IWUSR); if (fd < 0) { rc = RC::IOERR_OPEN; LOG_WARN("failed to open clog file. filename=%s, error=%s", clog_file_path.c_str(), strerror(errno)); return rc; } filename_ = clog_file_path; fd_ = fd; LOG_INFO("open clog file success. file=%s, fd=%d", filename_.c_str(), fd_); return rc; } CLogFile::~CLogFile() { if (fd_ >= 0) { LOG_INFO("close clog file. file=%s, fd=%d", filename_.c_str(), fd_); ::close(fd_); fd_ = -1; } } RC CLogFile::write(const char *data, int len) { int ret = writen(fd_, data, len); if (0 != ret) { LOG_WARN("failed to write data to file. filename=%s, data len=%d, error=%s", filename_.c_str(), len, strerror(ret)); return RC::IOERR_WRITE; } return RC::SUCCESS; } RC CLogFile::read(char *data, int len) { int ret = readn(fd_, data, len); if (ret != 0) { if (ret == -1) { eof_ = true; LOG_TRACE("file read touch eof. filename=%s", filename_.c_str()); } else { LOG_WARN("failed to read data from file. file=%s, data len=%d, error=%s", filename_.c_str(), len, strerror(ret)); } return RC::IOERR_READ; } return RC::SUCCESS; } RC CLogFile::sync() { int ret = fsync(fd_); if (ret != 0) { LOG_WARN("failed to sync file. file=%s, error=%s", filename_.c_str(), strerror(errno)); return RC::IOERR_SYNC; } return RC::SUCCESS; } RC CLogFile::offset(int64_t &off) const { off_t pos = lseek(fd_, 0, SEEK_CUR); if (pos == -1) { LOG_WARN("failed to seek. error=%s", strerror(errno)); return RC::IOERR_SEEK; } off = static_cast(pos); return RC::SUCCESS; } //////////////////////////////////////////////////////////////////////////////// RC CLogRecordIterator::init(CLogFile &log_file) { log_file_ = &log_file; return RC::SUCCESS; } bool CLogRecordIterator::valid() const { return nullptr != log_record_; } RC CLogRecordIterator::next() { delete log_record_; log_record_ = nullptr; CLogRecordHeader header; RC rc = log_file_->read(reinterpret_cast(&header), sizeof(header)); if (rc != RC::SUCCESS) { if (log_file_->eof()) { return RC::RECORD_EOF; } LOG_WARN("failed to read log header. rc=%s", strrc(rc)); return rc; } char *data = nullptr; int32_t record_size = header.logrec_len_; if (record_size > 0) { data = new char[record_size]; rc = log_file_->read(data, record_size); if (OB_FAIL(rc)) { if (log_file_->eof()) { // TODO 遇到了没有写完整数据的log,应该truncate一部分数据, 但是现在不管 } LOG_WARN("failed to read log data. data size=%d, rc=%s", record_size, strrc(rc)); delete[] data; data = nullptr; return rc; } } delete log_record_; log_record_ = CLogRecord::build(header, data); delete[] data; return rc; } const CLogRecord &CLogRecordIterator::log_record() { return *log_record_; } //////////////////////////////////////////////////////////////////////////////// RC CLogManager::init(const char *path) { log_buffer_ = new CLogBuffer(); log_file_ = new CLogFile(); return log_file_->init(path); } CLogManager::~CLogManager() { if (log_buffer_) { delete log_buffer_; log_buffer_ = nullptr; } if (log_file_ != nullptr) { delete log_file_; log_file_ = nullptr; } } RC CLogManager::append_log(CLogType type, int32_t trx_id, int32_t table_id, const RID &rid, int32_t data_len, int32_t data_offset, const char *data) { CLogRecord *log_record = CLogRecord::build_data_record(type, trx_id, table_id, rid, data_len, data_offset, data); if (nullptr == log_record) { LOG_WARN("failed to create log record"); return RC::NOMEM; } return append_log(log_record); } RC CLogManager::begin_trx(int32_t trx_id) { return append_log(CLogRecord::build_mtr_record(CLogType::MTR_BEGIN, trx_id)); } RC CLogManager::commit_trx(int32_t trx_id, int32_t commit_xid) { RC rc = append_log(CLogRecord::build_commit_record(trx_id, commit_xid)); if (rc != RC::SUCCESS) { LOG_WARN("failed to append trx commit log. trx id=%d, rc=%s", trx_id, strrc(rc)); return rc; } rc = sync(); // 事务提交时需要把当前事务关联的日志,都写入到磁盘中,这样做是保证不丢数据 return rc; } RC CLogManager::rollback_trx(int32_t trx_id) { return append_log(CLogRecord::build_mtr_record(CLogType::MTR_ROLLBACK, trx_id)); } RC CLogManager::append_log(CLogRecord *log_record) { if (nullptr == log_record) { return RC::INVALID_ARGUMENT; } return log_buffer_->append_log_record(log_record); } RC CLogManager::sync() { return log_buffer_->flush_buffer(*log_file_); } RC CLogManager::recover(Db *db) { CLogRecordIterator log_record_iterator; RC rc = log_record_iterator.init(*log_file_); if (OB_FAIL(rc)) { LOG_WARN("failed to init log record iterator. rc=%s", strrc(rc)); return rc; } TrxKit *trx_manager = GCTX.trx_kit_; ASSERT(trx_manager != nullptr, "cannot do recover that trx_manager is null"); /// 遍历所有的日志,然后做redo // 在做redo时,需要记录处理的事务。在所有的日志都重做完成时,如果有事务没有结束,那这些事务就需要回滚 for (rc = log_record_iterator.next(); OB_SUCC(rc) && log_record_iterator.valid(); rc = log_record_iterator.next()) { const CLogRecord &log_record = log_record_iterator.log_record(); LOG_TRACE("begin to redo log={%s}", log_record.to_string().c_str()); switch (log_record.log_type()) { case CLogType::MTR_BEGIN: { Trx *trx = trx_manager->create_trx(log_record.trx_id()); if (trx == nullptr) { LOG_WARN("failed to create trx. log_record={%s}", log_record.to_string().c_str()); return RC::INTERNAL; } } break; case CLogType::MTR_COMMIT: case CLogType::MTR_ROLLBACK: { Trx *trx = trx_manager->find_trx(log_record.trx_id()); if (nullptr == trx) { LOG_WARN("no such trx. trx id=%d, log_record={%s}", log_record.trx_id(), log_record.to_string().c_str()); return RC::INTERNAL; } rc = trx->redo(db, log_record); if (OB_FAIL(rc)) { LOG_WARN("failed to redo log. trx id=%d, log_record={%s}, rc=%s", log_record.trx_id(), log_record.to_string().c_str(), strrc(rc)); return rc; } } break; default: { Trx *trx = GCTX.trx_kit_->find_trx(log_record.trx_id()); ASSERT(trx != nullptr, "cannot find such trx. trx id=%d, log_record={%s}", log_record.trx_id(), log_record.to_string().c_str()); rc = trx->redo(db, log_record); if (rc != RC::SUCCESS) { LOG_WARN("failed to redo log record. log_record={%s}, rc=%s", log_record.to_string().c_str(), strrc(rc)); return rc; } LOG_TRACE("redo one data record done"); } break; } } if (rc == RC::RECORD_EOF) { rc = RC::SUCCESS; } else { LOG_ERROR("failed to redo log iterator. rc=%s", strrc(rc)); return rc; } LOG_TRACE("recover redo log done"); vector uncommitted_trxes; trx_manager->all_trxes(uncommitted_trxes); LOG_INFO("find %d uncommitted trx", uncommitted_trxes.size()); for (Trx *trx : uncommitted_trxes) { trx->rollback(); trx_manager->destroy_trx(trx); } return RC::SUCCESS; }