提交 8cbc52e7 编写于 作者: A Andrei Bodrov

add gzip read/write to file/s3/url/hdfs

上级 15cb620f
......@@ -407,11 +407,11 @@ void HTTPHandler::processQuery(
{
if (http_request_compression_method_str == "gzip")
{
in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, CompressionMethod::Gzip);
in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Gzip);
}
else if (http_request_compression_method_str == "deflate")
{
in_post = std::make_unique<ZlibInflatingReadBuffer>(*in_post_raw, CompressionMethod::Zlib);
in_post = std::make_unique<ZlibInflatingReadBuffer>(std::move(in_post_raw), CompressionMethod::Zlib);
}
#if USE_BROTLI
else if (http_request_compression_method_str == "br")
......
......@@ -5,7 +5,9 @@
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/ExecutionSpeedLimits.h>
#include <IO/CompressionMethod.h>
#include <IO/Progress.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <Storages/TableStructureLockHolder.h>
#include <Common/TypePromotion.h>
......@@ -228,6 +230,17 @@ public:
/// Enable calculation of minimums and maximums by the result columns.
void enableExtremes() { enabled_extremes = true; }
template <class TReadBuffer, class... Types>
std::unique_ptr<ReadBuffer> getBuffer(const DB::CompressionMethod method, Types... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto read_buf = std::make_unique<TReadBuffer>(args...);
return std::make_unique<ZlibInflatingReadBuffer>(std::move(read_buf), method);
}
return std::make_unique<TReadBuffer>(args...);
}
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
......
......@@ -3,6 +3,8 @@
#include <Core/Block.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Storages/TableStructureLockHolder.h>
#include <IO/CompressionMethod.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <boost/noncopyable.hpp>
......@@ -63,6 +65,19 @@ public:
*/
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
template <class TWriteBuffer, class... Types>
std::unique_ptr<WriteBuffer> getBuffer(const DB::CompressionMethod method, Types... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto write_buf = std::make_unique<TWriteBuffer>(args...);
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(write_buf), method, (int) 1 /* compression level */);
}
return std::make_unique<TWriteBuffer>(args...);
}
virtual void finalize() {}
private:
std::vector<TableStructureReadLockHolder> table_locks;
};
......
......@@ -12,6 +12,7 @@ enum class CompressionMethod
/// This option corresponds to HTTP Content-Encoding: deflate.
Zlib,
Brotli,
None
};
}
......@@ -90,6 +90,9 @@ public:
++pos;
}
void virtual sync() {}
void virtual finalize() {}
private:
/** Write the data in the buffer (from the beginning of the buffer to the current position).
* Throw an exception if something is wrong.
......
......@@ -113,8 +113,8 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
response.set("Content-Encoding", "gzip");
response_body_ostr = &(response.send());
#endif
out_raw.emplace(*response_body_ostr);
deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &*deflating_buf;
}
else if (compression_method == CompressionMethod::Zlib)
......@@ -125,8 +125,8 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
response.set("Content-Encoding", "deflate");
response_body_ostr = &(response.send());
#endif
out_raw.emplace(*response_body_ostr);
deflating_buf.emplace(*out_raw, compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
deflating_buf.emplace(std::move(out_raw), compression_method, compression_level, working_buffer.size(), working_buffer.begin());
out = &*deflating_buf;
}
#if USE_BROTLI
......@@ -138,7 +138,7 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
response.set("Content-Encoding", "br");
response_body_ostr = &(response.send());
#endif
out_raw.emplace(*response_body_ostr);
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr);
brotli_buf.emplace(*out_raw, compression_level, working_buffer.size(), working_buffer.begin());
out = &*brotli_buf;
}
......@@ -155,7 +155,7 @@ void WriteBufferFromHTTPServerResponse::nextImpl()
response_body_ostr = &(response.send());
#endif
out_raw.emplace(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out_raw = std::make_unique<WriteBufferFromOStream>(*response_body_ostr, working_buffer.size(), working_buffer.begin());
out = &*out_raw;
}
}
......
......@@ -60,7 +60,7 @@ private:
std::ostream * response_header_ostr = nullptr;
#endif
std::optional<WriteBufferFromOStream> out_raw;
std::unique_ptr<WriteBufferFromOStream> out_raw;
std::optional<ZlibDeflatingWriteBuffer> deflating_buf;
#if USE_BROTLI
std::optional<BrotliWriteBuffer> brotli_buf;
......
......@@ -6,14 +6,14 @@ namespace DB
{
ZlibDeflatingWriteBuffer::ZlibDeflatingWriteBuffer(
WriteBuffer & out_,
std::unique_ptr<WriteBuffer> out_,
CompressionMethod compression_method,
int compression_level,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<WriteBuffer>(buf_size, existing_memory, alignment)
, out(out_)
, out(std::move(out_))
{
zstr.zalloc = nullptr;
zstr.zfree = nullptr;
......@@ -64,18 +64,18 @@ void ZlibDeflatingWriteBuffer::nextImpl()
do
{
out.nextIfAtEnd();
zstr.next_out = reinterpret_cast<unsigned char *>(out.position());
zstr.avail_out = out.buffer().end() - out.position();
out->nextIfAtEnd();
zstr.next_out = reinterpret_cast<unsigned char *>(out->position());
zstr.avail_out = out->buffer().end() - out->position();
int rc = deflate(&zstr, Z_NO_FLUSH);
out.position() = out.buffer().end() - zstr.avail_out;
out->position() = out->buffer().end() - zstr.avail_out;
// Unpoison the result of deflate explicitly. It uses some custom SSE algo
// for computing CRC32, and it looks like msan is unable to comprehend
// it fully, so it complains about the resulting value depending on the
// uninitialized padding of the input buffer.
__msan_unpoison(out.position(), zstr.avail_out);
__msan_unpoison(out->position(), zstr.avail_out);
if (rc != Z_OK)
throw Exception(std::string("deflate failed: ") + zError(rc), ErrorCodes::ZLIB_DEFLATE_FAILED);
......@@ -92,18 +92,18 @@ void ZlibDeflatingWriteBuffer::finish()
while (true)
{
out.nextIfAtEnd();
zstr.next_out = reinterpret_cast<unsigned char *>(out.position());
zstr.avail_out = out.buffer().end() - out.position();
out->nextIfAtEnd();
zstr.next_out = reinterpret_cast<unsigned char *>(out->position());
zstr.avail_out = out->buffer().end() - out->position();
int rc = deflate(&zstr, Z_FINISH);
out.position() = out.buffer().end() - zstr.avail_out;
out->position() = out->buffer().end() - zstr.avail_out;
// Unpoison the result of deflate explicitly. It uses some custom SSE algo
// for computing CRC32, and it looks like msan is unable to comprehend
// it fully, so it complains about the resulting value depending on the
// uninitialized padding of the input buffer.
__msan_unpoison(out.position(), zstr.avail_out);
__msan_unpoison(out->position(), zstr.avail_out);
if (rc == Z_STREAM_END)
{
......
......@@ -20,7 +20,7 @@ class ZlibDeflatingWriteBuffer : public BufferWithOwnMemory<WriteBuffer>
{
public:
ZlibDeflatingWriteBuffer(
WriteBuffer & out_,
std::unique_ptr<WriteBuffer> out_,
CompressionMethod compression_method,
int compression_level,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
......@@ -37,7 +37,7 @@ public:
private:
void nextImpl() override;
WriteBuffer & out;
std::unique_ptr<WriteBuffer> out;
z_stream zstr;
bool finished = false;
};
......
......@@ -5,13 +5,13 @@ namespace DB
{
ZlibInflatingReadBuffer::ZlibInflatingReadBuffer(
ReadBuffer & in_,
std::unique_ptr<ReadBuffer> in_,
CompressionMethod compression_method,
size_t buf_size,
char * existing_memory,
size_t alignment)
: BufferWithOwnMemory<ReadBuffer>(buf_size, existing_memory, alignment)
, in(in_)
, in(std::move(in_))
, eof(false)
{
zstr.zalloc = nullptr;
......@@ -49,21 +49,21 @@ bool ZlibInflatingReadBuffer::nextImpl()
if (!zstr.avail_in)
{
in.nextIfAtEnd();
zstr.next_in = reinterpret_cast<unsigned char *>(in.position());
zstr.avail_in = in.buffer().end() - in.position();
in->nextIfAtEnd();
zstr.next_in = reinterpret_cast<unsigned char *>(in->position());
zstr.avail_in = in->buffer().end() - in->position();
}
zstr.next_out = reinterpret_cast<unsigned char *>(internal_buffer.begin());
zstr.avail_out = internal_buffer.size();
int rc = inflate(&zstr, Z_NO_FLUSH);
in.position() = in.buffer().end() - zstr.avail_in;
in->position() = in->buffer().end() - zstr.avail_in;
working_buffer.resize(internal_buffer.size() - zstr.avail_out);
if (rc == Z_STREAM_END)
{
if (in.eof())
if (in->eof())
{
eof = true;
return working_buffer.size() != 0;
......
......@@ -21,7 +21,7 @@ class ZlibInflatingReadBuffer : public BufferWithOwnMemory<ReadBuffer>
{
public:
ZlibInflatingReadBuffer(
ReadBuffer & in_,
std::unique_ptr<ReadBuffer> in_,
CompressionMethod compression_method,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
char * existing_memory = nullptr,
......@@ -32,7 +32,7 @@ public:
private:
bool nextImpl() override;
ReadBuffer & in;
std::unique_ptr<ReadBuffer> in;
z_stream zstr;
bool eof;
};
......
......@@ -22,8 +22,8 @@ try
Stopwatch stopwatch;
{
DB::WriteBufferFromFile buf("test_zlib_buffers.gz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::ZlibDeflatingWriteBuffer deflating_buf(buf, DB::CompressionMethod::Gzip, /* compression_level = */ 3);
auto buf = std::make_unique<DB::WriteBufferFromFile>("test_zlib_buffers.gz", DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC);
DB::ZlibDeflatingWriteBuffer deflating_buf(std::move(buf), DB::CompressionMethod::Gzip, /* compression_level = */ 3);
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
......@@ -40,8 +40,8 @@ try
}
{
DB::ReadBufferFromFile buf("test_zlib_buffers.gz");
DB::ZlibInflatingReadBuffer inflating_buf(buf, DB::CompressionMethod::Gzip);
auto buf = std::make_unique<DB::ReadBufferFromFile>("test_zlib_buffers.gz");
DB::ZlibInflatingReadBuffer inflating_buf(std::move(buf), DB::CompressionMethod::Gzip);
stopwatch.restart();
for (size_t i = 0; i < n; ++i)
......
......@@ -3,6 +3,7 @@
#include <Storages/AlterCommands.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
......@@ -445,4 +446,21 @@ BlockInputStreams IStorage::read(
return res;
}
DB::CompressionMethod IStorage::chooseCompressionMethod(const String & uri, const String & compression_method)
{
if (compression_method == "auto" || compression_method == "")
{
if (endsWith(uri, ".gz"))
return DB::CompressionMethod::Gzip;
else
return DB::CompressionMethod::None;
}
else if (compression_method == "gzip")
return DB::CompressionMethod::Gzip;
else if (compression_method == "none")
return DB::CompressionMethod::None;
else
throw Exception("Only auto, none, gzip supported as compression method", ErrorCodes::NOT_IMPLEMENTED);
}
}
......@@ -5,6 +5,7 @@
#include <DataStreams/IBlockStream_fwd.h>
#include <Databases/IDatabase.h>
#include <Interpreters/CancellationCode.h>
#include <IO/CompressionMethod.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/TableStructureLockHolder.h>
......@@ -434,6 +435,8 @@ public:
return {};
}
static DB::CompressionMethod chooseCompressionMethod(const String & uri, const String & compression_method);
private:
/// You always need to take the next three locks in this order.
......
......@@ -127,9 +127,10 @@ StorageFile::StorageFile(
const std::string & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
Context & context_,
const String & compression_method_ = "")
:
table_name(table_name_), database_name(database_name_), format_name(format_name_), context_global(context_), table_fd(table_fd_)
table_name(table_name_), database_name(database_name_), format_name(format_name_), context_global(context_), table_fd(table_fd_), compression_method(compression_method_)
{
setColumns(columns_);
setConstraints(constraints_);
......@@ -178,7 +179,10 @@ StorageFile::StorageFile(
class StorageFileBlockInputStream : public IBlockInputStream
{
public:
StorageFileBlockInputStream(std::shared_ptr<StorageFile> storage_, const Context & context, UInt64 max_block_size, std::string file_path)
StorageFileBlockInputStream(std::shared_ptr<StorageFile> storage_,
const Context & context, UInt64 max_block_size,
std::string file_path,
const CompressionMethod compression_method)
: storage(std::move(storage_))
{
if (storage->use_table_fd)
......@@ -199,12 +203,12 @@ public:
}
storage->table_fd_was_used = true;
read_buf = std::make_unique<ReadBufferFromFileDescriptor>(storage->table_fd);
read_buf = getBuffer<ReadBufferFromFileDescriptor>(compression_method, storage->table_fd);
}
else
{
shared_lock = std::shared_lock(storage->rwlock);
read_buf = std::make_unique<ReadBufferFromFile>(file_path);
read_buf = getBuffer<ReadBufferFromFile>(compression_method, file_path);
}
reader = FormatFactory::instance().getInput(storage->format_name, *read_buf, storage->getSampleBlock(), context, max_block_size);
......@@ -235,7 +239,7 @@ public:
private:
std::shared_ptr<StorageFile> storage;
Block sample_block;
std::unique_ptr<ReadBufferFromFileDescriptor> read_buf;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
std::shared_lock<std::shared_mutex> shared_lock;
......@@ -260,7 +264,7 @@ BlockInputStreams StorageFile::read(
for (const auto & file_path : paths)
{
BlockInputStreamPtr cur_block = std::make_shared<StorageFileBlockInputStream>(
std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path);
std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, IStorage::chooseCompressionMethod(file_path, compression_method));
blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context));
}
return blocks_input;
......@@ -270,7 +274,8 @@ BlockInputStreams StorageFile::read(
class StorageFileBlockOutputStream : public IBlockOutputStream
{
public:
explicit StorageFileBlockOutputStream(StorageFile & storage_)
explicit StorageFileBlockOutputStream(StorageFile & storage_,
const CompressionMethod compression_method)
: storage(storage_), lock(storage.rwlock)
{
if (storage.use_table_fd)
......@@ -280,13 +285,13 @@ public:
* INSERT data; SELECT *; last SELECT returns only insert_data
*/
storage.table_fd_was_used = true;
write_buf = std::make_unique<WriteBufferFromFileDescriptor>(storage.table_fd);
write_buf = getBuffer<WriteBufferFromFileDescriptor>(compression_method, storage.table_fd);
}
else
{
if (storage.paths.size() != 1)
throw Exception("Table '" + storage.table_name + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
write_buf = std::make_unique<WriteBufferFromFile>(storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
write_buf = getBuffer<WriteBufferFromFile>(compression_method, storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
}
writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, storage.getSampleBlock(), storage.context_global);
......@@ -317,7 +322,7 @@ public:
private:
StorageFile & storage;
std::unique_lock<std::shared_mutex> lock;
std::unique_ptr<WriteBufferFromFileDescriptor> write_buf;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
};
......@@ -325,7 +330,8 @@ BlockOutputStreamPtr StorageFile::write(
const ASTPtr & /*query*/,
const Context & /*context*/)
{
return std::make_shared<StorageFileBlockOutputStream>(*this);
return std::make_shared<StorageFileBlockOutputStream>(*this,
IStorage::chooseCompressionMethod(paths[0], compression_method));
}
Strings StorageFile::getDataPaths() const
......@@ -361,9 +367,9 @@ void registerStorageFile(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (!(engine_args.size() == 1 || engine_args.size() == 2))
if (!(engine_args.size() >= 1 && engine_args.size() <= 3))
throw Exception(
"Storage File requires 1 or 2 arguments: name of used format and source.",
"Storage File requires from 1 to 3 arguments: name of used format, source and compression_method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
......@@ -371,6 +377,7 @@ void registerStorageFile(StorageFactory & factory)
int source_fd = -1;
String source_path;
String compression_method;
if (engine_args.size() >= 2)
{
/// Will use FD if engine_args[1] is int literal or identifier with std* name
......@@ -397,13 +404,19 @@ void registerStorageFile(StorageFactory & factory)
else if (type == Field::Types::String)
source_path = literal->value.get<String>();
}
if (engine_args.size() == 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
} else compression_method = "auto";
}
return StorageFile::create(
source_path, source_fd,
args.data_path,
args.database_name, args.table_name, format_name, args.columns, args.constraints,
args.context);
args.context,
compression_method);
});
}
......
......@@ -60,7 +60,8 @@ protected:
const std::string & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_);
Context & context_,
const String & compression_method_);
private:
std::string table_name;
......@@ -69,6 +70,7 @@ private:
Context & context_global;
int table_fd = -1;
String compression_method;
std::vector<std::string> paths;
......
......@@ -13,7 +13,6 @@
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Common/parseGlobs.h>
#include <Poco/URI.h>
......@@ -36,12 +35,14 @@ StorageHDFS::StorageHDFS(const String & uri_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
Context & context_,
const String & compression_method_ = "")
: uri(uri_)
, format_name(format_name_)
, table_name(table_name_)
, database_name(database_name_)
, context(context_)
, compression_method(compression_method_)
{
setColumns(columns_);
setConstraints(constraints_);
......@@ -57,9 +58,11 @@ public:
const String & format,
const Block & sample_block,
const Context & context,
UInt64 max_block_size)
UInt64 max_block_size,
const CompressionMethod compression_method)
{
std::unique_ptr<ReadBuffer> read_buf = std::make_unique<ReadBufferFromHDFS>(uri);
auto read_buf = getBuffer<ReadBufferFromHDFS>(compression_method, uri);
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
}
......@@ -99,10 +102,11 @@ public:
HDFSBlockOutputStream(const String & uri,
const String & format,
const Block & sample_block_,
const Context & context)
const Context & context,
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = std::make_unique<WriteBufferFromHDFS>(uri);
write_buf = getBuffer<WriteBufferFromHDFS>(compression_method, uri);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
......@@ -130,7 +134,7 @@ public:
private:
Block sample_block;
std::unique_ptr<WriteBufferFromHDFS> write_buf;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
};
......@@ -203,7 +207,7 @@ BlockInputStreams StorageHDFS::read(
for (const auto & res_path : res_paths)
{
result.push_back(std::make_shared<HDFSBlockInputStream>(uri_without_path + res_path, format_name, getSampleBlock(), context_,
max_block_size));
max_block_size, IStorage::chooseCompressionMethod(res_path, compression_method)));
}
return result;
......@@ -217,7 +221,11 @@ void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_d
BlockOutputStreamPtr StorageHDFS::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
return std::make_shared<HDFSBlockOutputStream>(uri, format_name, getSampleBlock(), context);
return std::make_shared<HDFSBlockOutputStream>(uri,
format_name,
getSampleBlock(),
context,
IStorage::chooseCompressionMethod(uri, compression_method));
}
void registerStorageHDFS(StorageFactory & factory)
......@@ -226,9 +234,9 @@ void registerStorageHDFS(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 2)
if (engine_args.size() != 2 && engine_args.size() != 3)
throw Exception(
"Storage HDFS requires exactly 2 arguments: url and name of used format.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage HDFS requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
......@@ -238,7 +246,14 @@ void registerStorageHDFS(StorageFactory & factory)
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageHDFS::create(url, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context);
String compression_method;
if (engine_args.size() == 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
} else compression_method = "auto";
return StorageHDFS::create(url, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context, compression_method);
});
}
......
......@@ -39,7 +39,8 @@ protected:
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_);
Context & context_,
const String & compression_method_);
private:
String uri;
......@@ -47,6 +48,7 @@ private:
String table_name;
String database_name;
Context & context;
String compression_method;
Logger * log = &Logger::get("StorageHDFS");
};
......
......@@ -35,11 +35,11 @@ namespace
const Block & sample_block,
const Context & context,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts)
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: name(name_)
{
read_buf = std::make_unique<ReadBufferFromS3>(uri, timeouts);
read_buf = getBuffer<ReadBufferFromS3>(compression_method, uri, timeouts);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
......@@ -70,7 +70,7 @@ namespace
private:
String name;
std::unique_ptr<ReadBufferFromS3> read_buf;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
};
......@@ -82,10 +82,11 @@ namespace
UInt64 min_upload_part_size,
const Block & sample_block_,
const Context & context,
const ConnectionTimeouts & timeouts)
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = std::make_unique<WriteBufferFromS3>(uri, min_upload_part_size, timeouts);
write_buf = getBuffer<WriteBufferFromS3>(compression_method, uri, min_upload_part_size, timeouts);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
......@@ -113,7 +114,7 @@ namespace
private:
Block sample_block;
std::unique_ptr<WriteBufferFromS3> write_buf;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
};
}
......@@ -127,7 +128,8 @@ StorageS3::StorageS3(
UInt64 min_upload_part_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
Context & context_,
const String & compression_method_ = "")
: IStorage(columns_)
, uri(uri_)
, context_global(context_)
......@@ -135,6 +137,7 @@ StorageS3::StorageS3(
, database_name(database_name_)
, table_name(table_name_)
, min_upload_part_size(min_upload_part_size_)
, compression_method(compression_method_)
{
setColumns(columns_);
setConstraints(constraints_);
......@@ -156,7 +159,8 @@ BlockInputStreams StorageS3::read(
getHeaderBlock(column_names),
context,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context));
ConnectionTimeouts::getHTTPTimeouts(context),
IStorage::chooseCompressionMethod(uri.toString(), compression_method));
auto column_defaults = getColumns().getDefaults();
if (column_defaults.empty())
......@@ -173,7 +177,9 @@ void StorageS3::rename(const String & /*new_path_to_db*/, const String & new_dat
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
return std::make_shared<StorageS3BlockOutputStream>(
uri, format_name, min_upload_part_size, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global));
uri, format_name, min_upload_part_size, getSampleBlock(), context_global,
ConnectionTimeouts::getHTTPTimeouts(context_global),
IStorage::chooseCompressionMethod(uri.toString(), compression_method));
}
void registerStorageS3(StorageFactory & factory)
......@@ -182,9 +188,9 @@ void registerStorageS3(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 2)
if (engine_args.size() != 2 && engine_args.size() != 3)
throw Exception(
"Storage S3 requires exactly 2 arguments: url and name of used format.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage S3 requires 2 or 3 arguments: url, name of used format and compression_method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
......@@ -197,6 +203,13 @@ void registerStorageS3(StorageFactory & factory)
UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
String compression_method;
if (engine_args.size() == 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
} else compression_method = "auto";
return StorageS3::create(uri, args.database_name, args.table_name, format_name, min_upload_part_size, args.columns, args.constraints, args.context);
});
}
......
......@@ -24,7 +24,8 @@ public:
UInt64 min_upload_part_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_);
Context & context_,
const String & compression_method_);
String getName() const override
{
......@@ -61,6 +62,7 @@ private:
String database_name;
String table_name;
UInt64 min_upload_part_size;
String compression_method;
};
}
......@@ -31,8 +31,9 @@ IStorageURLBase::IStorageURLBase(
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_)
: uri(uri_), context_global(context_), format_name(format_name_), table_name(table_name_), database_name(database_name_)
const ConstraintsDescription & constraints_,
const String & compression_method_)
: uri(uri_), context_global(context_), compression_method(compression_method_), format_name(format_name_), table_name(table_name_), database_name(database_name_)
{
setColumns(columns_);
setConstraints(constraints_);
......@@ -51,10 +52,11 @@ namespace
const Block & sample_block,
const Context & context,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts)
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: name(name_)
{
read_buf = std::make_unique<ReadWriteBufferFromHTTP>(uri, method, callback, timeouts, context.getSettingsRef().max_http_get_redirects);
read_buf = getBuffer<ReadWriteBufferFromHTTP>(compression_method, uri, method, callback, timeouts, context.getSettingsRef().max_http_get_redirects);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
......@@ -85,7 +87,7 @@ namespace
private:
String name;
std::unique_ptr<ReadWriteBufferFromHTTP> read_buf;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
};
......@@ -96,10 +98,11 @@ namespace
const String & format,
const Block & sample_block_,
const Context & context,
const ConnectionTimeouts & timeouts)
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = std::make_unique<WriteBufferFromHTTP>(uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts);
write_buf = getBuffer<WriteBufferFromHTTP>(compression_method, uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
......@@ -127,7 +130,7 @@ namespace
private:
Block sample_block;
std::unique_ptr<WriteBufferFromHTTP> write_buf;
std::unique_ptr<WriteBuffer> write_buf;
BlockOutputStreamPtr writer;
};
}
......@@ -177,8 +180,8 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
getHeaderBlock(column_names),
context,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context));
ConnectionTimeouts::getHTTPTimeouts(context),
IStorage::chooseCompressionMethod(request_uri.toString(), compression_method));
auto column_defaults = getColumns().getDefaults();
if (column_defaults.empty())
......@@ -195,7 +198,9 @@ void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & n
BlockOutputStreamPtr IStorageURLBase::write(const ASTPtr & /*query*/, const Context & /*context*/)
{
return std::make_shared<StorageURLBlockOutputStream>(
uri, format_name, getSampleBlock(), context_global, ConnectionTimeouts::getHTTPTimeouts(context_global));
uri, format_name, getSampleBlock(), context_global,
ConnectionTimeouts::getHTTPTimeouts(context_global),
IStorage::chooseCompressionMethod(uri.toString(), compression_method));
}
void registerStorageURL(StorageFactory & factory)
......@@ -204,9 +209,9 @@ void registerStorageURL(StorageFactory & factory)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 2)
if (engine_args.size() != 2 && engine_args.size() != 3)
throw Exception(
"Storage URL requires exactly 2 arguments: url and name of used format.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage URL requires 2 or 3 arguments: url, name of used format and optional compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
......@@ -217,7 +222,19 @@ void registerStorageURL(StorageFactory & factory)
String format_name = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
return StorageURL::create(uri, args.database_name, args.table_name, format_name, args.columns, args.constraints, args.context);
String compression_method;
if (engine_args.size() == 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
compression_method = engine_args[2]->as<ASTLiteral &>().value.safeGet<String>();
} else compression_method = "auto";
return StorageURL::create(
uri,
args.database_name, args.table_name,
format_name,
args.columns, args.constraints, args.context,
compression_method);
});
}
}
......@@ -39,10 +39,12 @@ protected:
const std::string & table_name_,
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_);
const ConstraintsDescription & constraints_,
const String & compression_method_);
Poco::URI uri;
const Context & context_global;
String compression_method;
private:
String format_name;
......@@ -80,8 +82,9 @@ public:
const String & format_name_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
Context & context_)
: IStorageURLBase(uri_, context_, database_name_, table_name_, format_name_, columns_, constraints_)
Context & context_,
const String & compression_method_)
: IStorageURLBase(uri_, context_, database_name_, table_name_, format_name_, columns_, constraints_, compression_method_)
{
}
......
......@@ -7,6 +7,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
......@@ -31,7 +32,7 @@ StorageXDBC::StorageXDBC(
const Context & context_,
const BridgeHelperPtr bridge_helper_)
/// Please add support for constraints as soon as StorageODBC or JDBC will support insertion.
: IStorageURLBase(Poco::URI(), context_, database_name_, table_name_, IXDBCBridgeHelper::DEFAULT_FORMAT, columns_, ConstraintsDescription{})
: IStorageURLBase(Poco::URI(), context_, database_name_, table_name_, IXDBCBridgeHelper::DEFAULT_FORMAT, columns_, ConstraintsDescription{}, "" /* CompressionMethod */)
, bridge_helper(bridge_helper_)
, remote_database_name(remote_database_name_)
, remote_table_name(remote_table_name_)
......
......@@ -32,21 +32,27 @@ StoragePtr ITableFunctionFileLike::executeImpl(const ASTPtr & ast_function, cons
ASTs & args = args_func.at(0)->children;
if (args.size() != 3)
throw Exception("Table function '" + getName() + "' requires exactly 3 arguments: filename, format and structure.",
if (args.size() != 3 && args.size() != 4)
throw Exception("Table function '" + getName() + "' requires 3 or 4 arguments: filename, format, structure and compression method (default none).",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < 3; ++i)
for (size_t i = 0; i < args.size(); ++i)
args[i] = evaluateConstantExpressionOrIdentifierAsLiteral(args[i], context);
std::string filename = args[0]->as<ASTLiteral &>().value.safeGet<String>();
std::string format = args[1]->as<ASTLiteral &>().value.safeGet<String>();
std::string structure = args[2]->as<ASTLiteral &>().value.safeGet<String>();
std::string compression_method;
if (args.size() == 4)
{
compression_method = args[3]->as<ASTLiteral &>().value.safeGet<String>();
} else compression_method = "auto";
ColumnsDescription columns = parseColumnsListFromString(structure, context);
/// Create table
StoragePtr storage = getStorage(filename, format, columns, const_cast<Context &>(context), table_name);
StoragePtr storage = getStorage(filename, format, columns, const_cast<Context &>(context), table_name, compression_method);
storage->startup();
......
......@@ -16,6 +16,6 @@ class ITableFunctionFileLike : public ITableFunction
private:
StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
virtual StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const = 0;
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const = 0;
};
}
......@@ -6,7 +6,7 @@
namespace DB
{
StoragePtr TableFunctionFile::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const std::string & compression_method) const
{
return StorageFile::create(source,
-1,
......@@ -16,7 +16,8 @@ StoragePtr TableFunctionFile::getStorage(
format,
columns,
ConstraintsDescription{},
global_context);
global_context,
compression_method);
}
void registerTableFunctionFile(TableFunctionFactory & factory)
......
......@@ -23,6 +23,6 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const override;
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const std::string & compression_method) const override;
};
}
......@@ -9,7 +9,7 @@
namespace DB
{
StoragePtr TableFunctionHDFS::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
{
return StorageHDFS::create(source,
getDatabaseName(),
......@@ -17,7 +17,8 @@ StoragePtr TableFunctionHDFS::getStorage(
format,
columns,
ConstraintsDescription{},
global_context);
global_context,
compression_method);
}
void registerTableFunctionHDFS(TableFunctionFactory & factory)
......
......@@ -24,7 +24,7 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const override;
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const override;
};
}
......
......@@ -7,11 +7,11 @@ namespace DB
{
StoragePtr TableFunctionS3::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
{
Poco::URI uri(source);
UInt64 min_upload_part_size = global_context.getSettingsRef().s3_min_upload_part_size;
return StorageS3::create(uri, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context);
return StorageS3::create(uri, getDatabaseName(), table_name, format, min_upload_part_size, columns, ConstraintsDescription{}, global_context, compression_method);
}
void registerTableFunctionS3(TableFunctionFactory & factory)
......
......@@ -25,7 +25,8 @@ private:
const String & format,
const ColumnsDescription & columns,
Context & global_context,
const std::string & table_name) const override;
const std::string & table_name,
const String & compression_method) const override;
};
}
......@@ -8,10 +8,10 @@
namespace DB
{
StoragePtr TableFunctionURL::getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const
{
Poco::URI uri(source);
return StorageURL::create(uri, getDatabaseName(), table_name, format, columns, ConstraintsDescription{}, global_context);
return StorageURL::create(uri, getDatabaseName(), table_name, format, columns, ConstraintsDescription{}, global_context, compression_method);
}
void registerTableFunctionURL(TableFunctionFactory & factory)
......
......@@ -19,6 +19,6 @@ public:
private:
StoragePtr getStorage(
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name) const override;
const String & source, const String & format, const ColumnsDescription & columns, Context & global_context, const std::string & table_name, const String & compression_method) const override;
};
}
#-*- coding: utf-8 -*-
import StringIO
import gzip
import requests
import subprocess
from tempfile import NamedTemporaryFile
......@@ -19,7 +21,7 @@ class HDFSApi(object):
if response_data.status_code != 200:
response_data.raise_for_status()
return response_data.text
return response_data.content
# Requests can't put file
def _curl_to_put(self, filename, path, params):
......@@ -44,3 +46,12 @@ class HDFSApi(object):
output = self._curl_to_put(fpath, path, additional_params)
if "201 Created" not in output:
raise Exception("Can't create file on hdfs:\n {}".format(output))
def write_gzip_data(self, path, content):
out = StringIO.StringIO()
with gzip.GzipFile(fileobj=out, mode="w") as f:
f.write(content)
self.write_data(path, out.getvalue())
def read_gzip_data(self, path):
return gzip.GzipFile(fileobj=StringIO.StringIO(self.read_data(path))).read()
......@@ -133,4 +133,56 @@ def test_globs_in_read_table(started_cluster):
("?", 0)]
for pattern, value in test_requests:
assert node1.query("select * from hdfs('hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64')") == value * some_data
\ No newline at end of file
assert node1.query("select * from hdfs('hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64')") == value * some_data
def test_read_write_gzip_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query("select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function", data)
assert hdfs_api.read_gzip_data("/simple_table_function") == data
assert node1.query("select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64', 'gzip')") == data
def test_read_write_table_with_parameter_none(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_data("/simple_table_function.gz", data)
assert hdfs_api.read_data("/simple_table_function.gz") == data
assert node1.query("select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'none')") == data
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query("select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'auto')") == data
def test_write_gz_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query("create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')")
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
def test_write_gzip_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query("create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')")
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
......@@ -2,7 +2,7 @@ drop table if exists test_table_hdfs_syntax
;
create table test_table_hdfs_syntax (id UInt32) ENGINE = HDFS('')
; -- { serverError 42 }
create table test_table_hdfs_syntax (id UInt32) ENGINE = HDFS('','','')
create table test_table_hdfs_syntax (id UInt32) ENGINE = HDFS('','','', '')
; -- { serverError 42 }
drop table if exists test_table_hdfs_syntax
;
......@@ -2,7 +2,7 @@ drop table if exists test_table_s3_syntax
;
create table test_table_s3_syntax (id UInt32) ENGINE = S3('')
; -- { serverError 42 }
create table test_table_s3_syntax (id UInt32) ENGINE = S3('','','')
create table test_table_s3_syntax (id UInt32) ENGINE = S3('','','','')
; -- { serverError 42 }
drop table if exists test_table_s3_syntax
;
......@@ -2,7 +2,7 @@ drop table if exists test_table_url_syntax
;
create table test_table_url_syntax (id UInt32) ENGINE = URL('')
; -- { serverError 42 }
create table test_table_url_syntax (id UInt32) ENGINE = URL('','','')
create table test_table_url_syntax (id UInt32) ENGINE = URL('','','','')
; -- { serverError 42 }
drop table if exists test_table_url_syntax
;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册