提交 c2849d08 编写于 作者: A Andrei Bodrov

move to helpers

上级 53b02227
......@@ -5,9 +5,7 @@
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/ExecutionSpeedLimits.h>
#include <IO/CompressionMethod.h>
#include <IO/Progress.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <Storages/TableStructureLockHolder.h>
#include <Common/TypePromotion.h>
......@@ -230,17 +228,6 @@ public:
/// Enable calculation of minimums and maximums by the result columns.
void enableExtremes() { enabled_extremes = true; }
template <class TReadBuffer, class... Types>
std::unique_ptr<ReadBuffer> getBuffer(const DB::CompressionMethod method, Types... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto read_buf = std::make_unique<TReadBuffer>(args...);
return std::make_unique<ZlibInflatingReadBuffer>(std::move(read_buf), method);
}
return std::make_unique<TReadBuffer>(args...);
}
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
......
......@@ -3,8 +3,6 @@
#include <Core/Block.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Storages/TableStructureLockHolder.h>
#include <IO/CompressionMethod.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <boost/noncopyable.hpp>
......@@ -65,17 +63,6 @@ public:
*/
void addTableLock(const TableStructureReadLockHolder & lock) { table_locks.push_back(lock); }
template <class TWriteBuffer, class... Types>
std::unique_ptr<WriteBuffer> getBuffer(const DB::CompressionMethod method, Types... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto write_buf = std::make_unique<TWriteBuffer>(args...);
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(write_buf), method, 1 /* compression level */);
}
return std::make_unique<TWriteBuffer>(args...);
}
virtual void finalize() {}
private:
......
......@@ -23,9 +23,11 @@
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/VarInt.h>
#include <IO/ZlibInflatingReadBuffer.h>
#ifdef __clang__
#pragma clang diagnostic push
......@@ -911,4 +913,15 @@ void skipToNextLineOrEOF(ReadBuffer & buf);
/// Skip to next character after next unescaped \n. If no \n in stream, skip to end. Does not throw on invalid escape sequences.
void skipToUnescapedNextLineOrEOF(ReadBuffer & buf);
template <class TReadBuffer, class... Types>
std::unique_ptr<ReadBuffer> getReadBuffer(const DB::CompressionMethod method, Types... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto read_buf = std::make_unique<TReadBuffer>(args...);
return std::make_unique<ZlibInflatingReadBuffer>(std::move(read_buf), method);
}
return std::make_unique<TReadBuffer>(args...);
}
}
......@@ -20,11 +20,13 @@
#include <Common/UInt128.h>
#include <Common/intExp.h>
#include <IO/CompressionMethod.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteIntText.h>
#include <IO/VarInt.h>
#include <IO/DoubleConverter.h>
#include <IO/WriteBufferFromString.h>
#include <IO/ZlibDeflatingWriteBuffer.h>
#include <Formats/FormatSettings.h>
......@@ -905,4 +907,16 @@ inline String toString(const T & x)
writeText(x, buf);
return buf.str();
}
template <class TWriteBuffer, class... Types>
std::unique_ptr<WriteBuffer> getWriteBuffer(const DB::CompressionMethod method, Types... args)
{
if (method == DB::CompressionMethod::Gzip)
{
auto write_buf = std::make_unique<TWriteBuffer>(args...);
return std::make_unique<ZlibDeflatingWriteBuffer>(std::move(write_buf), method, 1 /* compression level */);
}
return std::make_unique<TWriteBuffer>(args...);
}
}
......@@ -8,6 +8,7 @@
#include <Parsers/ASTIdentifier.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
......@@ -203,12 +204,12 @@ public:
}
storage->table_fd_was_used = true;
read_buf = getBuffer<ReadBufferFromFileDescriptor>(compression_method, storage->table_fd);
read_buf = getReadBuffer<ReadBufferFromFileDescriptor>(compression_method, storage->table_fd);
}
else
{
shared_lock = std::shared_lock(storage->rwlock);
read_buf = getBuffer<ReadBufferFromFile>(compression_method, file_path);
read_buf = getReadBuffer<ReadBufferFromFile>(compression_method, file_path);
}
reader = FormatFactory::instance().getInput(storage->format_name, *read_buf, storage->getSampleBlock(), context, max_block_size);
......@@ -285,13 +286,13 @@ public:
* INSERT data; SELECT *; last SELECT returns only insert_data
*/
storage.table_fd_was_used = true;
write_buf = getBuffer<WriteBufferFromFileDescriptor>(compression_method, storage.table_fd);
write_buf = getWriteBuffer<WriteBufferFromFileDescriptor>(compression_method, storage.table_fd);
}
else
{
if (storage.paths.size() != 1)
throw Exception("Table '" + storage.table_name + "' is in readonly mode because of globs in filepath", ErrorCodes::DATABASE_ACCESS_DENIED);
write_buf = getBuffer<WriteBufferFromFile>(compression_method, storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
write_buf = getWriteBuffer<WriteBufferFromFile>(compression_method, storage.paths[0], DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
}
writer = FormatFactory::instance().getOutput(storage.format_name, *write_buf, storage.getSampleBlock(), storage.context_global);
......
......@@ -7,8 +7,10 @@
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromHDFS.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/WriteHelpers.h>
#include <IO/HDFSCommon.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
......@@ -61,7 +63,7 @@ public:
UInt64 max_block_size,
const CompressionMethod compression_method)
{
auto read_buf = getBuffer<ReadBufferFromHDFS>(compression_method, uri);
auto read_buf = getReadBuffer<ReadBufferFromHDFS>(compression_method, uri);
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
......@@ -106,7 +108,7 @@ public:
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = getBuffer<WriteBufferFromHDFS>(compression_method, uri);
write_buf = getWriteBuffer<WriteBufferFromHDFS>(compression_method, uri);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
......
......@@ -6,7 +6,9 @@
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
......@@ -39,7 +41,7 @@ namespace
const CompressionMethod compression_method)
: name(name_)
{
read_buf = getBuffer<ReadBufferFromS3>(compression_method, uri, timeouts);
read_buf = getReadBuffer<ReadBufferFromS3>(compression_method, uri, timeouts);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
......@@ -86,7 +88,7 @@ namespace
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = getBuffer<WriteBufferFromS3>(compression_method, uri, min_upload_part_size, timeouts);
write_buf = getWriteBuffer<WriteBufferFromS3>(compression_method, uri, min_upload_part_size, timeouts);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
......
......@@ -5,8 +5,10 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h>
......@@ -56,7 +58,7 @@ namespace
const CompressionMethod compression_method)
: name(name_)
{
read_buf = getBuffer<ReadWriteBufferFromHTTP>(compression_method, uri, method, callback, timeouts, context.getSettingsRef().max_http_get_redirects);
read_buf = getReadBuffer<ReadWriteBufferFromHTTP>(compression_method, uri, method, callback, timeouts, context.getSettingsRef().max_http_get_redirects);
reader = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
}
......@@ -102,7 +104,7 @@ namespace
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = getBuffer<WriteBufferFromHTTP>(compression_method, uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts);
write_buf = getWriteBuffer<WriteBufferFromHTTP>(compression_method, uri, Poco::Net::HTTPRequest::HTTP_POST, timeouts);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册