From 69ce664c9c1ae997f8c9ef09bcab401e97949cf0 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 30 Oct 2011 11:30:52 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- .../DB/Interpreters/InterpreterInsertQuery.h | 32 +++++++ .../DB/Interpreters/InterpreterQuery.h | 38 ++++++++ dbms/include/DB/Interpreters/executeQuery.h | 21 +++++ dbms/include/DB/Interpreters/loadMetadata.h | 15 ++++ dbms/include/DB/Parsers/ParserQuery.h | 17 ++++ .../Interpreters/InterpreterInsertQuery.cpp | 87 +++++++++++++++++++ dbms/src/Interpreters/InterpreterQuery.cpp | 43 +++++++++ dbms/src/Interpreters/executeQuery.cpp | 61 +++++++++++++ dbms/src/Interpreters/loadMetadata.cpp | 75 ++++++++++++++++ dbms/src/Parsers/ParserQuery.cpp | 22 +++++ 10 files changed, 411 insertions(+) create mode 100644 dbms/include/DB/Interpreters/InterpreterInsertQuery.h create mode 100644 dbms/include/DB/Interpreters/InterpreterQuery.h create mode 100644 dbms/include/DB/Interpreters/executeQuery.h create mode 100644 dbms/include/DB/Interpreters/loadMetadata.h create mode 100644 dbms/include/DB/Parsers/ParserQuery.h create mode 100644 dbms/src/Interpreters/InterpreterInsertQuery.cpp create mode 100644 dbms/src/Interpreters/InterpreterQuery.cpp create mode 100644 dbms/src/Interpreters/executeQuery.cpp create mode 100644 dbms/src/Interpreters/loadMetadata.cpp create mode 100644 dbms/src/Parsers/ParserQuery.cpp diff --git a/dbms/include/DB/Interpreters/InterpreterInsertQuery.h b/dbms/include/DB/Interpreters/InterpreterInsertQuery.h new file mode 100644 index 0000000000..f4c29a076f --- /dev/null +++ b/dbms/include/DB/Interpreters/InterpreterInsertQuery.h @@ -0,0 +1,32 @@ +#pragma once + +#include + + +namespace DB +{ + + +/** Интерпретирует запрос INSERT. + */ +class InterpreterInsertQuery +{ +public: + InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_ = DEFAULT_BLOCK_SIZE); + + /** Выполнить запрос. + * remaining_data_istr, если не NULL, может содержать нераспарсенные данные для вставки. + * (заранее может быть считан в оперативку для парсинга лишь небольшой кусок запроса, который содержит не все данные) + */ + void execute(SharedPtr remaining_data_istr); + +private: + StoragePtr getTable(); + + ASTPtr query_ptr; + Context context; + size_t max_block_size; +}; + + +} diff --git a/dbms/include/DB/Interpreters/InterpreterQuery.h b/dbms/include/DB/Interpreters/InterpreterQuery.h new file mode 100644 index 0000000000..7fcd2aa354 --- /dev/null +++ b/dbms/include/DB/Interpreters/InterpreterQuery.h @@ -0,0 +1,38 @@ +#pragma once + +#include + + +namespace DB +{ + + +/** Интерпретирует произвольный запрос. + */ +class InterpreterQuery +{ +public: + InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_ = DEFAULT_BLOCK_SIZE); + + /** Выполнить запрос. + * + * ostr - куда писать результат выполнения запроса, если он есть. + * + * remaining_data_istr, если не NULL, может содержать нераспарсенный остаток запроса с данными. + * (заранее может быть считан в оперативку для парсинга лишь небольшой кусок запроса, который содержит не все данные) + * + * В query_plan, + * после выполнения запроса, может быть записан BlockInputStreamPtr, + * использовавшийся при выполнении запроса, + * чтобы можно было получить информацию о том, как выполнялся запрос. + */ + void execute(WriteBuffer & ostr, SharedPtr remaining_data_istr, BlockInputStreamPtr & query_plan); + +private: + ASTPtr query_ptr; + Context context; + size_t max_block_size; +}; + + +} diff --git a/dbms/include/DB/Interpreters/executeQuery.h b/dbms/include/DB/Interpreters/executeQuery.h new file mode 100644 index 0000000000..fa53916619 --- /dev/null +++ b/dbms/include/DB/Interpreters/executeQuery.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + + +namespace DB +{ + + +/** Парсит и исполняет запрос. + */ +void executeQuery( + ReadBuffer & istr, /// Откуда читать запрос (а также данные для INSERT-а, если есть) + WriteBuffer & ostr, /// Куда писать результат + Context & context, /// БД, таблицы, типы данных, движки таблиц, функции, агрегатные функции... + BlockInputStreamPtr & query_plan, /// Сюда может быть записано описание, как выполнялся запрос + size_t max_query_size = DEFAULT_MAX_QUERY_SIZE, /// Какую часть запроса можно прочитать в оперативку для парсинга (оставшиеся данные для INSERT, если есть, считываются позже) + size_t max_block_size = DEFAULT_BLOCK_SIZE); /// Максимальный размер блока при чтении или вставке данных + +} diff --git a/dbms/include/DB/Interpreters/loadMetadata.h b/dbms/include/DB/Interpreters/loadMetadata.h new file mode 100644 index 0000000000..4d7018dfd4 --- /dev/null +++ b/dbms/include/DB/Interpreters/loadMetadata.h @@ -0,0 +1,15 @@ +#pragma once + +#include + + +namespace DB +{ + + +/** Загружает определения таблиц и добавляет их в контекст. + */ +void loadMetadata(Context & context); + + +} diff --git a/dbms/include/DB/Parsers/ParserQuery.h b/dbms/include/DB/Parsers/ParserQuery.h new file mode 100644 index 0000000000..622093db79 --- /dev/null +++ b/dbms/include/DB/Parsers/ParserQuery.h @@ -0,0 +1,17 @@ +#pragma once + +#include + + +namespace DB +{ + + +class ParserQuery : public IParserBase +{ +protected: + String getName() { return "Query"; } + bool parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected); +}; + +} diff --git a/dbms/src/Interpreters/InterpreterInsertQuery.cpp b/dbms/src/Interpreters/InterpreterInsertQuery.cpp new file mode 100644 index 0000000000..9539565f89 --- /dev/null +++ b/dbms/src/Interpreters/InterpreterInsertQuery.cpp @@ -0,0 +1,87 @@ +#include + +#include +#include + +#include +#include +#include + +#include +#include + + +namespace DB +{ + + +InterpreterInsertQuery::InterpreterInsertQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_) + : query_ptr(query_ptr_), context(context_), max_block_size(max_block_size_) +{ +} + + +StoragePtr InterpreterInsertQuery::getTable() +{ + ASTInsertQuery & query = dynamic_cast(*query_ptr); + + /// В какую таблицу писать. + + String database_name = query.database; + String table_name = query.table; + + /** Если база данных не указана - используем текущую базу данных. + */ + if (database_name.empty()) + database_name = context.current_database; + + if (context.databases->end() == context.databases->find(database_name) + || (*context.databases)[database_name].end() == (*context.databases)[database_name].find(table_name)) + throw Exception("Unknown table '" + table_name + "' in database '" + database_name + "'", ErrorCodes::UNKNOWN_TABLE); + + return (*context.databases)[database_name][table_name]; +} + + +void InterpreterInsertQuery::execute(SharedPtr remaining_data_istr) +{ + ASTInsertQuery & query = dynamic_cast(*query_ptr); + StoragePtr table = getTable(); + + /// TODO - если указаны не все столбцы, то дополнить поток недостающими столбцами со значениями по-умолчанию. + + BlockInputStreamPtr in; + BlockOutputStreamPtr out = table->write(query_ptr); + + /// Какой тип запроса: INSERT VALUES | INSERT FORMAT | INSERT SELECT? + if (!query.select) + { + FormatFactory format_factory; + String format = query.format; + if (format.empty()) + format = "Values"; + + /// Данные могут содержаться в распарсенной и ещё не распарсенной части запроса. + ConcatReadBuffer::ReadBuffers buffers; + ReadBuffer buf1(const_cast(query.data), query.end - query.data, 0); + + buffers.push_back(&buf1); + if (remaining_data_istr) + buffers.push_back(&*remaining_data_istr); + + ConcatReadBuffer istr(buffers); + Block sample = table->getSampleBlock(); + + in = format_factory.getInput(format, istr, sample, max_block_size, *context.data_type_factory); + copyData(*in, *out); + } + else + { + InterpreterSelectQuery interpreter_select(query.select, context, max_block_size); + in = interpreter_select.execute(); + copyData(*in, *out); + } +} + + +} diff --git a/dbms/src/Interpreters/InterpreterQuery.cpp b/dbms/src/Interpreters/InterpreterQuery.cpp new file mode 100644 index 0000000000..0b5b3c9914 --- /dev/null +++ b/dbms/src/Interpreters/InterpreterQuery.cpp @@ -0,0 +1,43 @@ +#include +#include +#include + +#include +#include +#include +#include + + +namespace DB +{ + + +InterpreterQuery::InterpreterQuery(ASTPtr query_ptr_, Context & context_, size_t max_block_size_) + : query_ptr(query_ptr_), context(context_), max_block_size(max_block_size_) +{ +} + + +void InterpreterQuery::execute(WriteBuffer & ostr, SharedPtr remaining_data_istr, BlockInputStreamPtr & query_plan) +{ + if (dynamic_cast(&*query_ptr)) + { + InterpreterSelectQuery interpreter(query_ptr, context, max_block_size); + query_plan = interpreter.executeAndFormat(ostr); + } + else if (dynamic_cast(&*query_ptr)) + { + InterpreterInsertQuery interpreter(query_ptr, context, max_block_size); + interpreter.execute(remaining_data_istr); + } + else if (dynamic_cast(&*query_ptr)) + { + InterpreterCreateQuery interpreter; + interpreter.execute(query_ptr, context); + } + else + throw Exception("Unknown type of query: " + query_ptr->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY); +} + + +} diff --git a/dbms/src/Interpreters/executeQuery.cpp b/dbms/src/Interpreters/executeQuery.cpp new file mode 100644 index 0000000000..27ca206896 --- /dev/null +++ b/dbms/src/Interpreters/executeQuery.cpp @@ -0,0 +1,61 @@ +#include + + +namespace DB +{ + + +void executeQuery( + ReadBuffer & istr, + WriteBuffer & ostr, + Context & context, + BlockInputStreamPtr & query_plan, + size_t max_query_size, + size_t max_block_size) +{ + DB::ParserQuery parser; + DB::ASTPtr ast; + std::string expected; + + std::vector parse_buf; + const char * begin; + const char * end; + + /// Если в istr ещё ничего нет, то считываем кусок данных + if (istr.buffer().size() == 0) + istr.next(); + + if (istr.buffer().end() - istr.position() >= static_cast(max_query_size)) + { + /// Если оставшийся размер буфера istr достаточен, чтобы распарсить запрос до max_query_size, то парсим прямо в нём + begin = istr.position(); + end = istr.buffer().end(); + istr.position() += end - begin; + } + else + { + /// Если нет - считываем достаточное количество данных в parse_buf + parse_buf.resize(max_query_size); + parse_buf.resize(istr.read(&parse_buf[0], max_query_size)); + begin = &parse_buf[0]; + end = begin + parse_buf.size(); + } + + const char * pos = begin; + + bool parse_res = parser.parse(pos, end, ast, expected); + + /// Распарсенный запрос должен заканчиваться на конец входных данных или на точку с запятой. + if (!parse_res || (pos != end && *pos != ';')) + throw DB::Exception("Syntax error: failed at position " + + Poco::NumberFormatter::format(pos - begin) + ": " + + std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos)) + + ", expected " + (parse_res ? "end of query" : expected) + ".", + DB::ErrorCodes::SYNTAX_ERROR); + + InterpreterQuery interpreter(ast, context, max_block_size); + interpreter.execute(ostr, new ReadBuffer(istr), query_plan); +} + + +} diff --git a/dbms/src/Interpreters/loadMetadata.cpp b/dbms/src/Interpreters/loadMetadata.cpp new file mode 100644 index 0000000000..c45f3d1cb0 --- /dev/null +++ b/dbms/src/Interpreters/loadMetadata.cpp @@ -0,0 +1,75 @@ +#include +#include + +#include +#include + +#include +#include + + +namespace DB +{ + + +static void executeCreateQuery(const String & query, Context & context, const String & database, const String & file_name) +{ + const char * begin = query.data(); + const char * end = begin + query.size(); + const char * pos = begin; + + ParserCreateQuery parser; + ASTPtr ast; + String expected; + bool parse_res = parser.parse(pos, end, ast, expected); + + /// Распарсенный запрос должен заканчиваться на конец входных данных или на точку с запятой. + if (!parse_res || (pos != end && *pos != ';')) + throw DB::Exception("Syntax error while executing query from file " + file_name + ": failed at position " + + Poco::NumberFormatter::format(pos - begin) + ": " + + std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos)) + + ", expected " + (parse_res ? "end of query" : expected) + ".", + DB::ErrorCodes::SYNTAX_ERROR); + + ASTCreateQuery & ast_create_query = dynamic_cast(*ast); + ast_create_query.attach = true; + ast_create_query.database = database; + + InterpreterCreateQuery interpreter; + interpreter.execute(ast, context); +} + + +void loadMetadata(Context & context) +{ + /// Здесь хранятся определения таблиц + String path = context.path + "metadata"; + + /// Цикл по базам данных + Poco::DirectoryIterator dir_end; + for (Poco::DirectoryIterator it(path); it != dir_end; ++it) + { + if (!it->isDirectory()) + continue; + + /// Цикл по таблицам + for (Poco::DirectoryIterator jt(it->path()); jt != dir_end; ++jt) + { + /// Файлы имеют имена вида table_name.sql + if (jt.name().compare(jt.name().size() - 4, 4, ".sql")) + throw Exception("Incorrect file extension: " + jt.name() + " in metadata directory " + it->path(), ErrorCodes::INCORRECT_FILE_NAME); + + Poco::FileInputStream istr(jt->path()); + std::stringstream s; + s << istr.rdbuf(); + + if (!istr.good()) + throw Exception("Cannot read from file " + jt->path(), ErrorCodes::CANNOT_READ_FROM_ISTREAM); + + executeCreateQuery(s.str(), context, it.name(), jt->path()); + } + } +} + + +} diff --git a/dbms/src/Parsers/ParserQuery.cpp b/dbms/src/Parsers/ParserQuery.cpp new file mode 100644 index 0000000000..5d8eca727b --- /dev/null +++ b/dbms/src/Parsers/ParserQuery.cpp @@ -0,0 +1,22 @@ +#include +#include +#include +#include + + +namespace DB +{ + + +bool ParserQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & expected) +{ + ParserSelectQuery select_p; + ParserInsertQuery insert_p; + ParserCreateQuery create_p; + + return select_p.parse(pos, end, node, expected) + || insert_p.parse(pos, end, node, expected) + || create_p.parse(pos, end, node, expected); +} + +} \ No newline at end of file -- GitLab