提交 cbf82827 编写于 作者: V Vxider

add disable_set_and_join_persistency

上级 c87f50bc
......@@ -171,6 +171,7 @@ class IColumn;
M(Bool, enable_http_compression, 0, "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.", 0) \
M(Int64, http_zlib_compression_level, 3, "Compression level - used if the client on HTTP said that it understands data compressed by gzip or deflate.", 0) \
\
M(Bool, disable_set_and_join_persistency, false, "Disable persistency for StorageSet and StorageJoin to reduce IO overhead", 0) \
M(Bool, http_native_compression_disable_checksumming_on_decompress, 0, "If you uncompress the POST data from the client compressed by the native format, do not check the checksum.", 0) \
\
M(String, count_distinct_implementation, "uniqExact", "What aggregate function to use for implementation of count(DISTINCT ...)", 0) \
......
......@@ -44,8 +44,9 @@ StorageJoin::StorageJoin(
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool overwrite_,
bool disable_set_and_join_persistency_,
const Context & context_)
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_}
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, disable_set_and_join_persistency_, context_}
, key_names(key_names_)
, use_nulls(use_nulls_)
, limits(limits_)
......@@ -118,6 +119,7 @@ void registerStorageJoin(StorageFactory & factory)
auto join_overflow_mode = settings.join_overflow_mode;
auto join_any_take_last_row = settings.join_any_take_last_row;
auto old_any_join = settings.any_join_distinct_right_table_keys;
auto disable_set_and_join_persistency = settings.disable_set_and_join_persistency;
if (args.storage_def && args.storage_def->settings)
{
......@@ -135,6 +137,8 @@ void registerStorageJoin(StorageFactory & factory)
join_any_take_last_row = setting.value;
else if (setting.name == "any_join_distinct_right_table_keys")
old_any_join = setting.value;
else if (setting.name == "disable_set_and_join_persistency")
disable_set_and_join_persistency = setting.value;
else
throw Exception(
"Unknown setting " + setting.name + " for storage " + args.engine_name,
......@@ -217,6 +221,7 @@ void registerStorageJoin(StorageFactory & factory)
args.columns,
args.constraints,
join_any_take_last_row,
disable_set_and_join_persistency,
args.context);
};
......
......@@ -72,6 +72,7 @@ protected:
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool overwrite,
bool disable_set_and_join_persistency_,
const Context & context_);
};
......
......@@ -12,6 +12,8 @@
#include <Interpreters/Set.h>
#include <Interpreters/Context.h>
#include <Poco/DirectoryIterator.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
namespace DB
......@@ -35,7 +37,7 @@ public:
SetOrJoinBlockOutputStream(
StorageSetOrJoinBase & table_, const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_, const String & backup_tmp_path_,
const String & backup_file_name_);
const String & backup_file_name_, bool disable_set_and_join_persistency_);
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override;
......@@ -50,6 +52,7 @@ private:
WriteBufferFromFile backup_buf;
CompressedWriteBuffer compressed_backup_buf;
NativeBlockOutputStream backup_stream;
bool disable_set_and_join_persistency;
};
......@@ -58,7 +61,8 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
const StorageMetadataPtr & metadata_snapshot_,
const String & backup_path_,
const String & backup_tmp_path_,
const String & backup_file_name_)
const String & backup_file_name_,
bool disable_set_and_join_persistency_)
: table(table_)
, metadata_snapshot(metadata_snapshot_)
, backup_path(backup_path_)
......@@ -67,6 +71,7 @@ SetOrJoinBlockOutputStream::SetOrJoinBlockOutputStream(
, backup_buf(backup_tmp_path + backup_file_name)
, compressed_backup_buf(backup_buf)
, backup_stream(compressed_backup_buf, 0, metadata_snapshot->getSampleBlock())
, disable_set_and_join_persistency(disable_set_and_join_persistency_)
{
}
......@@ -76,24 +81,28 @@ void SetOrJoinBlockOutputStream::write(const Block & block)
Block sorted_block = block.sortColumns();
table.insertBlock(sorted_block);
backup_stream.write(sorted_block);
if(!disable_set_and_join_persistency)
backup_stream.write(sorted_block);
}
void SetOrJoinBlockOutputStream::writeSuffix()
{
table.finishInsert();
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
if(!disable_set_and_join_persistency)
{
backup_stream.flush();
compressed_backup_buf.next();
backup_buf.next();
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
Poco::File(backup_tmp_path + backup_file_name).renameTo(backup_path + backup_file_name);
}
}
BlockOutputStreamPtr StorageSetOrJoinBase::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
UInt64 id = ++increment;
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin");
return std::make_shared<SetOrJoinBlockOutputStream>(*this, metadata_snapshot, path, path + "tmp/", toString(id) + ".bin", disable_set_and_join_persistency);
}
......@@ -102,8 +111,10 @@ StorageSetOrJoinBase::StorageSetOrJoinBase(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool disable_set_and_join_persistency_,
const Context & context_)
: IStorage(table_id_)
: IStorage(table_id_),
disable_set_and_join_persistency(disable_set_and_join_persistency_)
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
......@@ -124,8 +135,9 @@ StorageSet::StorageSet(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool disable_set_and_join_persistency_,
const Context & context_)
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_},
: StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, disable_set_and_join_persistency_, context_},
set(std::make_shared<Set>(SizeLimits(), false, true))
{
......@@ -229,8 +241,24 @@ void registerStorageSet(StorageFactory & factory)
"Engine " + args.engine_name + " doesn't support any arguments (" + toString(args.engine_args.size()) + " given)",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return StorageSet::create(args.relative_data_path, args.table_id, args.columns, args.constraints, args.context);
});
const auto & settings = args.context.getSettingsRef();
auto disable_set_and_join_persistency = settings.disable_set_and_join_persistency;
if (args.storage_def && args.storage_def->settings)
{
for (const auto & setting : args.storage_def->settings->changes)
{
if (setting.name == "disable_set_and_join_persistency")
disable_set_and_join_persistency = setting.value;
else
throw Exception(
"Unknown setting " + setting.name + " for storage " + args.engine_name,
ErrorCodes::BAD_ARGUMENTS);
}
}
return StorageSet::create(args.relative_data_path, args.table_id, args.columns, args.constraints, disable_set_and_join_persistency, args.context);
}, StorageFactory::StorageFeatures{ .supports_settings = true, });
}
......
......@@ -31,10 +31,12 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool disable_set_and_join_persistency_,
const Context & context_);
String base_path;
String path;
bool disable_set_and_join_persistency;
std::atomic<UInt64> increment = 0; /// For the backup file names.
......@@ -82,6 +84,7 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool disable_set_and_join_persistency_,
const Context & context_);
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册