Unverified commit 53ae368d, authored by Olga Khvostikova, committed by GitHub

Add virtual columns to hdfs and file table functions (#8489)

* Add virtual column _path to hdfs and file table functions with test

* Fix const of headers

* Add column _file with tests

* Add docs

* Fix improperly resolved conflicts

* Fix links in docs

* Better condition for virtual columns processing in StorageFile

* Better condition for virtual columns processing in StorageHDFS
Parent b6f41e44
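As a quick illustration of the change set above, the new virtual columns can be selected like ordinary columns through the `file()` and `hdfs()` table functions. This is a minimal sketch; the path and schema below are placeholders, not taken from the commit.

```sql
-- Both table functions now expose two virtual columns:
--   _path - full path of the file the row was read from
--   _file - file name, i.e. the part of the path after the last '/'
SELECT
    _path,
    _file,
    count() AS rows
FROM file('some_dir/*.csv', 'CSV', 'id UInt32, s String')
GROUP BY _path, _file
ORDER BY _path
```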
......@@ -13,6 +13,7 @@
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
......@@ -181,7 +182,8 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
}
StorageFile::StorageFile(CommonArguments args)
: table_name(args.table_name), database_name(args.database_name), format_name(args.format_name)
: IStorage(ColumnsDescription({{"_path", std::make_shared<DataTypeString>()}, {"_file", std::make_shared<DataTypeString>()}}, true))
, table_name(args.table_name), database_name(args.database_name), format_name(args.format_name)
, compression_method(args.compression_method), base_path(args.context.getPath())
{
if (args.format_name != "Distributed")
......@@ -195,7 +197,7 @@ class StorageFileBlockInputStream : public IBlockInputStream
public:
StorageFileBlockInputStream(std::shared_ptr<StorageFile> storage_,
const Context & context, UInt64 max_block_size,
std::string file_path,
std::string file_path_, bool need_path, bool need_file,
const CompressionMethod compression_method,
BlockInputStreamPtr prepared_reader = nullptr)
: storage(std::move(storage_)), reader(std::move(prepared_reader))
......@@ -223,7 +225,10 @@ public:
else
{
shared_lock = std::shared_lock(storage->rwlock);
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(file_path), compression_method);
file_path = std::make_optional(file_path_);
with_file_column = need_file;
with_path_column = need_path;
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(file_path.value()), compression_method);
}
if (!reader)
......@@ -237,10 +242,35 @@ public:
Block readImpl() override
{
return reader->read();
auto res = reader->read();
if (res && file_path)
{
if (with_path_column)
res.insert({DataTypeString().createColumnConst(res.rows(), file_path.value())->convertToFullColumnIfConst(),
std::make_shared<DataTypeString>(), "_path"}); /// constructing a const column first probably generates less code
if (with_file_column)
{
size_t last_slash_pos = file_path.value().find_last_of('/');
res.insert({DataTypeString().createColumnConst(res.rows(), file_path.value().substr(
last_slash_pos + 1))->convertToFullColumnIfConst(),
std::make_shared<DataTypeString>(), "_file"});
}
}
return res;
}
Block getHeader() const override { return reader->getHeader(); }
Block getHeader() const override
{
auto res = reader->getHeader();
if (res && file_path)
{
if (with_path_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (with_file_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
}
return res;
}
void readPrefixImpl() override
{
......@@ -254,6 +284,9 @@ public:
private:
std::shared_ptr<StorageFile> storage;
std::optional<std::string> file_path;
bool with_path_column = false;
bool with_file_column = false;
Block sample_block;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
......@@ -264,7 +297,7 @@ private:
BlockInputStreams StorageFile::read(
const Names & /*column_names*/,
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
......@@ -282,6 +315,15 @@ BlockInputStreams StorageFile::read(
throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
}
blocks_input.reserve(paths.size());
bool need_path_column = false;
bool need_file_column = false;
for (const auto & column : column_names)
{
if (column == "_path")
need_path_column = true;
if (column == "_file")
need_file_column = true;
}
for (const auto & file_path : paths)
{
BlockInputStreamPtr cur_block;
......@@ -290,7 +332,7 @@ BlockInputStreams StorageFile::read(
cur_block = StorageDistributedDirectoryMonitor::createStreamFromFile(file_path);
else
cur_block = std::make_shared<StorageFileBlockInputStream>(
std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, chooseCompressionMethod(file_path, compression_method));
std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, need_path_column, need_file_column, chooseCompressionMethod(file_path, compression_method));
blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context));
}
......
......@@ -13,6 +13,7 @@
#include <IO/WriteHelpers.h>
#include <IO/HDFSCommon.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
......@@ -42,7 +43,8 @@ StorageHDFS::StorageHDFS(const String & uri_,
const ConstraintsDescription & constraints_,
Context & context_,
const String & compression_method_ = "")
: uri(uri_)
: IStorage(ColumnsDescription({{"_path", std::make_shared<DataTypeString>()}, {"_file", std::make_shared<DataTypeString>()}}, true))
, uri(uri_)
, format_name(format_name_)
, table_name(table_name_)
, database_name(database_name_)
......@@ -61,6 +63,8 @@ class HDFSBlockInputStream : public IBlockInputStream
{
public:
HDFSBlockInputStream(const String & uri,
bool need_path,
bool need_file,
const String & format,
const Block & sample_block,
const Context & context,
......@@ -68,7 +72,9 @@ public:
const CompressionMethod compression_method)
{
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri), compression_method);
file_path = uri;
with_file_column = need_file;
with_path_column = need_path;
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
}
......@@ -80,12 +86,34 @@ public:
Block readImpl() override
{
return reader->read();
auto res = reader->read();
if (res)
{
if (with_path_column)
res.insert({DataTypeString().createColumnConst(res.rows(), file_path)->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
"_path"}); /// constructing a const column first probably generates less code
if (with_file_column)
{
size_t last_slash_pos = file_path.find_last_of('/');
res.insert({DataTypeString().createColumnConst(res.rows(), file_path.substr(
last_slash_pos + 1))->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
"_file"});
}
}
return res;
}
Block getHeader() const override
{
return reader->getHeader();
auto res = reader->getHeader();
if (res)
{
if (with_path_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (with_file_column)
res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
}
return res;
}
void readPrefixImpl() override
......@@ -100,6 +128,9 @@ public:
private:
BlockInputStreamPtr reader;
String file_path;
bool with_path_column = false;
bool with_file_column = false;
};
class HDFSBlockOutputStream : public IBlockOutputStream
......@@ -194,7 +225,7 @@ Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, c
BlockInputStreams StorageHDFS::read(
const Names & /*column_names*/,
const Names & column_names,
const SelectQueryInfo & /*query_info*/,
const Context & context_,
QueryProcessingStage::Enum /*processed_stage*/,
......@@ -210,9 +241,18 @@ BlockInputStreams StorageHDFS::read(
const Strings res_paths = LSWithRegexpMatching("/", fs, path_from_uri);
BlockInputStreams result;
bool need_path_column = false;
bool need_file_column = false;
for (const auto & column : column_names)
{
if (column == "_path")
need_path_column = true;
if (column == "_file")
need_file_column = true;
}
for (const auto & res_path : res_paths)
{
result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, format_name, getSampleBlock(), context_,
result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, need_path_column, need_file_column, format_name, getSampleBlock(), context_,
max_block_size, chooseCompressionMethod(res_path, compression_method)));
}
......
......@@ -120,10 +120,14 @@ def test_deep_structure(start_cluster):
select count(*) from file('{}{}', 'TSV', 'text String, number Float64')
'''.format(path_to_userfiles_from_defaut_config, pattern)) == '{}\n'.format(value)
def test_table_function(start_cluster):
def test_table_function_and_virtual_columns(start_cluster):
node.exec_in_container(['bash', '-c', 'mkdir -p {}some/path/to/'.format(path_to_userfiles_from_defaut_config)])
node.exec_in_container(['bash', '-c', 'touch {}some/path/to/data.CSV'.format(path_to_userfiles_from_defaut_config)])
node.query("insert into table function file('some/path/to/data.CSV', CSV, 'n UInt8, s String') select number, concat('str_', toString(number)) from numbers(100000)")
assert node.query("select count() from file('some/path/to/data.CSV', CSV, 'n UInt8, s String')").rstrip() == '100000'
node.query("insert into table function file('nonexist.csv', 'CSV', 'val1 UInt32') values (1)")
assert node.query("select * from file('nonexist.csv', 'CSV', 'val1 UInt32')").rstrip()== '1'
assert "nonexist.csv" in node.query("select _path from file('nonexis?.csv', 'CSV', 'val1 UInt32')").rstrip()
assert "nonexist.csv" in node.query("select _path from file('nonexist.csv', 'CSV', 'val1 UInt32')").rstrip()
assert "nonexist.csv" == node.query("select _file from file('nonexis?.csv', 'CSV', 'val1 UInt32')").rstrip()
assert "nonexist.csv" == node.query("select _file from file('nonexist.csv', 'CSV', 'val1 UInt32')").rstrip()
\ No newline at end of file
......@@ -119,21 +119,24 @@ def test_globs_in_read_table(started_cluster):
for filename in files:
hdfs_api.write_data(globs_dir + filename, some_data)
test_requests = [("dir{1..5}/dir_dir/file1", 1),
("*_table_functio?", 1),
("dir/fil?", 1),
("table{3..8}_function", 1),
("table{2..8}_function", 2),
("dir/*", 1),
("dir/*?*?*?*?*", 1),
("dir/*?*?*?*?*?*", 0),
("some_dir/*/file", 2),
("some_dir/dir?/*", 2),
("*/*/*", 3),
("?", 0)]
for pattern, value in test_requests:
assert node1.query("select * from hdfs('hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64')") == value * some_data
test_requests = [("dir{1..5}/dir_dir/file1", 1, 1),
("*_table_functio?", 1, 1),
("dir/fil?", 1, 1),
("table{3..8}_function", 1, 1),
("table{2..8}_function", 2, 2),
("dir/*", 1, 1),
("dir/*?*?*?*?*", 1, 1),
("dir/*?*?*?*?*?*", 0, 0),
("some_dir/*/file", 2, 1),
("some_dir/dir?/*", 2, 1),
("*/*/*", 3, 2),
("?", 0, 0)]
for pattern, paths_amount, files_amount in test_requests:
inside_table_func = "'hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64'"
assert node1.query("select * from hdfs(" + inside_table_func + ")") == paths_amount * some_data
assert node1.query("select count(distinct _path) from hdfs(" + inside_table_func + ")").rstrip() == str(paths_amount)
assert node1.query("select count(distinct _file) from hdfs(" + inside_table_func + ")").rstrip() == str(files_amount)
def test_read_write_gzip_table(started_cluster):
hdfs_api = HDFSApi("root")
......
......@@ -100,4 +100,13 @@ Create table with files named `file000`, `file001`, ... , `file999`:
CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```
## Virtual Columns
- `_path` — Path to the file.
- `_file` — Name of the file.
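For example, with the `big_table` table created above, the virtual columns show which HDFS file each row was read from. This is a minimal sketch; the output depends on the files actually present in `big_dir`:

```sql
SELECT _path, _file, count() AS rows
FROM big_table
GROUP BY _path, _file
ORDER BY _path
```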
**See Also**
- [Virtual columns](https://clickhouse.yandex/docs/en/operations/table_engines/#table_engines-virtual_columns)
[Original article](https://clickhouse.yandex/docs/en/operations/table_engines/hdfs/) <!--hide-->
......@@ -98,4 +98,13 @@ SELECT count(*)
FROM file('big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32')
```
## Virtual Columns
- `_path` — Path to the file.
- `_file` — Name of the file.
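A minimal sketch that combines the glob example above with the virtual columns (the exact result depends on which of the `file000`...`file999` files exist):

```sql
SELECT _path, _file, count() AS rows
FROM file('big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32')
GROUP BY _path, _file
ORDER BY _path
```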
**See Also**
- [Virtual columns](https://clickhouse.yandex/docs/en/operations/table_engines/#table_engines-virtual_columns)
[Original article](https://clickhouse.yandex/docs/en/query_language/table_functions/file/) <!--hide-->
......@@ -83,4 +83,13 @@ SELECT count(*)
FROM hdfs('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32')
```
## Virtual Columns
- `_path` — Path to the file.
- `_file` — Name of the file.
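A minimal sketch that reuses the query above to show which file each row came from (the result depends on the files that actually exist in `big_dir`):

```sql
SELECT _path, _file, count() AS rows
FROM hdfs('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32')
GROUP BY _path, _file
ORDER BY _path
```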
**See Also**
- [Virtual columns](https://clickhouse.yandex/docs/en/operations/table_engines/#table_engines-virtual_columns)
[Original article](https://clickhouse.yandex/docs/en/query_language/table_functions/hdfs/) <!--hide-->
......@@ -97,4 +97,13 @@ CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs
CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```
## Virtual Columns
- `_path` — Path to the file.
- `_file` — Name of the file.
**See Also**
- [Virtual columns](index.md#table_engines-virtual_columns)
[Original article](https://clickhouse.yandex/docs/ru/operations/table_engines/hdfs/) <!--hide-->
......@@ -91,4 +91,13 @@ SELECT count(*)
FROM file('big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32')
```
## Virtual Columns
- `_path` — Path to the file.
- `_file` — Name of the file.
**See Also**
- [Virtual columns](index.md#table_engines-virtual_columns)
[Original article](https://clickhouse.yandex/docs/ru/query_language/table_functions/file/) <!--hide-->
......@@ -47,4 +47,13 @@ LIMIT 2
Globs may appear in different parts of the path. Only the files that match the entire path pattern and exist in the file system are processed.
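A minimal sketch with a glob in the middle path component; the path and schema are illustrative, not taken from the documentation above:

```sql
-- Only files that exist and match the whole pattern are read.
SELECT count(*)
FROM hdfs('hdfs://hdfs1:9000/some_dir/*/file', 'TSV', 'id UInt64, text String, number Float64')
```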
## Virtual Columns
- `_path` — Path to the file.
- `_file` — Name of the file.
**See Also**
- [Virtual columns](index.md#table_engines-virtual_columns)
[Original article](https://clickhouse.yandex/docs/ru/query_language/table_functions/hdfs/) <!--hide-->