提交 95c306f9 编写于 作者: 羽飞's avatar 羽飞

select ok

上级 118ffa6d
......@@ -15,3 +15,4 @@ compile_commands.json
GRTAGS
GPATH
GTAGS
#*#
/* Copyright (c) 2021 Xie Meiyi(xiemeiyi@hust.edu.cn) and OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
//
// Created by Wangyunlai on 2022/6/23.
//
#pragma once
using PageNum = int32_t;
using SlotNum = int32_t;
......@@ -34,6 +34,7 @@ See the Mulan PSL v2 for more details. */
#include "sql/stmt/delete_stmt.h"
#include "sql/stmt/insert_stmt.h"
#include "storage/common/table.h"
#include "storage/common/field.h"
#include "storage/default/default_handler.h"
#include "storage/common/condition_filter.h"
#include "storage/trx/trx.h"
......@@ -124,7 +125,6 @@ void ExecuteStage::handle_request(common::StageEvent *event)
SessionEvent *session_event = sql_event->session_event();
Stmt *stmt = sql_event->stmt();
Session *session = session_event->session();
Db *current_db = session->get_current_db();
Query *sql = sql_event->query();
if (stmt != nullptr) {
......@@ -132,6 +132,9 @@ void ExecuteStage::handle_request(common::StageEvent *event)
case StmtType::SELECT: {
do_select((SelectStmt *)stmt, session_event);
} break;
case StmtType::INSERT: {
do_insert(sql_event);
} break;
case StmtType::UPDATE: {
//do_update((UpdateStmt *)stmt, session_event);
} break;
......@@ -165,7 +168,6 @@ void ExecuteStage::handle_request(common::StageEvent *event)
// exe_event->done_immediate();
//} bre
case SCF_INSERT:
case SCF_DROP_TABLE:
case SCF_DROP_INDEX:
case SCF_LOAD_DATA: {
......@@ -213,6 +215,31 @@ void end_trx_if_need(Session *session, Trx *trx, bool all_right)
}
}
void record_to_string(std::ostream &os, const Record &record)
{
Field field;
RC rc = RC::SUCCESS;
bool first_field = true;
for (int i = 0; i < record.field_amount(); i++) {
rc = record.field_at(i, field);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to fetch field of record. index=%d, rc=%s", i, strrc(rc));
break;
}
const FieldMeta *field_meta = field.meta();
if (!field_meta->visible()) {
continue;
}
if (!first_field) {
os << " | ";
} else {
first_field = false;
}
field.to_string(os);
}
}
RC ExecuteStage::do_select(SelectStmt *select_stmt, SessionEvent *session_event)
{
RC rc = RC::SUCCESS;
......@@ -230,16 +257,28 @@ RC ExecuteStage::do_select(SelectStmt *select_stmt, SessionEvent *session_event)
return rc;
}
std::stringstream ss;
Record record;
while ((rc = table_scan_operator.next()) == RC::SUCCESS) {
// get current record
// write to response
rc = table_scan_operator.current_record(record);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to get current record. rc=%s", strrc(rc));
break;
}
record_to_string(ss, record);
ss << std::endl;
}
if (rc != RC::RECORD_EOF) {
LOG_WARN("something wrong while iterate operator. rc=%s", strrc(rc));
table_scan_operator.close();
} else {
rc = table_scan_operator.close();
}
session_event->set_response(ss.str());
return rc;
}
......@@ -465,3 +504,25 @@ RC ExecuteStage::do_desc_table(SQLStageEvent *sql_event)
sql_event->session_event()->set_response(ss.str().c_str());
return RC::SUCCESS;
}
RC ExecuteStage::do_insert(SQLStageEvent *sql_event)
{
Stmt *stmt = sql_event->stmt();
SessionEvent *session_event = sql_event->session_event();
if (stmt == nullptr) {
LOG_WARN("cannot find statement");
return RC::GENERIC_ERROR;
}
InsertStmt *insert_stmt = (InsertStmt *)stmt;
Table *table = insert_stmt->table();
RC rc = table->insert_record(nullptr, insert_stmt->value_amount(), insert_stmt->values());
if (rc == RC::SUCCESS) {
session_event->set_response("SUCCESS");
} else {
session_event->set_response("FAILURE");
}
return rc;
}
......@@ -45,6 +45,7 @@ protected:
RC do_show_tables(SQLStageEvent *sql_event);
RC do_desc_table(SQLStageEvent *sql_event);
RC do_select(SelectStmt *select_stmt, SessionEvent *session_event);
RC do_insert(SQLStageEvent *sql_event);
protected:
private:
......
......@@ -18,6 +18,8 @@ See the Mulan PSL v2 for more details. */
#include "sql/parser/parse.h"
#include "rc.h"
class Record;
class Operator
{
public:
......@@ -30,5 +32,6 @@ public:
virtual RC next() = 0;
virtual RC close() = 0;
virtual RC current_record(Record &record) = 0;
private:
};
......@@ -28,13 +28,17 @@ RC TableScanOperator::next()
}
RC rc = record_scanner_.next(current_record_);
current_record_.set_fields(table_->table_meta().field_metas());
while (rc == RC::SUCCESS) {
//if (predicate_ != nullptr && predicate_->filter(current_record_)) {
// return rc;
return rc;
//}
if (record_scanner_.has_next()) {
rc = record_scanner_.next(current_record_);
current_record_.set_fields(table_->table_meta().field_metas());
} else {
rc = RC::RECORD_EOF;
}
}
return rc;
......@@ -44,3 +48,9 @@ RC TableScanOperator::close()
{
return record_scanner_.close_scan();
}
RC TableScanOperator::current_record(Record &record)
{
record = current_record_; // TODO should check status
return RC::SUCCESS;
}
......@@ -39,6 +39,7 @@ public:
RC next() override;
RC close() override;
RC current_record(Record &record) override;
private:
Table *table_ = nullptr;
RecordFileScanner record_scanner_;
......
......@@ -40,7 +40,13 @@ typedef enum {
} CompOp;
//属性值类型
typedef enum { UNDEFINED, CHARS, INTS, FLOATS } AttrType;
typedef enum
{
UNDEFINED,
CHARS,
INTS,
FLOATS
} AttrType;
//属性值
typedef struct _Value {
......
......@@ -108,6 +108,7 @@ void ResolveStage::handle_event(StageEvent *event)
RC rc = Stmt::create_stmt(db, *query, stmt);
if (rc != RC::SUCCESS && rc != RC::UNIMPLENMENT) {
LOG_WARN("failed to create stmt. rc=%d:%s", rc, strrc(rc));
session_event->set_response(strrc(rc));
return;
}
......
......@@ -48,8 +48,9 @@ RC InsertStmt::create(Db *db, const Inserts &inserts, Stmt *&stmt)
}
// check fields type
for (int i = table_meta.sys_field_num(); i < table_meta.field_num(); i++) {
const FieldMeta *field_meta = table_meta.field(i);
const int sys_field_num = table_meta.sys_field_num();
for (int i = 0; i < value_num; i++) {
const FieldMeta *field_meta = table_meta.field(i + sys_field_num);
const AttrType field_type = field_meta->type();
const AttrType value_type = values[i].type;
if (field_type != value_type) { // TODO try to convert the value type to field type
......
......@@ -91,7 +91,7 @@ RC SelectStmt::create(Db *db, const Selects &select_sql, Stmt *&stmt)
}
Table *table = iter->second;
if (field_name == "*") {
if (0 == strcmp(field_name, "*")) {
wildcard_fields(table, query_fields);
} else {
const FieldMeta *field_meta = table->table_meta().field(field_name);
......
......@@ -129,13 +129,13 @@ bool DefaultConditionFilter::filter(const Record &rec) const
char *right_value = nullptr;
if (left_.is_attr) { // value
left_value = (char *)(rec.data + left_.attr_offset);
left_value = (char *)(rec.data() + left_.attr_offset);
} else {
left_value = (char *)left_.value;
}
if (right_.is_attr) {
right_value = (char *)(rec.data + right_.attr_offset);
right_value = (char *)(rec.data() + right_.attr_offset);
} else {
right_value = (char *)right_.value;
}
......
......@@ -18,7 +18,7 @@ See the Mulan PSL v2 for more details. */
#include "rc.h"
#include "sql/parser/parse.h"
struct Record;
class Record;
class Table;
struct ConDesc {
......
#include "storage/common/field.h"
#include "common/log/log.h"
void Field::to_string(std::ostream &os) const
{
switch (meta_->type()) {
case INTS: {
os << *(int *)data_;
} break;
case FLOATS: {
os << *(float *)data_;
} break;
case CHARS: {
for (int i = 0; i < 4; i++) { // the max length of CHARS is 4
if (data_[i] == '\0') {
break;
}
os << data_[i];
}
} break;
default: {
LOG_WARN("unsupported attr type: %d", meta_->type());
} break;
}
}
#pragma once
#include <iostream>
#include "storage/common/table.h"
#include "storage/common/field_meta.h"
class Field
{
public:
Field() = default;
Field(FieldMeta *meta, char *data) : Field(nullptr, meta, data)
{}
Field(Table *table, FieldMeta *meta, char *data)
: table_(table), meta_(meta), data_(data)
{}
void set_table(Table *table) { this->table_ = table; }
void set_meta(const FieldMeta *meta) { this->meta_ = meta; }
void set_data(char *data) { this->data_ = data; }
const FieldMeta *meta() const { return meta_; }
void to_string(std::ostream &os) const;
private:
Table *table_ = nullptr;
const FieldMeta *meta_ = nullptr;
char *data_ = nullptr; // real data. no need to move to field_meta.offset
};
#include "storage/common/record.h"
#include "storage/common/field.h"
#include "common/log/log.h"
#include "rc.h"
RC Record::field_at(int index, Field &field) const
{
if (index < 0 || index >= fields_->size()) {
LOG_WARN("invalid argument. index=%d", index);
return RC::INVALID_ARGUMENT;
}
const FieldMeta &field_meta = (*fields_)[index];
field.set_meta(&field_meta);
field.set_data(this->data_ + field_meta.offset());
return RC::SUCCESS;
}
RC Record::set_field_value(const Value &value, int index)
{
// TODO
return RC::UNIMPLENMENT;
}
RC Record::set_field_values(const Value *values, int value_num, int start_index)
{
// TODO
return RC::UNIMPLENMENT;
}
......@@ -16,11 +16,70 @@ See the Mulan PSL v2 for more details. */
#include <stddef.h>
#include <vector>
#include <sstream>
#include "rc.h"
#include "defs.h"
#include "storage/common/index_meta.h"
#include "storage/common/field_meta.h"
#include "storage/common/record_manager.h"
class Field;
struct RID {
PageNum page_num; // record's page number
SlotNum slot_num; // record's slot number
// bool valid; // true means a valid record
RID() = default;
RID(const PageNum _page_num, const SlotNum _slot_num)
: page_num(_page_num), slot_num(_slot_num)
{}
const std::string to_string() const
{
std::stringstream ss;
ss << "PageNum:" << page_num << ", SlotNum:" << slot_num;
return ss.str();
}
bool operator==(const RID &other) const
{
return page_num == other.page_num && slot_num == other.slot_num;
}
bool operator!=(const RID &other) const
{
return !(*this == other);
}
static int compare(const RID *rid1, const RID *rid2)
{
int page_diff = rid1->page_num - rid2->page_num;
if (page_diff != 0) {
return page_diff;
} else {
return rid1->slot_num - rid2->slot_num;
}
}
/**
* 返回一个不可能出现的最小的RID
* 虽然page num 0和slot num 0都是合法的,但是page num 0通常用于存放meta数据,所以对数据部分来说都是
* 不合法的. 这里在bplus tree中查找时会用到。
*/
static RID *min()
{
static RID rid{0, 0};
return &rid;
}
static RID *max()
{
static RID rid{std::numeric_limits<PageNum>::max(), std::numeric_limits<SlotNum>::max()};
return &rid;
}
};
class Record
{
......@@ -28,19 +87,26 @@ public:
Record() = default;
~Record() = default;
void set_data(char *data);
char *data();
const char *data() const;
void set_data(char *data) { this->data_ = data; }
char *data() { return this->data_; }
const char *data() const { return this->data_; }
void set_fields(const std::vector<FieldMeta> *fields) { this->fields_ = fields; }
const std::vector<FieldMeta> *field_metas() const { return fields_; }
RC field_at(int index, Field &field) const;
int field_amount() const { return fields_->size();}
void set_rid(const RID &rid);
RID & rid();
const RID &rid() const;
void set_rid(const RID &rid) { this->rid_ = rid; }
void set_rid(const PageNum page_num, const SlotNum slot_num) { this->rid_.page_num = page_num; this->rid_.slot_num = slot_num; }
RID & rid() { return rid_; }
const RID &rid() const { return rid_; };
RC set_field_value(const Value &value, int index);
RC set_field_values(const Value *values, int value_num, int start_index);
private:
std::vector<FieldMeta> * fields_ = nullptr;
const std::vector<FieldMeta> * fields_ = nullptr;
RID rid_;
// the data buffer
......
......@@ -67,14 +67,13 @@ bool RecordPageIterator::has_next()
RC RecordPageIterator::next(Record &record)
{
record.rid.page_num = page_num_;
record.rid.slot_num = next_slot_num_;
record.data = record_page_handler_->get_record_data(record.rid.slot_num);
record.set_rid(page_num_, next_slot_num_);
record.set_data(record_page_handler_->get_record_data(record.rid().slot_num));
if (next_slot_num_ >= 0) {
next_slot_num_ = bitmap_.next_setted_bit(next_slot_num_ + 1);
}
return record.rid.slot_num != -1 ? RC::SUCCESS : RC::RECORD_EOF;
return record.rid().slot_num != -1 ? RC::SUCCESS : RC::RECORD_EOF;
}
////////////////////////////////////////////////////////////////////////////////
......@@ -170,21 +169,21 @@ RC RecordPageHandler::insert_record(const char *data, RID *rid)
RC RecordPageHandler::update_record(const Record *rec)
{
if (rec->rid.slot_num >= page_header_->record_capacity) {
if (rec->rid().slot_num >= page_header_->record_capacity) {
LOG_ERROR("Invalid slot_num %d, exceed page's record capacity, page_num %d.",
rec->rid.slot_num, frame_->page_num());
rec->rid().slot_num, frame_->page_num());
return RC::INVALID_ARGUMENT;
}
Bitmap bitmap(bitmap_, page_header_->record_capacity);
if (!bitmap.get_bit(rec->rid.slot_num)) {
if (!bitmap.get_bit(rec->rid().slot_num)) {
LOG_ERROR("Invalid slot_num %d, slot is empty, page_num %d.",
rec->rid.slot_num, frame_->page_num());
rec->rid().slot_num, frame_->page_num());
return RC::RECORD_RECORD_NOT_EXIST;
} else {
char *record_data = get_record_data(rec->rid.slot_num);
memcpy(record_data, rec->data, page_header_->record_real_size);
bitmap.set_bit(rec->rid.slot_num);
char *record_data = get_record_data(rec->rid().slot_num);
memcpy(record_data, rec->data(), page_header_->record_real_size);
bitmap.set_bit(rec->rid().slot_num);
frame_->mark_dirty();
// LOG_TRACE("Update record. file_id=%d, page num=%d,slot=%d", file_id_, rec->rid.page_num, rec->rid.slot_num);
return RC::SUCCESS;
......@@ -234,8 +233,8 @@ RC RecordPageHandler::get_record(const RID *rid, Record *rec)
return RC::RECORD_RECORD_NOT_EXIST;
}
rec->rid = *rid;
rec->data = get_record_data(rid->slot_num);
rec->set_rid(*rid);
rec->set_data(get_record_data(rid->slot_num));
return RC::SUCCESS;
}
......@@ -328,8 +327,8 @@ RC RecordFileHandler::update_record(const Record *rec)
{
RC ret;
RecordPageHandler page_handler;
if ((ret = page_handler.init(*disk_buffer_pool_, rec->rid.page_num)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler.page number=%d", rec->rid.page_num);
if ((ret = page_handler.init(*disk_buffer_pool_, rec->rid().page_num)) != RC::SUCCESS) {
LOG_ERROR("Failed to init record page handler.page number=%d", rec->rid().page_num);
return ret;
}
......@@ -412,7 +411,7 @@ RC RecordFileScanner::fetch_next_record()
return rc;
}
}
next_record_.rid.slot_num = -1;
next_record_.rid().slot_num = -1;
return RC::RECORD_EOF;
}
......@@ -430,7 +429,7 @@ RC RecordFileScanner::fetch_next_record_in_page()
}
}
next_record_.rid.slot_num = -1;
next_record_.rid().slot_num = -1;
return RC::RECORD_EOF;
}
......@@ -449,7 +448,7 @@ RC RecordFileScanner::close_scan()
bool RecordFileScanner::has_next()
{
return next_record_.rid.slot_num != -1;
return next_record_.rid().slot_num != -1;
}
RC RecordFileScanner::next(Record &record)
......
......@@ -17,10 +17,9 @@ See the Mulan PSL v2 for more details. */
#include <sstream>
#include <limits>
#include "storage/default/disk_buffer_pool.h"
#include "storage/common/record.h"
#include "common/lang/bitmap.h"
typedef int32_t SlotNum;
class ConditionFilter;
struct PageHeader {
......@@ -31,56 +30,6 @@ struct PageHeader {
int32_t first_record_offset; // 第一条记录的偏移量
};
struct RID {
PageNum page_num; // record's page number
SlotNum slot_num; // record's slot number
// bool valid; // true means a valid record
const std::string to_string() const
{
std::stringstream ss;
ss << "PageNum:" << page_num << ", SlotNum:" << slot_num;
return ss.str();
}
bool operator==(const RID &other) const
{
return page_num == other.page_num && slot_num == other.slot_num;
}
bool operator!=(const RID &other) const
{
return !(*this == other);
}
static int compare(const RID *rid1, const RID *rid2)
{
int page_diff = rid1->page_num - rid2->page_num;
if (page_diff != 0) {
return page_diff;
} else {
return rid1->slot_num - rid2->slot_num;
}
}
/**
* 返回一个不可能出现的最小的RID
* 虽然page num 0和slot num 0都是合法的,但是page num 0通常用于存放meta数据,所以对数据部分来说都是
* 不合法的. 这里在bplus tree中查找时会用到。
*/
static RID *min()
{
static RID rid{0, 0};
return &rid;
}
static RID *max()
{
static RID rid{std::numeric_limits<PageNum>::max(), std::numeric_limits<SlotNum>::max()};
return &rid;
}
};
class RidDigest {
public:
......@@ -90,12 +39,6 @@ public:
}
};
struct Record {
// bool valid; // false means the record hasn't been load
RID rid; // record's rid
char *data; // record's data
};
class RecordPageHandler;
class RecordPageIterator
{
......
......@@ -202,7 +202,7 @@ RC Table::rollback_insert(Trx *trx, const RID &rid)
}
// remove all indexes
rc = delete_entry_of_indexes(record.data, rid, false);
rc = delete_entry_of_indexes(record.data(), rid, false);
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to delete indexes of record(rid=%d.%d) while rollback insert, rc=%d:%s",
rid.page_num,
......@@ -223,7 +223,7 @@ RC Table::insert_record(Trx *trx, Record *record)
if (trx != nullptr) {
trx->init_trx_info(this, *record);
}
rc = record_handler_->insert_record(record->data, table_meta_.record_size(), &record->rid);
rc = record_handler_->insert_record(record->data(), table_meta_.record_size(), &record->rid());
if (rc != RC::SUCCESS) {
LOG_ERROR("Insert record failed. table name=%s, rc=%d:%s", table_meta_.name(), rc, strrc(rc));
return rc;
......@@ -234,7 +234,7 @@ RC Table::insert_record(Trx *trx, Record *record)
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to log operation(insertion) to trx");
RC rc2 = record_handler_->delete_record(&record->rid);
RC rc2 = record_handler_->delete_record(&record->rid());
if (rc2 != RC::SUCCESS) {
LOG_ERROR("Failed to rollback record data when insert index entries failed. table name=%s, rc=%d:%s",
name(),
......@@ -245,16 +245,16 @@ RC Table::insert_record(Trx *trx, Record *record)
}
}
rc = insert_entry_of_indexes(record->data, record->rid);
rc = insert_entry_of_indexes(record->data(), record->rid());
if (rc != RC::SUCCESS) {
RC rc2 = delete_entry_of_indexes(record->data, record->rid, true);
RC rc2 = delete_entry_of_indexes(record->data(), record->rid(), true);
if (rc2 != RC::SUCCESS) {
LOG_ERROR("Failed to rollback index data when insert index entries failed. table name=%s, rc=%d:%s",
name(),
rc2,
strrc(rc2));
}
rc2 = record_handler_->delete_record(&record->rid);
rc2 = record_handler_->delete_record(&record->rid());
if (rc2 != RC::SUCCESS) {
LOG_PANIC("Failed to rollback record data when insert index entries failed. table name=%s, rc=%d:%s",
name(),
......@@ -280,7 +280,7 @@ RC Table::insert_record(Trx *trx, int value_num, const Value *values)
}
Record record;
record.data = record_data;
record.set_data(record_data);
// record.valid = true;
rc = insert_record(trx, &record);
delete[] record_data;
......@@ -377,7 +377,7 @@ public:
void consume(const Record *record)
{
record_reader_(record->data, context_);
record_reader_(record->data(), context_);
}
private:
......@@ -494,7 +494,7 @@ public:
RC insert_index(const Record *record)
{
return index_->insert_entry(record->data, &record->rid);
return index_->insert_entry(record->data(), &record->rid());
}
private:
......@@ -651,15 +651,15 @@ RC Table::delete_record(Trx *trx, Record *record)
if (trx != nullptr) {
rc = trx->delete_record(this, record);
} else {
rc = delete_entry_of_indexes(record->data, record->rid, false); // 重复代码 refer to commit_delete
rc = delete_entry_of_indexes(record->data(), record->rid(), false); // 重复代码 refer to commit_delete
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to delete indexes of record (rid=%d.%d). rc=%d:%s",
record->rid.page_num,
record->rid.slot_num,
record->rid().page_num,
record->rid().slot_num,
rc,
strrc(rc));
} else {
rc = record_handler_->delete_record(&record->rid);
rc = record_handler_->delete_record(&record->rid());
}
}
return rc;
......@@ -673,7 +673,7 @@ RC Table::commit_delete(Trx *trx, const RID &rid)
if (rc != RC::SUCCESS) {
return rc;
}
rc = delete_entry_of_indexes(record.data, record.rid, false);
rc = delete_entry_of_indexes(record.data(), record.rid(), false);
if (rc != RC::SUCCESS) {
LOG_ERROR("Failed to delete indexes of record(rid=%d.%d). rc=%d:%s",
rid.page_num,
......
......@@ -17,8 +17,8 @@ See the Mulan PSL v2 for more details. */
#include "storage/common/table_meta.h"
struct Record;
struct RID;
class Record;
class DiskBufferPool;
class RecordFileHandler;
class RecordFileScanner;
......
......@@ -42,6 +42,7 @@ public:
const FieldMeta *field(int index) const;
const FieldMeta *field(const char *name) const;
const FieldMeta *find_field_by_offset(int offset) const;
const std::vector<FieldMeta> *field_metas() const { return &fields_; }
int field_num() const; // sys field included
int sys_field_num() const;
......
......@@ -25,11 +25,10 @@ See the Mulan PSL v2 for more details. */
#include <unordered_map>
#include "rc.h"
#include "defs.h"
#include "common/mm/mem_pool.h"
#include "common/lang/bitmap.h"
typedef int32_t PageNum;
class BufferPoolManager;
class DiskBufferPool;
......
......@@ -59,7 +59,7 @@ RC Trx::insert_record(Table *table, Record *record)
{
RC rc = RC::SUCCESS;
// 先校验是否以前是否存在过(应该不会存在)
Operation *old_oper = find_operation(table, record->rid);
Operation *old_oper = find_operation(table, record->rid());
if (old_oper != nullptr) {
return RC::GENERIC_ERROR; // error code
}
......@@ -69,7 +69,7 @@ RC Trx::insert_record(Table *table, Record *record)
// 设置record中trx_field为当前的事务号
// set_record_trx_id(table, record, trx_id_, false);
// 记录到operations中
insert_operation(table, Operation::Type::INSERT, record->rid);
insert_operation(table, Operation::Type::INSERT, record->rid());
return rc;
}
......@@ -77,24 +77,24 @@ RC Trx::delete_record(Table *table, Record *record)
{
RC rc = RC::SUCCESS;
start_if_not_started();
Operation *old_oper = find_operation(table, record->rid);
Operation *old_oper = find_operation(table, record->rid());
if (old_oper != nullptr) {
if (old_oper->type() == Operation::Type::INSERT) {
delete_operation(table, record->rid);
delete_operation(table, record->rid());
return RC::SUCCESS;
} else {
return RC::GENERIC_ERROR;
}
}
set_record_trx_id(table, *record, trx_id_, true);
insert_operation(table, Operation::Type::DELETE, record->rid);
insert_operation(table, Operation::Type::DELETE, record->rid());
return rc;
}
void Trx::set_record_trx_id(Table *table, Record &record, int32_t trx_id, bool deleted) const
{
const FieldMeta *trx_field = table->table_meta().trx_field();
int32_t *ptrx_id = (int32_t *)(record.data + trx_field->offset());
int32_t *ptrx_id = (int32_t *)(record.data() + trx_field->offset());
if (deleted) {
trx_id |= DELETED_FLAG_BIT_MASK;
}
......@@ -104,7 +104,7 @@ void Trx::set_record_trx_id(Table *table, Record &record, int32_t trx_id, bool d
void Trx::get_record_trx_id(Table *table, const Record &record, int32_t &trx_id, bool &deleted)
{
const FieldMeta *trx_field = table->table_meta().trx_field();
int32_t trx = *(int32_t *)(record.data + trx_field->offset());
int32_t trx = *(int32_t *)(record.data() + trx_field->offset());
trx_id = trx & TRX_ID_BIT_MASK;
deleted = (trx & DELETED_FLAG_BIT_MASK) != 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册