提交 25d78e84 编写于 作者: 羽飞's avatar 羽飞

fix bugs

上级 771a8fc4
...@@ -25,7 +25,7 @@ See the Mulan PSL v2 for more details. */ ...@@ -25,7 +25,7 @@ See the Mulan PSL v2 for more details. */
#include "event/storage_event.h" #include "event/storage_event.h"
#include "event/sql_event.h" #include "event/sql_event.h"
#include "event/session_event.h" #include "event/session_event.h"
#include "sql/executor/tuple.h" #include "sql/expr/tuple.h"
#include "sql/operator/table_scan_operator.h" #include "sql/operator/table_scan_operator.h"
#include "sql/operator/predicate_operator.h" #include "sql/operator/predicate_operator.h"
#include "sql/operator/delete_operator.h" #include "sql/operator/delete_operator.h"
...@@ -210,18 +210,18 @@ void end_trx_if_need(Session *session, Trx *trx, bool all_right) ...@@ -210,18 +210,18 @@ void end_trx_if_need(Session *session, Trx *trx, bool all_right)
} }
} }
void print_tuple_header(std::ostream &os, const Operator &oper) void print_tuple_header(std::ostream &os, const ProjectOperator &oper)
{ {
const int cell_num = oper.tuple_cell_num(); const int cell_num = oper.tuple_cell_num();
TupleCellSpec cell_spec; const TupleCellSpec *cell_spec = nullptr;
for (int i = 0; i < cell_num; i++) { for (int i = 0; i < cell_num; i++) {
oper.tuple_cell_spec_at(i, cell_spec); oper.tuple_cell_spec_at(i, cell_spec);
if (i != 0) { if (i != 0) {
os << " | "; os << " | ";
} }
if (cell_spec.alias()) { if (cell_spec->alias()) {
os << cell_spec.alias(); os << cell_spec->alias();
} }
} }
......
#include "sql/expr/tuple.h"
RC FieldExpr::get_value(const Tuple &tuple, TupleCell &cell) const
{
return tuple.find_cell(field_, cell);
}
RC ValueExpr::get_value(const Tuple &tuple, TupleCell & cell) const
{
cell = tuple_cell;
}
#pragma once
#include "storage/common/field.h"
#include "sql/expr/tuple_cell.h"
class Tuple;
class Expression
{
public:
Expression() = default;
virtual ~Expression() = default;
virtual RC get_value(const Tuple &tuple, TupleCell &cell) const = 0;
};
class FieldExpr : public Expression
{
public:
FieldExpr() = default;
FieldExpr(const Table *table, const FieldMeta *field) : field_(table, field)
{}
virtual ~FieldExpr() = default;
Field &field()
{
return field_;
}
const Field &field() const
{
return field_;
}
const char *table_name() const
{
return field_.table_name();
}
const char *field_name() const
{
return field_.field_name();
}
RC get_value(const Tuple &tuple, TupleCell &cell) const override;
private:
Field field_;
};
class ValueExpr : public Expression
{
public:
ValueExpr() = default;
ValueExpr(const Value &value) : tuple_cell(value.type, (char *)value.data)
{}
virtual ~ValueExpr() = default;
RC get_value(const Tuple &tuple, TupleCell & cell) const override;
private:
TupleCell tuple_cell;
};
...@@ -19,7 +19,8 @@ See the Mulan PSL v2 for more details. */ ...@@ -19,7 +19,8 @@ See the Mulan PSL v2 for more details. */
#include "common/log/log.h" #include "common/log/log.h"
#include "sql/parser/parse.h" #include "sql/parser/parse.h"
#include "storage/common/field.h" #include "sql/expr/tuple_cell.h"
#include "sql/expr/expression.h"
#include "storage/common/record.h" #include "storage/common/record.h"
class Table; class Table;
...@@ -28,48 +29,35 @@ class TupleCellSpec ...@@ -28,48 +29,35 @@ class TupleCellSpec
{ {
public: public:
TupleCellSpec() = default; TupleCellSpec() = default;
TupleCellSpec(const char *table_name, const char *field_name, const char *alias) TupleCellSpec(Expression *expr) : expression_(expr)
: table_name_(table_name), field_name_(field_name), alias_(alias)
{} {}
void set_table_name(const char *table_name) ~TupleCellSpec()
{ {
this->table_name_ = table_name; if (expression_) {
} delete expression_;
expression_ = nullptr;
void set_field_name(const char *field_name) }
{
this->field_name_ = field_name;
} }
void set_alias(const char *alias) void set_alias(const char *alias)
{ {
this->alias_ = alias; this->alias_ = alias;
} }
const char *table_name() const
{
return table_name_;
}
const char *field_name() const
{
return field_name_;
}
const char *alias() const const char *alias() const
{ {
return alias_; return alias_;
} }
bool is_same_cell(const TupleCellSpec &other) const Expression *expression() const
{ {
return 0 == strcmp(this->table_name_, other.table_name_) && return expression_;
0 == strcmp(this->field_name_, other.field_name_);
} }
private: private:
// TODO table and field cannot describe all scenerio, should be expression // TODO table and field cannot describe all scenerio, should be expression
const char *table_name_ = nullptr;
const char *field_name_ = nullptr;
const char *alias_ = nullptr; const char *alias_ = nullptr;
Expression *expression_ = nullptr;
}; };
class Tuple class Tuple
...@@ -80,78 +68,82 @@ public: ...@@ -80,78 +68,82 @@ public:
virtual int cell_num() const = 0; virtual int cell_num() const = 0;
virtual RC cell_at(int index, TupleCell &cell) const = 0; virtual RC cell_at(int index, TupleCell &cell) const = 0;
virtual RC find_cell(const TupleCellSpec &spec, TupleCell &cell) const = 0; virtual RC find_cell(const Field &field, TupleCell &cell) const = 0;
virtual RC cell_spec_at(int index, TupleCellSpec &spec) const = 0; virtual RC cell_spec_at(int index, const TupleCellSpec *&spec) const = 0;
}; };
class RowTuple : public Tuple class RowTuple : public Tuple
{ {
public: public:
RowTuple() = default; RowTuple() = default;
virtual ~RowTuple() = default; virtual ~RowTuple()
{
for (TupleCellSpec *spec : speces_) {
delete spec;
}
speces_.clear();
}
void set_record(Record *record) void set_record(Record *record)
{ {
this->record_ = record; this->record_ = record;
} }
void set_table(Table *table) void set_schema(Table *table, const std::vector<FieldMeta> *fields)
{ {
this->table_ = table; table_ = table;
} this->speces_.reserve(fields->size());
for (const FieldMeta &field : *fields) {
void set_schema(const std::vector<FieldMeta> *fields) speces_.push_back(new TupleCellSpec(new FieldExpr(table, &field)));
{ }
this->fields_ = fields;
} }
int cell_num() const override int cell_num() const override
{ {
return fields_->size(); return speces_.size();
} }
RC cell_at(int index, TupleCell &cell) const override RC cell_at(int index, TupleCell &cell) const override
{ {
if (index < 0 || index >= fields_->size()) { if (index < 0 || index >= speces_.size()) {
LOG_WARN("invalid argument. index=%d", index); LOG_WARN("invalid argument. index=%d", index);
return RC::INVALID_ARGUMENT; return RC::INVALID_ARGUMENT;
} }
const FieldMeta &field_meta = (*fields_)[index]; const TupleCellSpec *spec = speces_[index];
cell.set_table(table_); FieldExpr *field_expr = (FieldExpr *)spec->expression();
cell.set_type(field_meta.type()); const FieldMeta *field_meta = field_expr->field().meta();
cell.set_data(this->record_->data() + field_meta.offset()); cell.set_type(field_meta->type());
cell.set_data(this->record_->data() + field_meta->offset());
return RC::SUCCESS; return RC::SUCCESS;
} }
RC find_cell(const TupleCellSpec &spec, TupleCell &cell) const override RC find_cell(const Field &field, TupleCell &cell) const override
{ {
const char *table_name = spec.table_name(); const char *table_name = field.table_name();
if (0 != strcmp(table_name, table_->name())) { if (0 != strcmp(table_name, table_->name())) {
return RC::NOTFOUND; return RC::NOTFOUND;
} }
const char *field_name = spec.field_name(); const char *field_name = field.field_name();
for (int i = 0; i < fields_->size(); ++i) { for (int i = 0; i < speces_.size(); ++i) {
const FieldMeta &field_meta = (*fields_)[i]; const FieldExpr * field_expr = (const FieldExpr *)speces_[i]->expression();
if (0 == strcmp(field_name, field_meta.name())) { const Field &field = field_expr->field();
if (0 == strcmp(field_name, field.field_name())) {
return cell_at(i, cell); return cell_at(i, cell);
} }
} }
return RC::NOTFOUND; return RC::NOTFOUND;
} }
RC cell_spec_at(int index, TupleCellSpec &spec) const override RC cell_spec_at(int index, const TupleCellSpec *&spec) const override
{ {
if (index < 0 || index >= fields_->size()) { if (index < 0 || index >= speces_.size()) {
LOG_WARN("invalid argument. index=%d", index); LOG_WARN("invalid argument. index=%d", index);
return RC::INVALID_ARGUMENT; return RC::INVALID_ARGUMENT;
} }
const FieldMeta &field_meta = (*fields_)[index]; spec = speces_[index];
spec.set_table_name(table_->name());
spec.set_field_name(field_meta.name());
spec.set_alias(field_meta.name());
return RC::SUCCESS; return RC::SUCCESS;
} }
...@@ -167,7 +159,7 @@ public: ...@@ -167,7 +159,7 @@ public:
private: private:
Record *record_ = nullptr; Record *record_ = nullptr;
Table *table_ = nullptr; Table *table_ = nullptr;
const std::vector<FieldMeta> *fields_ = nullptr; std::vector<TupleCellSpec *> speces_;
}; };
/* /*
...@@ -186,13 +178,20 @@ class ProjectTuple : public Tuple ...@@ -186,13 +178,20 @@ class ProjectTuple : public Tuple
{ {
public: public:
ProjectTuple() = default; ProjectTuple() = default;
virtual ~ProjectTuple()
{
for (TupleCellSpec *spec : speces_) {
delete spec;
}
speces_.clear();
}
void set_tuple(Tuple *tuple) void set_tuple(Tuple *tuple)
{ {
this->tuple_ = tuple; this->tuple_ = tuple;
} }
void add_cell_spec(const TupleCellSpec &spec) void add_cell_spec(TupleCellSpec *spec)
{ {
speces_.push_back(spec); speces_.push_back(spec);
} }
...@@ -210,15 +209,15 @@ public: ...@@ -210,15 +209,15 @@ public:
return RC::GENERIC_ERROR; return RC::GENERIC_ERROR;
} }
const TupleCellSpec &spec = speces_[index]; // TODO better: add mapping between projection and raw tuple const TupleCellSpec *spec = speces_[index];
return tuple_->find_cell(spec, cell); return spec->expression()->get_value(*tuple_, cell);
} }
RC find_cell(const TupleCellSpec &spec, TupleCell &cell) const override RC find_cell(const Field &field, TupleCell &cell) const override
{ {
return tuple_->find_cell(spec, cell); return tuple_->find_cell(field, cell);
} }
RC cell_spec_at(int index, TupleCellSpec &spec) const override RC cell_spec_at(int index, const TupleCellSpec *&spec) const override
{ {
if (index < 0 || index >= speces_.size()) { if (index < 0 || index >= speces_.size()) {
return RC::NOTFOUND; return RC::NOTFOUND;
...@@ -227,6 +226,6 @@ public: ...@@ -227,6 +226,6 @@ public:
return RC::SUCCESS; return RC::SUCCESS;
} }
private: private:
std::vector<TupleCellSpec> speces_; std::vector<TupleCellSpec *> speces_;
Tuple *tuple_ = nullptr; Tuple *tuple_ = nullptr;
}; };
#include "sql/expr/tuple_cell.h"
#include "storage/common/field.h"
#include "common/log/log.h"
#include "util/comparator.h"
void TupleCell::to_string(std::ostream &os) const
{
switch (attr_type_) {
case INTS: {
os << *(int *)data_;
} break;
case FLOATS: {
os << *(float *)data_;
} break;
case CHARS: {
for (int i = 0; i < 4; i++) { // the max length of CHARS is 4
if (data_[i] == '\0') {
break;
}
os << data_[i];
}
} break;
default: {
LOG_WARN("unsupported attr type: %d", attr_type_);
} break;
}
}
int TupleCell::compare(const TupleCell &other) const
{
if (this->attr_type_ == other.attr_type_) {
switch (this->attr_type_) {
case INTS: return compare_int(this->data_, other.data_);
case FLOATS: return compare_float(this->data_, other.data_);
case CHARS: return compare_string(this->data_, other.data_, 4);
default: {
LOG_WARN("unsupported type: %d", this->attr_type_);
}
}
} else if (this->attr_type_ == INTS && other.attr_type_ == FLOATS) {
float this_data = *(int *)data_;
return compare_float(&this_data, other.data_);
} else if (this->attr_type_ == FLOATS && other.attr_type_ == INTS) {
float other_data = *(int *)other.data_;
return compare_float(data_, &other_data);
}
LOG_WARN("not supported");
return -1; // TODO return rc?
}
#pragma once
#include <iostream>
#include "storage/common/table.h"
#include "storage/common/field_meta.h"
class TupleCell
{
public:
TupleCell() = default;
TupleCell(FieldMeta *meta, char *data)
: TupleCell(meta->type(), data)
{}
TupleCell(AttrType attr_type, char *data)
: attr_type_(attr_type), data_(data)
{}
void set_type(AttrType type) { this->attr_type_ = type; }
void set_data(char *data) { this->data_ = data; }
void set_data(const char *data) { this->set_data(const_cast<char *>(data)); }
void to_string(std::ostream &os) const;
int compare(const TupleCell &other) const;
private:
AttrType attr_type_ = UNDEFINED;
char *data_ = nullptr; // real data. no need to move to field_meta.offset
};
...@@ -35,15 +35,8 @@ public: ...@@ -35,15 +35,8 @@ public:
Tuple * current_tuple() override { Tuple * current_tuple() override {
return nullptr; return nullptr;
} }
int tuple_cell_num() const override //int tuple_cell_num() const override
{ //RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override
return 0;
}
RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override
{
return RC::NOTFOUND;
}
private: private:
DeleteStmt *delete_stmt_ = nullptr; DeleteStmt *delete_stmt_ = nullptr;
}; };
...@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details. */ ...@@ -16,7 +16,7 @@ See the Mulan PSL v2 for more details. */
#include <vector> #include <vector>
#include "rc.h" #include "rc.h"
#include "sql/executor/tuple.h" #include "sql/expr/tuple.h"
class Record; class Record;
class TupleCellSpec; class TupleCellSpec;
...@@ -34,8 +34,8 @@ public: ...@@ -34,8 +34,8 @@ public:
virtual RC close() = 0; virtual RC close() = 0;
virtual Tuple * current_tuple() = 0; virtual Tuple * current_tuple() = 0;
virtual int tuple_cell_num() const = 0; //virtual int tuple_cell_num() const = 0;
virtual RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const = 0; //virtual RC tuple_cell_spec_at(int index, TupleCellSpec *&spec) const = 0;
void add_child(Operator *oper) { void add_child(Operator *oper) {
children_.push_back(oper); children_.push_back(oper);
......
...@@ -59,31 +59,20 @@ Tuple * PredicateOperator::current_tuple() ...@@ -59,31 +59,20 @@ Tuple * PredicateOperator::current_tuple()
return children_[0]->current_tuple(); return children_[0]->current_tuple();
} }
void get_cell(const RowTuple &tuple, const FilterItem &filter_item, TupleCell &cell)
{
if (filter_item.is_attr()) {
cell.set_data(tuple.record().data() + filter_item.field().field()->offset());
cell.set_type(filter_item.field().field()->type());
} else {
cell.set_data((char *)filter_item.value().data);
cell.set_type(filter_item.value().type);
}
}
bool PredicateOperator::do_predicate(RowTuple &tuple) bool PredicateOperator::do_predicate(RowTuple &tuple)
{ {
if (filter_stmt_ == nullptr || filter_stmt_->filter_units().empty()) { if (filter_stmt_ == nullptr || filter_stmt_->filter_units().empty()) {
return true; return true;
} }
for (const FilterUnit &filter_unit : filter_stmt_->filter_units()) { for (const FilterUnit *filter_unit : filter_stmt_->filter_units()) {
const FilterItem & left = filter_unit.left(); Expression *left_expr = filter_unit->left();
const FilterItem & right = filter_unit.right(); Expression *right_expr = filter_unit->right();
CompOp comp = filter_unit.comp(); CompOp comp = filter_unit->comp();
TupleCell left_cell; TupleCell left_cell;
TupleCell right_cell; TupleCell right_cell;
get_cell(tuple, left, left_cell); left_expr->get_value(tuple, left_cell);
get_cell(tuple, right, right_cell); right_expr->get_value(tuple, right_cell);
const int compare = left_cell.compare(right_cell); const int compare = left_cell.compare(right_cell);
bool filter_result = false; bool filter_result = false;
...@@ -117,11 +106,11 @@ bool PredicateOperator::do_predicate(RowTuple &tuple) ...@@ -117,11 +106,11 @@ bool PredicateOperator::do_predicate(RowTuple &tuple)
return true; return true;
} }
int PredicateOperator::tuple_cell_num() const // int PredicateOperator::tuple_cell_num() const
{ // {
return children_[0]->tuple_cell_num(); // return children_[0]->tuple_cell_num();
} // }
RC PredicateOperator::tuple_cell_spec_at(int index, TupleCellSpec &spec) const // RC PredicateOperator::tuple_cell_spec_at(int index, TupleCellSpec &spec) const
{ // {
return children_[0]->tuple_cell_spec_at(index, spec); // return children_[0]->tuple_cell_spec_at(index, spec);
} // }
...@@ -17,6 +17,11 @@ See the Mulan PSL v2 for more details. */ ...@@ -17,6 +17,11 @@ See the Mulan PSL v2 for more details. */
#include "sql/operator/operator.h" #include "sql/operator/operator.h"
class FilterStmt; class FilterStmt;
/**
* PredicateOperator 用于单个表中的记录过滤
* 如果是多个表数据过滤,比如join条件的过滤,需要设计新的predicate或者扩展:w
*/
class PredicateOperator : public Operator class PredicateOperator : public Operator
{ {
public: public:
...@@ -31,8 +36,8 @@ public: ...@@ -31,8 +36,8 @@ public:
RC close() override; RC close() override;
Tuple * current_tuple() override; Tuple * current_tuple() override;
int tuple_cell_num() const override; //int tuple_cell_num() const override;
RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override; //RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override;
private: private:
bool do_predicate(RowTuple &tuple); bool do_predicate(RowTuple &tuple);
private: private:
......
...@@ -52,11 +52,14 @@ Tuple *ProjectOperator::current_tuple() ...@@ -52,11 +52,14 @@ Tuple *ProjectOperator::current_tuple()
void ProjectOperator::add_projection(const Table *table, const FieldMeta *field_meta) void ProjectOperator::add_projection(const Table *table, const FieldMeta *field_meta)
{ {
TupleCellSpec spec(table->name(), field_meta->name(), field_meta->name()); // 对单表来说,展示的(alias) 字段总是字段名称,
// 对多表查询来说,展示的alias 需要带表名字
TupleCellSpec *spec = new TupleCellSpec(new FieldExpr(table, field_meta));
spec->set_alias(field_meta->name());
tuple_.add_cell_spec(spec); tuple_.add_cell_spec(spec);
} }
RC ProjectOperator::tuple_cell_spec_at(int index, TupleCellSpec &spec) const RC ProjectOperator::tuple_cell_spec_at(int index, const TupleCellSpec *&spec) const
{ {
return tuple_.cell_spec_at(index, spec); return tuple_.cell_spec_at(index, spec);
} }
...@@ -31,12 +31,12 @@ public: ...@@ -31,12 +31,12 @@ public:
RC next() override; RC next() override;
RC close() override; RC close() override;
int tuple_cell_num() const override int tuple_cell_num() const
{ {
return tuple_.cell_num(); return tuple_.cell_num();
} }
RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override; RC tuple_cell_spec_at(int index, const TupleCellSpec *&spec) const;
Tuple * current_tuple() override; Tuple * current_tuple() override;
private: private:
......
...@@ -20,8 +20,7 @@ RC TableScanOperator::open() ...@@ -20,8 +20,7 @@ RC TableScanOperator::open()
{ {
RC rc = table_->get_record_scanner(record_scanner_); RC rc = table_->get_record_scanner(record_scanner_);
if (rc == RC::SUCCESS) { if (rc == RC::SUCCESS) {
tuple_.set_table(table_); tuple_.set_schema(table_, table_->table_meta().field_metas());
tuple_.set_schema(table_->table_meta().field_metas());
} }
return rc; return rc;
} }
...@@ -46,7 +45,7 @@ Tuple * TableScanOperator::current_tuple() ...@@ -46,7 +45,7 @@ Tuple * TableScanOperator::current_tuple()
tuple_.set_record(&current_record_); tuple_.set_record(&current_record_);
return &tuple_; return &tuple_;
} }
RC TableScanOperator::tuple_cell_spec_at(int index, TupleCellSpec &spec) const // RC TableScanOperator::tuple_cell_spec_at(int index, TupleCellSpec &spec) const
{ // {
return tuple_.cell_spec_at(index, spec); // return tuple_.cell_spec_at(index, spec);
} // }
...@@ -35,12 +35,12 @@ public: ...@@ -35,12 +35,12 @@ public:
Tuple * current_tuple() override; Tuple * current_tuple() override;
int tuple_cell_num() const override // int tuple_cell_num() const override
{ // {
return tuple_.cell_num(); // return tuple_.cell_num();
} // }
RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override; // RC tuple_cell_spec_at(int index, TupleCellSpec &spec) const override;
private: private:
Table *table_ = nullptr; Table *table_ = nullptr;
RecordFileScanner record_scanner_; RecordFileScanner record_scanner_;
......
...@@ -46,8 +46,12 @@ RC DeleteStmt::create(Db *db, const Deletes &delete_sql, Stmt *&stmt) ...@@ -46,8 +46,12 @@ RC DeleteStmt::create(Db *db, const Deletes &delete_sql, Stmt *&stmt)
return RC::SCHEMA_TABLE_NOT_EXIST; return RC::SCHEMA_TABLE_NOT_EXIST;
} }
std::unordered_map<std::string_view, Table *> table_map;
table_map.insert(std::pair<std::string_view, Table *>(std::string_view(table_name), table));
FilterStmt *filter_stmt = nullptr; FilterStmt *filter_stmt = nullptr;
RC rc = FilterStmt::create(db, table, delete_sql.conditions, delete_sql.condition_num, filter_stmt); RC rc = FilterStmt::create(db, table, &table_map,
delete_sql.conditions, delete_sql.condition_num, filter_stmt);
if (rc != RC::SUCCESS) { if (rc != RC::SUCCESS) {
LOG_WARN("failed to create filter statement. rc=%d:%s", rc, strrc(rc)); LOG_WARN("failed to create filter statement. rc=%d:%s", rc, strrc(rc));
return rc; return rc;
......
...@@ -19,7 +19,15 @@ See the Mulan PSL v2 for more details. */ ...@@ -19,7 +19,15 @@ See the Mulan PSL v2 for more details. */
#include "storage/common/db.h" #include "storage/common/db.h"
#include "storage/common/table.h" #include "storage/common/table.h"
RC FilterStmt::create(Db *db, Table *default_table, FilterStmt::~FilterStmt()
{
for (FilterUnit *unit : filter_units_) {
delete unit;
}
filter_units_.clear();
}
RC FilterStmt::create(Db *db, Table *default_table, std::unordered_map<std::string_view, Table *> *tables,
const Condition *conditions, int condition_num, const Condition *conditions, int condition_num,
FilterStmt *&stmt) FilterStmt *&stmt)
{ {
...@@ -28,8 +36,8 @@ RC FilterStmt::create(Db *db, Table *default_table, ...@@ -28,8 +36,8 @@ RC FilterStmt::create(Db *db, Table *default_table,
FilterStmt *tmp_stmt = new FilterStmt(); FilterStmt *tmp_stmt = new FilterStmt();
for (int i = 0; i < condition_num; i++) { for (int i = 0; i < condition_num; i++) {
FilterUnit filter_unit; FilterUnit *filter_unit = nullptr;
rc = create_filter_unit(db, default_table, conditions[i], filter_unit); rc = create_filter_unit(db, default_table, tables, conditions[i], filter_unit);
if (rc != RC::SUCCESS) { if (rc != RC::SUCCESS) {
delete tmp_stmt; delete tmp_stmt;
LOG_WARN("failed to create filter unit. condition index=%d", i); LOG_WARN("failed to create filter unit. condition index=%d", i);
...@@ -42,10 +50,16 @@ RC FilterStmt::create(Db *db, Table *default_table, ...@@ -42,10 +50,16 @@ RC FilterStmt::create(Db *db, Table *default_table,
return rc; return rc;
} }
RC get_table_and_field(Db *db, Table *default_table, const RelAttr &attr, Table *&table, const FieldMeta *&field) RC get_table_and_field(Db *db, Table *default_table, std::unordered_map<std::string_view, Table *> *tables,
const RelAttr &attr, Table *&table, const FieldMeta *&field)
{ {
if (common::is_blank(attr.relation_name)) { if (common::is_blank(attr.relation_name)) {
table = default_table; table = default_table;
} else if (nullptr != tables) {
auto iter = tables->find(std::string_view(attr.relation_name));
if (iter != tables->end()) {
table = iter->second;
}
} else { } else {
table = db->find_table(attr.relation_name); table = db->find_table(attr.relation_name);
} }
...@@ -64,8 +78,8 @@ RC get_table_and_field(Db *db, Table *default_table, const RelAttr &attr, Table ...@@ -64,8 +78,8 @@ RC get_table_and_field(Db *db, Table *default_table, const RelAttr &attr, Table
return RC::SUCCESS; return RC::SUCCESS;
} }
RC FilterStmt::create_filter_unit(Db *db, Table *default_table, RC FilterStmt::create_filter_unit(Db *db, Table *default_table, std::unordered_map<std::string_view, Table *> *tables,
const Condition &condition, FilterUnit &filter_unit) const Condition &condition, FilterUnit *&filter_unit)
{ {
RC rc = RC::SUCCESS; RC rc = RC::SUCCESS;
...@@ -75,38 +89,40 @@ RC FilterStmt::create_filter_unit(Db *db, Table *default_table, ...@@ -75,38 +89,40 @@ RC FilterStmt::create_filter_unit(Db *db, Table *default_table,
return RC::INVALID_ARGUMENT; return RC::INVALID_ARGUMENT;
} }
filter_unit.set_comp(comp); Expression *left = nullptr;
FilterItem &left_item = filter_unit.left(); Expression *right = nullptr;
FilterItem &right_item = filter_unit.right();
if (condition.left_is_attr) { if (condition.left_is_attr) {
Table *table = nullptr; Table *table = nullptr;
const FieldMeta *field = nullptr; const FieldMeta *field = nullptr;
rc = get_table_and_field(db, default_table, condition.left_attr, table, field); rc = get_table_and_field(db, default_table, tables, condition.left_attr, table, field);
if (rc != RC::SUCCESS) { if (rc != RC::SUCCESS) {
LOG_WARN("cannot find attr"); LOG_WARN("cannot find attr");
return rc; return rc;
} }
left_item.set_field(table, field); left = new FieldExpr(table, field);
} else { } else {
left_item.set_value(condition.left_value); left = new ValueExpr(condition.left_value);
} }
if (condition.right_is_attr) { if (condition.right_is_attr) {
Table *table = nullptr; Table *table = nullptr;
const FieldMeta *field = nullptr; const FieldMeta *field = nullptr;
rc = get_table_and_field(db, default_table, condition.right_attr, table, field); rc = get_table_and_field(db, default_table, tables, condition.right_attr, table, field);
if (rc != RC::SUCCESS) { if (rc != RC::SUCCESS) {
LOG_WARN("cannot find attr"); LOG_WARN("cannot find attr");
delete left;
return rc; return rc;
} }
right_item.set_field(table, field); right = new FieldExpr(table, field);
} else { } else {
right_item.set_value(condition.right_value); right = new ValueExpr(condition.right_value);
} }
filter_unit = new FilterUnit;
filter_unit->set_comp(comp);
filter_unit->set_left(left);
filter_unit->set_right(right);
// 检查两个类型是否能够比较 // 检查两个类型是否能够比较
return rc; return rc;
} }
...@@ -15,79 +15,31 @@ See the Mulan PSL v2 for more details. */ ...@@ -15,79 +15,31 @@ See the Mulan PSL v2 for more details. */
#pragma once #pragma once
#include <vector> #include <vector>
#include <unordered_map>
#include "rc.h" #include "rc.h"
#include "sql/parser/parse_defs.h" #include "sql/parser/parse_defs.h"
#include "sql/stmt/stmt.h" #include "sql/stmt/stmt.h"
#include "sql/expr/expression.h"
class Db; class Db;
class Table; class Table;
class FieldMeta; class FieldMeta;
class FilterField
{
public:
FilterField() = default;
FilterField(Table *table, FieldMeta *field) : table_(table), field_(field)
{}
Table *table() const {
return table_;
}
const FieldMeta *field() const {
return field_;
}
void set_table(Table *table) {
table_ = table;
}
void set_field(const FieldMeta *field) {
field_ = field;
}
private:
Table *table_ = nullptr;
const FieldMeta *field_ = nullptr;
};
class FilterItem
{
public:
FilterItem() = default;
void set_field(Table *table, const FieldMeta *field) {
is_attr_ = true;
field_.set_table(table);
field_.set_field(field);
}
void set_value(const Value &value) {
is_attr_ = false;
value_ = value;
}
bool is_attr() const {
return is_attr_;
}
const FilterField &field() const {
return field_;
}
const Value &value() const {
return value_;
}
private:
bool is_attr_ = false; // is an attribute or a value, or maybe an expression in future
FilterField field_;
Value value_;
};
class FilterUnit class FilterUnit
{ {
public: public:
FilterUnit() = default; FilterUnit() = default;
~FilterUnit()
{
if (left_) {
delete left_;
left_ = nullptr;
}
if (right_) {
delete right_;
right_ = nullptr;
}
}
void set_comp(CompOp comp) { void set_comp(CompOp comp) {
comp_ = comp; comp_ = comp;
...@@ -96,22 +48,28 @@ public: ...@@ -96,22 +48,28 @@ public:
CompOp comp() const { CompOp comp() const {
return comp_; return comp_;
} }
FilterItem &left() {
return left_; void set_left(Expression *expr)
{
left_ = expr;
} }
FilterItem &right() { void set_right(Expression *expr)
return right_; {
right_ = expr;
} }
const FilterItem &left() const { Expression *left() const
{
return left_; return left_;
} }
const FilterItem &right() const { Expression *right() const
{
return right_; return right_;
} }
private: private:
CompOp comp_ = NO_OP; CompOp comp_ = NO_OP;
FilterItem left_; Expression *left_ = nullptr;
FilterItem right_; Expression *right_ = nullptr;
}; };
class FilterStmt class FilterStmt
...@@ -119,20 +77,22 @@ class FilterStmt ...@@ -119,20 +77,22 @@ class FilterStmt
public: public:
FilterStmt() = default; FilterStmt() = default;
virtual ~FilterStmt();
public: public:
const std::vector<FilterUnit> &filter_units() const const std::vector<FilterUnit *> &filter_units() const
{ {
return filter_units_; return filter_units_;
} }
public: public:
static RC create(Db *db, Table *default_table, static RC create(Db *db, Table *default_table, std::unordered_map<std::string_view, Table *> *tables,
const Condition *conditions, int condition_num, const Condition *conditions, int condition_num,
FilterStmt *&stmt); FilterStmt *&stmt);
static RC create_filter_unit(Db *db, Table *default_table, const Condition &condition, FilterUnit &filter_unit); static RC create_filter_unit(Db *db, Table *default_table, std::unordered_map<std::string_view, Table *> *tables,
const Condition &condition, FilterUnit *&filter_unit);
private: private:
std::vector<FilterUnit> filter_units_; // 默认当前都是AND关系 std::vector<FilterUnit *> filter_units_; // 默认当前都是AND关系
}; };
...@@ -43,8 +43,9 @@ RC SelectStmt::create(Db *db, const Selects &select_sql, Stmt *&stmt) ...@@ -43,8 +43,9 @@ RC SelectStmt::create(Db *db, const Selects &select_sql, Stmt *&stmt)
return RC::INVALID_ARGUMENT; return RC::INVALID_ARGUMENT;
} }
// collect tables in `from` statement
std::vector<Table *> tables; std::vector<Table *> tables;
std::map<std::string, Table *> table_map; std::unordered_map<std::string_view, Table *> table_map;
for (int i = 0; i < select_sql.relation_num; i++) { for (int i = 0; i < select_sql.relation_num; i++) {
const char *table_name = select_sql.relations[i]; const char *table_name = select_sql.relations[i];
if (nullptr == table_name) { if (nullptr == table_name) {
...@@ -62,8 +63,9 @@ RC SelectStmt::create(Db *db, const Selects &select_sql, Stmt *&stmt) ...@@ -62,8 +63,9 @@ RC SelectStmt::create(Db *db, const Selects &select_sql, Stmt *&stmt)
table_map.insert(std::pair<std::string, Table*>(table_name, table)); table_map.insert(std::pair<std::string, Table*>(table_name, table));
} }
// collect query fields in `select` statement
std::vector<FieldDesc> query_fields; std::vector<FieldDesc> query_fields;
for (int i = 0; i < select_sql.attr_num; i++) { for (int i = select_sql.attr_num - 1; i >= 0; i--) {
const RelAttr &relation_attr = select_sql.attributes[i]; const RelAttr &relation_attr = select_sql.attributes[i];
if (common::is_blank(relation_attr.relation_name) && 0 == strcmp(relation_attr.attribute_name, "*")) { if (common::is_blank(relation_attr.relation_name) && 0 == strcmp(relation_attr.attribute_name, "*")) {
...@@ -127,34 +129,15 @@ RC SelectStmt::create(Db *db, const Selects &select_sql, Stmt *&stmt) ...@@ -127,34 +129,15 @@ RC SelectStmt::create(Db *db, const Selects &select_sql, Stmt *&stmt)
default_table = tables[0]; default_table = tables[0];
} }
// create filter statement in `where` statement
FilterStmt *filter_stmt = nullptr; FilterStmt *filter_stmt = nullptr;
RC rc = FilterStmt::create(db, default_table, select_sql.conditions, select_sql.condition_num, filter_stmt); RC rc = FilterStmt::create(db, default_table, &table_map,
select_sql.conditions, select_sql.condition_num, filter_stmt);
if (rc != RC::SUCCESS) { if (rc != RC::SUCCESS) {
LOG_WARN("cannot construct filter stmt"); LOG_WARN("cannot construct filter stmt");
return rc; return rc;
} }
// make sure all tables in predicate are exists in from stmt
const std::vector<FilterUnit> &filter_units = filter_stmt->filter_units();
for (const FilterUnit &filter_unit : filter_units) {
const FilterItem &left = filter_unit.left();
const FilterItem &right = filter_unit.right();
if (left.is_attr()) {
Table *table = left.field().table();
if (table_map.find(table->name()) == table_map.end()) {
LOG_WARN("the table in predicate is not in from stmt: %s", table->name());
return RC::SCHEMA_TABLE_NOT_EXIST;
}
}
if (right.is_attr()) {
Table *table = right.field().table();
if (table_map.find(table->name()) == table_map.end()) {
LOG_WARN("the table in predicate is not in from stmt: %s", table->name());
return RC::SCHEMA_TABLE_NOT_EXIST;
}
}
}
// everything alright // everything alright
SelectStmt *select_stmt = new SelectStmt(); SelectStmt *select_stmt = new SelectStmt();
select_stmt->tables_.swap(tables); select_stmt->tables_.swap(tables);
......
#include "storage/common/field.h"
#include "common/log/log.h"
#include "util/comparator.h"
void TupleCell::to_string(std::ostream &os) const
{
switch (attr_type_) {
case INTS: {
os << *(int *)data_;
} break;
case FLOATS: {
os << *(float *)data_;
} break;
case CHARS: {
for (int i = 0; i < 4; i++) { // the max length of CHARS is 4
if (data_[i] == '\0') {
break;
}
os << data_[i];
}
} break;
default: {
LOG_WARN("unsupported attr type: %d", attr_type_);
} break;
}
}
int TupleCell::compare(const TupleCell &other) const
{
if (this->attr_type_ == other.attr_type_) {
switch (this->attr_type_) {
case INTS: return compare_int(this->data_, other.data_);
case FLOATS: return compare_float(this->data_, other.data_);
case CHARS: return compare_string(this->data_, other.data_, 4);
default: {
LOG_WARN("unsupported type: %d", this->attr_type_);
}
}
} else if (this->attr_type_ == INTS && other.attr_type_ == FLOATS) {
float this_data = *(int *)data_;
return compare_float(&this_data, other.data_);
} else if (this->attr_type_ == FLOATS && other.attr_type_ == INTS) {
float other_data = *(int *)other.data_;
return compare_float(data_, &other_data);
}
LOG_WARN("not supported");
return -1; // TODO return rc?
}
#pragma once #pragma once
#include <iostream>
#include "storage/common/table.h" #include "storage/common/table.h"
#include "storage/common/field_meta.h" #include "storage/common/field_meta.h"
class TupleCell // TODO rename to Cell class Field
{ {
public: public:
TupleCell() = default; Field() = default;
Field(const Table *table, const FieldMeta *field) : table_(table), field_(field)
TupleCell(FieldMeta *meta, char *data) : TupleCell(nullptr, meta, data)
{}
TupleCell(Table *table, FieldMeta *meta, char *data)
: table_(table), attr_type_(meta->type()), data_(data)
{} {}
void set_table(Table *table) { this->table_ = table; } const Table *table() const { return table_; }
void set_type(AttrType type) { this->attr_type_ = type; } const FieldMeta *meta() const { return field_; }
void set_data(char *data) { this->data_ = data; }
void set_data(const char *data) { this->set_data(const_cast<char *>(data)); }
void to_string(std::ostream &os) const; AttrType attr_type() const
{
return field_->type();
}
int compare(const TupleCell &other) const; const char *table_name() const { return table_->name(); }
const char *field_name() const { return field_->name(); }
void set_table(const Table *table)
{
this->table_ = table;
}
void set_field(const FieldMeta *field)
{
this->field_ = field;
}
private: private:
Table *table_ = nullptr; const Table *table_ = nullptr;
AttrType attr_type_ = UNDEFINED; const FieldMeta *field_ = nullptr;
char *data_ = nullptr; // real data. no need to move to field_meta.offset
}; };
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册