diff --git a/dbms/src/Storages/StorageFile.cpp b/dbms/src/Storages/StorageFile.cpp
index 8df0385d55f4365cd5c6bef68de47290a4547a6a..ce4e8864d8ff854d583a02cbcc598386f30e92b5 100644
--- a/dbms/src/Storages/StorageFile.cpp
+++ b/dbms/src/Storages/StorageFile.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -181,7 +182,8 @@ StorageFile::StorageFile(const std::string & relative_table_dir_path, CommonArgu
 }
 
 StorageFile::StorageFile(CommonArguments args)
-    : table_name(args.table_name), database_name(args.database_name), format_name(args.format_name)
+    : IStorage(ColumnsDescription({{"_path", std::make_shared<DataTypeString>()}, {"_file", std::make_shared<DataTypeString>()}}, true))
+    , table_name(args.table_name), database_name(args.database_name), format_name(args.format_name)
     , compression_method(args.compression_method), base_path(args.context.getPath())
 {
     if (args.format_name != "Distributed")
@@ -195,7 +197,7 @@ class StorageFileBlockInputStream : public IBlockInputStream
 {
 public:
     StorageFileBlockInputStream(std::shared_ptr<StorageFile> storage_,
         const Context & context, UInt64 max_block_size,
-        std::string file_path,
+        std::string file_path_, bool need_path, bool need_file,
         const CompressionMethod compression_method, BlockInputStreamPtr prepared_reader = nullptr)
         : storage(std::move(storage_)), reader(std::move(prepared_reader))
@@ -223,7 +225,10 @@ public:
         else
         {
             shared_lock = std::shared_lock(storage->rwlock);
-            read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(file_path), compression_method);
+            file_path = std::make_optional(file_path_);
+            with_file_column = need_file;
+            with_path_column = need_path;
+            read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromFile>(file_path.value()), compression_method);
         }
 
         if (!reader)
@@ -237,10 +242,35 @@ public:
 
     Block readImpl() override
     {
-        return reader->read();
+        auto res = reader->read();
+        if (res && file_path)
+        {
+            if (with_path_column)
+                res.insert({DataTypeString().createColumnConst(res.rows(), file_path.value())->convertToFullColumnIfConst(),
+                            std::make_shared<DataTypeString>(), "_path"}); /// Constructing the column as const first probably generates less code.
+            if (with_file_column)
+            {
+                size_t last_slash_pos = file_path.value().find_last_of('/');
+                res.insert({DataTypeString().createColumnConst(res.rows(), file_path.value().substr(
+                                last_slash_pos + 1))->convertToFullColumnIfConst(),
+                            std::make_shared<DataTypeString>(), "_file"});
+            }
+        }
+        return res;
     }
 
-    Block getHeader() const override { return reader->getHeader(); }
+    Block getHeader() const override
+    {
+        auto res = reader->getHeader();
+        if (res && file_path)
+        {
+            if (with_path_column)
+                res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
+            if (with_file_column)
+                res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
+        }
+        return res;
+    }
 
     void readPrefixImpl() override
     {
@@ -254,6 +284,9 @@ public:
 
 private:
     std::shared_ptr<StorageFile> storage;
+    std::optional<std::string> file_path;
+    bool with_path_column = false;
+    bool with_file_column = false;
     Block sample_block;
     std::unique_ptr<ReadBuffer> read_buf;
     BlockInputStreamPtr reader;
@@ -264,7 +297,7 @@ private:
 
 
 BlockInputStreams StorageFile::read(
-    const Names & /*column_names*/,
+    const Names & column_names,
     const SelectQueryInfo & /*query_info*/,
     const Context & context,
     QueryProcessingStage::Enum /*processed_stage*/,
@@ -282,6 +315,15 @@ BlockInputStreams StorageFile::read(
             throw Exception("File " + paths[0] + " doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
     }
     blocks_input.reserve(paths.size());
+    bool need_path_column = false;
+    bool need_file_column = false;
+    for (const auto & column : column_names)
+    {
+        if (column == "_path")
+            need_path_column = true;
+        if (column == "_file")
+            need_file_column = true;
+    }
     for (const auto & file_path : paths)
     {
         BlockInputStreamPtr cur_block;
@@ -290,7 +332,7 @@ BlockInputStreams StorageFile::read(
             cur_block = StorageDistributedDirectoryMonitor::createStreamFromFile(file_path);
         else
             cur_block = std::make_shared<StorageFileBlockInputStream>(
-                std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, chooseCompressionMethod(file_path, compression_method));
+                std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, need_path_column, need_file_column, chooseCompressionMethod(file_path, compression_method));
 
         blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context));
     }
diff --git a/dbms/src/Storages/StorageHDFS.cpp b/dbms/src/Storages/StorageHDFS.cpp
index 8e5db9100926b785229f7cc5d837f9c2e55b98a7..0795c57b950857f001cd813f8f20bba90a7d7a08 100644
--- a/dbms/src/Storages/StorageHDFS.cpp
+++ b/dbms/src/Storages/StorageHDFS.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -42,7 +43,8 @@ StorageHDFS::StorageHDFS(const String & uri_,
     const ConstraintsDescription & constraints_,
     Context & context_,
     const String & compression_method_ = "")
-    : uri(uri_)
+    : IStorage(ColumnsDescription({{"_path", std::make_shared<DataTypeString>()}, {"_file", std::make_shared<DataTypeString>()}}, true))
+    , uri(uri_)
    , format_name(format_name_)
    , table_name(table_name_)
    , database_name(database_name_)
@@ -61,6 +63,8 @@ class HDFSBlockInputStream : public IBlockInputStream
 {
 public:
    HDFSBlockInputStream(const String & uri,
+        bool need_path,
+        bool need_file,
        const String & format,
        const Block & sample_block,
        const Context & context,
@@ -68,7 +72,9 @@ public:
        const CompressionMethod compression_method)
    {
        auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri), compression_method);
-
+        file_path = uri;
+        with_file_column = need_file;
+        with_path_column = need_path;
        auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
        reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
    }
@@ -80,12 +86,34 @@ public:
 
    Block readImpl() override
    {
-        return reader->read();
+        auto res = reader->read();
+        if (res)
+        {
+            if (with_path_column)
+                res.insert({DataTypeString().createColumnConst(res.rows(), file_path)->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
+                            "_path"}); /// Constructing the column as const first probably generates less code.
+            if (with_file_column)
+            {
+                size_t last_slash_pos = file_path.find_last_of('/');
+                res.insert({DataTypeString().createColumnConst(res.rows(), file_path.substr(
+                                last_slash_pos + 1))->convertToFullColumnIfConst(), std::make_shared<DataTypeString>(),
+                            "_file"});
+            }
+        }
+        return res;
    }
 
    Block getHeader() const override
    {
-        return reader->getHeader();
+        auto res = reader->getHeader();
+        if (res)
+        {
+            if (with_path_column)
+                res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
+            if (with_file_column)
+                res.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
+        }
+        return res;
    }
 
    void readPrefixImpl() override
@@ -100,6 +128,9 @@ public:
 private:
    BlockInputStreamPtr reader;
+    String file_path;
+    bool with_path_column = false;
+    bool with_file_column = false;
 };
 
 class HDFSBlockOutputStream : public IBlockOutputStream
 {
@@ -194,7 +225,7 @@ Strings LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, c
 }
 
 BlockInputStreams StorageHDFS::read(
-    const Names & /*column_names*/,
+    const Names & column_names,
    const SelectQueryInfo & /*query_info*/,
    const Context & context_,
    QueryProcessingStage::Enum /*processed_stage*/,
@@ -210,9 +241,18 @@ BlockInputStreams StorageHDFS::read(
    const Strings res_paths = LSWithRegexpMatching("/", fs, path_from_uri);
    BlockInputStreams result;
 
+    bool need_path_column = false;
+    bool need_file_column = false;
+    for (const auto & column : column_names)
+    {
+        if (column == "_path")
+            need_path_column = true;
+        if (column == "_file")
+            need_file_column = true;
+    }
    for (const auto & res_path : res_paths)
    {
-        result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, format_name, getSampleBlock(), context_,
+        result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, need_path_column, need_file_column, format_name, getSampleBlock(), context_,
            max_block_size, chooseCompressionMethod(res_path, compression_method)));
    }
 
diff --git a/dbms/tests/integration/test_globs_in_filepath/test.py b/dbms/tests/integration/test_globs_in_filepath/test.py
index 1ca3d003f1d98b0e062792b7b989f4fa469fb99b..70bdb7777fb10e0f84a4bed9e5c099628b5cb089 100644
--- a/dbms/tests/integration/test_globs_in_filepath/test.py
+++ b/dbms/tests/integration/test_globs_in_filepath/test.py
@@ -120,10 +120,14 @@ def test_deep_structure(start_cluster):
                         select count(*) from file('{}{}', 'TSV', 'text String, number Float64')
                     '''.format(path_to_userfiles_from_defaut_config, pattern)) == '{}\n'.format(value)
 
-def test_table_function(start_cluster):
+def test_table_function_and_virtual_columns(start_cluster):
     node.exec_in_container(['bash', '-c', 'mkdir -p {}some/path/to/'.format(path_to_userfiles_from_defaut_config)])
     node.exec_in_container(['bash', '-c', 'touch {}some/path/to/data.CSV'.format(path_to_userfiles_from_defaut_config)])
     node.query("insert into table function file('some/path/to/data.CSV', CSV, 'n UInt8, s String') select number, concat('str_', toString(number)) from numbers(100000)")
     assert node.query("select count() from file('some/path/to/data.CSV', CSV, 'n UInt8, s String')").rstrip() == '100000'
     node.query("insert into table function file('nonexist.csv', 'CSV', 'val1 UInt32') values (1)")
     assert node.query("select * from file('nonexist.csv', 'CSV', 'val1 UInt32')").rstrip()== '1'
+    assert "nonexist.csv" in node.query("select _path from file('nonexis?.csv', 'CSV', 'val1 UInt32')").rstrip()
+    assert "nonexist.csv" in node.query("select _path from file('nonexist.csv', 'CSV', 'val1 UInt32')").rstrip()
+    assert "nonexist.csv" == node.query("select _file from file('nonexis?.csv', 'CSV', 'val1 UInt32')").rstrip()
+    assert "nonexist.csv" == node.query("select _file from file('nonexist.csv', 'CSV', 'val1 UInt32')").rstrip()
\ No newline at end of file
diff --git a/dbms/tests/integration/test_storage_hdfs/test.py b/dbms/tests/integration/test_storage_hdfs/test.py
index 575b7593ca09000747f02c28b149f3d923549c1f..d65b0efc334cb9153733fc4e1d68849ea576b876 100644
--- a/dbms/tests/integration/test_storage_hdfs/test.py
+++ b/dbms/tests/integration/test_storage_hdfs/test.py
@@ -119,21 +119,24 @@ def test_globs_in_read_table(started_cluster):
     for filename in files:
         hdfs_api.write_data(globs_dir + filename, some_data)
 
-    test_requests = [("dir{1..5}/dir_dir/file1", 1),
-                     ("*_table_functio?", 1),
-                     ("dir/fil?", 1),
-                     ("table{3..8}_function", 1),
-                     ("table{2..8}_function", 2),
-                     ("dir/*", 1),
-                     ("dir/*?*?*?*?*", 1),
-                     ("dir/*?*?*?*?*?*", 0),
-                     ("some_dir/*/file", 2),
-                     ("some_dir/dir?/*", 2),
-                     ("*/*/*", 3),
-                     ("?", 0)]
-
-    for pattern, value in test_requests:
-        assert node1.query("select * from hdfs('hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64')") == value * some_data
+    test_requests = [("dir{1..5}/dir_dir/file1", 1, 1),
+                     ("*_table_functio?", 1, 1),
+                     ("dir/fil?", 1, 1),
+                     ("table{3..8}_function", 1, 1),
+                     ("table{2..8}_function", 2, 2),
+                     ("dir/*", 1, 1),
+                     ("dir/*?*?*?*?*", 1, 1),
+                     ("dir/*?*?*?*?*?*", 0, 0),
+                     ("some_dir/*/file", 2, 1),
+                     ("some_dir/dir?/*", 2, 1),
+                     ("*/*/*", 3, 2),
+                     ("?", 0, 0)]
+
+    for pattern, paths_amount, files_amount in test_requests:
+        inside_table_func = "'hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64'"
+        assert node1.query("select * from hdfs(" + inside_table_func + ")") == paths_amount * some_data
+        assert node1.query("select count(distinct _path) from hdfs(" + inside_table_func + ")").rstrip() == str(paths_amount)
+        assert node1.query("select count(distinct _file) from hdfs(" + inside_table_func + ")").rstrip() == str(files_amount)
 
 def test_read_write_gzip_table(started_cluster):
     hdfs_api = HDFSApi("root")
diff --git a/docs/en/operations/table_engines/hdfs.md b/docs/en/operations/table_engines/hdfs.md
index 22760c02d83347af26a9f3eb3df3ec3629a9d0e3..2f98e4dc45234e7a7d053578ab37502e93af24c7 100644
--- a/docs/en/operations/table_engines/hdfs.md
+++ b/docs/en/operations/table_engines/hdfs.md
@@ -100,4 +100,13 @@ Create table with files named `file000`, `file001`, ... , `file999`:
 CREARE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
 ```
 
+## Virtual Columns
+
+- `_path` — Path to the file.
+- `_file` — Name of the file.
+
+**See Also**
+
+- [Virtual columns](https://clickhouse.yandex/docs/en/operations/table_engines/#table_engines-virtual_columns)
+
 [Original article](https://clickhouse.yandex/docs/en/operations/table_engines/hdfs/)
diff --git a/docs/en/query_language/table_functions/file.md b/docs/en/query_language/table_functions/file.md
index b4f74bd2d8cc7a2e64279f51ac1f3f172250a0e0..6ad324bb0d9e2326b7d7f65bfc66d22dd53d599e 100644
--- a/docs/en/query_language/table_functions/file.md
+++ b/docs/en/query_language/table_functions/file.md
@@ -98,4 +98,13 @@ SELECT count(*)
 FROM file('big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32')
 ```
 
+## Virtual Columns
+
+- `_path` — Path to the file.
+- `_file` — Name of the file.
+
+**See Also**
+
+- [Virtual columns](https://clickhouse.yandex/docs/en/operations/table_engines/#table_engines-virtual_columns)
+
 [Original article](https://clickhouse.yandex/docs/en/query_language/table_functions/file/)
diff --git a/docs/en/query_language/table_functions/hdfs.md b/docs/en/query_language/table_functions/hdfs.md
index b069ce5a9c0b1e2cab1028d5181737482ff433de..2c13ef0c95bfe806ba55b3b6a177b6b400b556d2 100644
--- a/docs/en/query_language/table_functions/hdfs.md
+++ b/docs/en/query_language/table_functions/hdfs.md
@@ -83,4 +83,13 @@ SELECT count(*)
 FROM hdfs('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32')
 ```
 
+## Virtual Columns
+
+- `_path` — Path to the file.
+- `_file` — Name of the file.
+
+**See Also**
+
+- [Virtual columns](https://clickhouse.yandex/docs/en/operations/table_engines/#table_engines-virtual_columns)
+
 [Original article](https://clickhouse.yandex/docs/en/query_language/table_functions/hdfs/)
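For reference, a minimal query sketch showing how the new `_path` and `_file` columns can be selected through the `hdfs` table function. The URI, glob, and schema below are illustrative (they echo the integration test above) and are assumptions rather than anything prescribed by this patch:

```sql
-- Rows per matched file: _path is the full path that matched the glob,
-- _file is only the file name after the last '/'. Both columns are String.
SELECT
    _path,
    _file,
    count() AS rows
FROM hdfs('hdfs://hdfs1:9000/some_dir/*/file', 'TSV', 'id UInt64, text String, number Float64')
GROUP BY _path, _file
ORDER BY _path
```

The same two columns are exposed by the `File` engine and the `file` table function, as exercised in `test_globs_in_filepath/test.py` above.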
diff --git a/docs/ru/operations/table_engines/hdfs.md b/docs/ru/operations/table_engines/hdfs.md
index b384eb3bf60e1dd82eb2835e40c7a650cac74fb4..ca889a5d4ee471e20fdb74aadcb1789d749bede1 100644
--- a/docs/ru/operations/table_engines/hdfs.md
+++ b/docs/ru/operations/table_engines/hdfs.md
@@ -97,4 +97,13 @@ CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs
 CREARE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
 ```
 
+## Виртуальные столбцы
+
+- `_path` — Путь к файлу.
+- `_file` — Имя файла.
+
+**Смотрите также**
+
+- [Виртуальные столбцы](index.md#table_engines-virtual_columns)
+
 [Оригинальная статья](https://clickhouse.yandex/docs/ru/operations/table_engines/hdfs/)
diff --git a/docs/ru/query_language/table_functions/file.md b/docs/ru/query_language/table_functions/file.md
index 9938077de0a55db1a3bc5e3fac572c242ce2c1bd..d43f2773a427884ec03658d1d946a09ea62436c2 100644
--- a/docs/ru/query_language/table_functions/file.md
+++ b/docs/ru/query_language/table_functions/file.md
@@ -91,4 +91,13 @@ SELECT count(*)
 FROM file('big_dir/file{0..9}{0..9}{0..9}', 'CSV', 'name String, value UInt32')
 ```
 
+## Виртуальные столбцы
+
+- `_path` — Путь к файлу.
+- `_file` — Имя файла.
+
+**Смотрите также**
+
+- [Виртуальные столбцы](index.md#table_engines-virtual_columns)
+
 [Оригинальная статья](https://clickhouse.yandex/docs/ru/query_language/table_functions/file/)
diff --git a/docs/ru/query_language/table_functions/hdfs.md b/docs/ru/query_language/table_functions/hdfs.md
index 8787c23e826bf421a530f82860b2f902715d1c74..768cf4e3b1ddb6eb8dc99fdd37b5db38cd9928f4 100644
--- a/docs/ru/query_language/table_functions/hdfs.md
+++ b/docs/ru/query_language/table_functions/hdfs.md
@@ -47,4 +47,13 @@ LIMIT 2
 
 Шаблоны могут содержаться в разных частях пути. Обрабатываться будут ровно те файлы, которые и удовлетворяют всему шаблону пути, и существуют в файловой системе.
 
+## Виртуальные столбцы
+
+- `_path` — Путь к файлу.
+- `_file` — Имя файла.
+
+**Смотрите также**
+
+- [Виртуальные столбцы](index.md#table_engines-virtual_columns)
+
 [Оригинальная статья](https://clickhouse.yandex/docs/ru/query_language/table_functions/hdfs/)
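A corresponding sketch for the `file` table function; the path and schema are illustrative, matching what `test_table_function_and_virtual_columns` writes under the server's `user_files` directory, and they assume such files actually exist:

```sql
-- _file strips everything up to and including the last '/', so for a file
-- read as 'some/path/to/data.CSV' it returns 'data.CSV', while _path holds
-- the path of the file that was read.
SELECT
    _path,
    _file,
    count() AS rows
FROM file('some/path/to/*.CSV', 'CSV', 'n UInt8, s String')
GROUP BY _path, _file
```

Because both columns are declared as virtual in the storage constructors (`ColumnsDescription(..., true)` passed to `IStorage`), they can be selected without appearing in the table schema.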