未验证 提交 6fc225e6 编写于 作者: A Amos Bird 提交者: GitHub

Distributed insertion to one random shard (#18294)

* Distributed insertion to one random shard

* add some tests

* add some documentation

* Respect shards' weights

* fine locking
Co-authored-by: NIvan Lezhankin <ilezhankin@yandex-team.ru>
上级 51481c9c
......@@ -1855,6 +1855,18 @@ Default value: `0`.
- [Distributed Table Engine](../../engines/table-engines/special/distributed.md#distributed)
- [Managing Distributed Tables](../../sql-reference/statements/system.md#query-language-system-distributed)
## insert_distributed_one_random_shard {#insert_distributed_one_random_shard}
Enables or disables random shard insertion into a [Distributed](../../engines/table-engines/special/distributed.md#distributed) table when there is no distributed key.
By default, when inserting data into a `Distributed` table with more than one shard, the ClickHouse server will any insertion request if there is no distributed key. When `insert_distributed_one_random_shard = 1`, insertions are allowed and data is forwarded randomly among all shards.
Possible values:
- 0 — Insertion is rejected if there are multiple shards and no distributed key is given.
- 1 — Insertion is done randomly among all available shards when no distributed key is given.
Default value: `0`.
## use_compact_format_in_distributed_parts_names {#use_compact_format_in_distributed_parts_names}
......
......@@ -490,7 +490,9 @@ class IColumn;
\
M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \
M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0)
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.
......
......@@ -138,11 +138,22 @@ void DistributedBlockOutputStream::write(const Block & block)
void DistributedBlockOutputStream::writeAsync(const Block & block)
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
const Settings & settings = context.getSettingsRef();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
writeAsyncImpl(block);
++inserted_blocks;
if (random_shard_insert)
{
writeAsyncImpl(block, storage.getRandomShardIndex(cluster->getShardsInfo()));
}
else
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
return writeSplitAsync(block);
writeAsyncImpl(block);
++inserted_blocks;
}
}
......@@ -175,18 +186,18 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
}
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block)
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end)
{
const Settings & settings = context.getSettingsRef();
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();
size_t num_shards = shards_info.size();
size_t num_shards = end - start;
remote_jobs_count = 0;
local_jobs_count = 0;
per_shard_jobs.resize(shards_info.size());
for (size_t shard_index : ext::range(0, shards_info.size()))
for (size_t shard_index : ext::range(start, end))
{
const auto & shard_info = shards_info[shard_index];
auto & shard_jobs = per_shard_jobs[shard_index];
......@@ -242,10 +253,11 @@ void DistributedBlockOutputStream::waitForJobs()
}
ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block)
ThreadPool::Job
DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards)
{
auto thread_group = CurrentThread::getGroup();
return [this, thread_group, &job, &current_block]()
return [this, thread_group, &job, &current_block, num_shards]()
{
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
......@@ -262,7 +274,6 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
});
const auto & shard_info = cluster->getShardsInfo()[job.shard_index];
size_t num_shards = cluster->getShardsInfo().size();
auto & shard_job = per_shard_jobs[job.shard_index];
const auto & addresses = cluster->getShardsAddresses();
......@@ -356,12 +367,19 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
{
const Settings & settings = context.getSettingsRef();
const auto & shards_info = cluster->getShardsInfo();
size_t num_shards = shards_info.size();
bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;
size_t start = 0, end = shards_info.size();
if (random_shard_insert)
{
start = storage.getRandomShardIndex(shards_info);
end = start + 1;
}
size_t num_shards = end - start;
if (!pool)
{
/// Deferred initialization. Only for sync insertion.
initWritingJobs(block);
initWritingJobs(block, start, end);
pool.emplace(remote_jobs_count + local_jobs_count);
......@@ -394,7 +412,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, shards_info.size()))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->scheduleOrThrowOnError(runWritingJob(job, block));
pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards));
}
catch (...)
{
......
......@@ -73,10 +73,10 @@ private:
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
void writeSync(const Block & block);
void initWritingJobs(const Block & first_block);
void initWritingJobs(const Block & first_block, size_t start, size_t end);
struct JobReplica;
ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block);
ThreadPool::Job runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards);
void waitForJobs();
......
......@@ -17,6 +17,7 @@
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <Common/randomSeed.h>
#include <Parsers/ASTDropQuery.h>
#include <Parsers/ASTExpressionList.h>
......@@ -373,6 +374,7 @@ StorageDistributed::StorageDistributed(
, cluster_name(global_context.getMacros()->expand(cluster_name_))
, has_sharding_key(sharding_key_)
, relative_data_path(relative_data_path_)
, rng(randomSeed())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
......@@ -543,7 +545,8 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
}
/// If sharding key is not specified, then you can only write to a shard containing only one shard
if (!has_sharding_key && ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
if (!settings.insert_distributed_one_random_shard && !has_sharding_key
&& ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
{
throw Exception("Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
ErrorCodes::STORAGE_REQUIRES_PARAMETER);
......@@ -890,6 +893,32 @@ void StorageDistributed::rename(const String & new_path_to_table_data, const Sto
}
size_t StorageDistributed::getRandomShardIndex(const Cluster::ShardsInfo & shards)
{
UInt32 total_weight = 0;
for (const auto & shard : shards)
total_weight += shard.weight;
assert(total_weight > 0);
size_t res;
{
std::lock_guard lock(rng_mutex);
res = std::uniform_int_distribution<size_t>(0, total_weight - 1)(rng);
}
for (auto i = 0ul, s = shards.size(); i < s; ++i)
{
if (shards[i].weight > res)
return i;
res -= shards[i].weight;
}
__builtin_unreachable();
}
void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
{
for (const DiskPtr & disk : data_volume->getDisks())
......
......@@ -10,7 +10,9 @@
#include <Parsers/ASTFunction.h>
#include <common/logger_useful.h>
#include <Common/ActionBlocker.h>
#include <Interpreters/Cluster.h>
#include <pcg_random.hpp>
namespace DB
{
......@@ -24,9 +26,6 @@ using VolumePtr = std::shared_ptr<IVolume>;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
/** A distributed table that resides on multiple servers.
* Uses data from the specified database and tables on each server.
*
......@@ -126,6 +125,8 @@ public:
NamesAndTypesList getVirtuals() const override;
size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
String remote_database;
String remote_table;
ASTPtr remote_table_function_ptr;
......@@ -198,6 +199,9 @@ protected:
std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
mutable std::mutex cluster_nodes_mutex;
// For random shard index generation
mutable std::mutex rng_mutex;
pcg64 rng;
};
}
drop table if exists shard;
drop table if exists distr;
create table shard (id Int32) engine = MergeTree order by cityHash64(id);
create table distr as shard engine Distributed (test_cluster_two_shards_localhost, currentDatabase(), shard);
insert into distr (id) values (0), (1); -- { serverError 55; }
set insert_distributed_sync = 1;
insert into distr (id) values (0), (1); -- { serverError 55; }
set insert_distributed_sync = 0;
set insert_distributed_one_random_shard = 1;
insert into distr (id) values (0), (1);
insert into distr (id) values (2), (3);
select * from distr order by id;
drop table if exists shard;
drop table if exists distr;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册