Unverified commit 03b01fc2, authored by alexey-milovidov, committed by GitHub

Merge pull request #2118 from yandex/faster-sync-insert

Faster distributed sync insert
......@@ -1128,7 +1128,7 @@ protected:
catch (const zkutil::KeeperException & e)
{
LOG_INFO(log, "A ZooKeeper error occurred while checking partition " << partition_name
<< ". Will recheck the partition. Error: " << e.what());
<< ". Will recheck the partition. Error: " << e.displayText());
return false;
}
......@@ -1259,8 +1259,10 @@ protected:
}
/// Remove the locking node
cleaner_holder.reset();
zookeeper->remove(is_dirty_flag_path);
zkutil::Ops ops;
ops.emplace_back(new zkutil::Op::Remove(dirt_cleaner_path, -1));
ops.emplace_back(new zkutil::Op::Remove(is_dirty_flag_path, -1));
zookeeper->multi(ops);
LOG_INFO(log, "Partition " << task_partition.name << " was dropped on cluster " << task_table.cluster_push_name);
return true;
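Editor's note: the cleanup previously deleted only is_dirty_flag_path; it now issues a single ZooKeeper multi request that removes both the cleaner lock node and the dirty flag, so either both znodes disappear or neither does and no other worker can observe a half-cleaned state. Below is a minimal, illustrative sketch of that pattern reusing only the zkutil calls visible in this hunk; the helper name removeNodesAtomically is hypothetical and the snippet is not compilable outside the ClickHouse tree.

    /// Illustrative only: delete several znodes in one atomic ZooKeeper request.
    /// All removals succeed or fail together.
    void removeNodesAtomically(zkutil::ZooKeeper & zookeeper, const std::vector<std::string> & paths)
    {
        zkutil::Ops ops;
        for (const auto & path : paths)
            ops.emplace_back(new zkutil::Op::Remove(path, -1));  /// -1 = ignore the node version
        zookeeper.multi(ops);                                    /// throws if any removal fails
    }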
......@@ -1283,6 +1285,7 @@ protected:
Stopwatch watch;
TasksShard expected_shards;
size_t num_failed_shards = 0;
bool previous_shard_is_instantly_finished = false;
++cluster_partition.total_tries;
......@@ -1328,15 +1331,20 @@ protected:
expected_shards.emplace_back(shard);
/// Do not sleep if there is a sequence of already processed shards, to speed up startup
bool sleep_before_execution = !previous_shard_is_instantly_finished && shard->priority.is_remote;
PartitionTaskStatus task_status = PartitionTaskStatus::Error;
bool was_error = false;
for (size_t try_num = 0; try_num < max_shard_partition_tries; ++try_num)
{
task_status = tryProcessPartitionTask(partition);
task_status = tryProcessPartitionTask(partition, sleep_before_execution);
/// Exit if success
if (task_status == PartitionTaskStatus::Finished)
break;
was_error = true;
/// Skip if the task is being processed by someone
if (task_status == PartitionTaskStatus::Active)
break;
......@@ -1347,6 +1355,8 @@ protected:
if (task_status == PartitionTaskStatus::Error)
++num_failed_shards;
previous_shard_is_instantly_finished = !was_error;
}
cluster_partition.elapsed_time_seconds += watch.elapsedSeconds();
......@@ -1413,13 +1423,13 @@ protected:
Error,
};
PartitionTaskStatus tryProcessPartitionTask(ShardPartition & task_partition)
PartitionTaskStatus tryProcessPartitionTask(ShardPartition & task_partition, bool sleep_before_execution)
{
PartitionTaskStatus res;
try
{
res = processPartitionTaskImpl(task_partition);
res = processPartitionTaskImpl(task_partition, sleep_before_execution);
}
catch (...)
{
......@@ -1440,7 +1450,7 @@ protected:
return res;
}
PartitionTaskStatus processPartitionTaskImpl(ShardPartition & task_partition)
PartitionTaskStatus processPartitionTaskImpl(ShardPartition & task_partition, bool sleep_before_execution)
{
TaskShard & task_shard = task_partition.task_shard;
TaskTable & task_table = task_shard.task_table;
......@@ -1480,7 +1490,7 @@ protected:
};
/// Load balancing
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path, task_shard.priority.is_remote);
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path, sleep_before_execution);
LOG_DEBUG(log, "Processing " << current_task_status_path);
......@@ -1654,7 +1664,7 @@ protected:
}
using ExistsFuture = zkutil::ZooKeeper::ExistsFuture;
auto future_is_dirty_checker = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
std::unique_ptr<ExistsFuture> future_is_dirty_checker;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
constexpr size_t check_period_milliseconds = 500;
......@@ -1665,9 +1675,15 @@ protected:
if (zookeeper->expired())
throw Exception("ZooKeeper session is expired, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
if (future_is_dirty_checker != nullptr)
if (!future_is_dirty_checker)
future_is_dirty_checker = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
/// check_period_milliseconds should be less than the average insert time of a single block
/// Otherwise, the insertion will be slowed down a little bit
if (watch.elapsedMilliseconds() >= check_period_milliseconds)
{
zkutil::ZooKeeper::StatAndExists status;
try
{
status = future_is_dirty_checker->get();
......@@ -1687,12 +1703,6 @@ protected:
throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
}
if (watch.elapsedMilliseconds() >= check_period_milliseconds)
{
watch.restart();
future_is_dirty_checker = std::make_unique<ExistsFuture>(zookeeper->asyncExists(is_dirty_flag_path));
}
return false;
};
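Editor's note: this hunk restructures the cancellation check that runs while the INSERT SELECT streams blocks. The asyncExists(is_dirty_flag_path) future is now created lazily and its result is consumed at most once every check_period_milliseconds (500 ms), so the ZooKeeper round trip stays off the per-block hot path. The self-contained sketch below illustrates the same "lazy future, periodic poll" pattern with std::async standing in for the ZooKeeper call; all names in it are illustrative and not part of the commit.

    #include <chrono>
    #include <future>
    #include <iostream>
    #include <optional>
    #include <stdexcept>
    #include <thread>

    /// Stand-in for zookeeper->asyncExists(is_dirty_flag_path); pretend the flag never appears.
    static bool remote_flag_exists()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(20));
        return false;
    }

    int main()
    {
        using Clock = std::chrono::steady_clock;
        constexpr auto check_period = std::chrono::milliseconds(500);

        std::optional<std::future<bool>> pending_check;
        auto last_check = Clock::now();

        for (int block = 0; block < 5; ++block)  /// "insert" five blocks
        {
            /// Re-arm the asynchronous check lazily; its result is consumed
            /// at most once per check_period, never once per block.
            if (!pending_check)
                pending_check = std::async(std::launch::async, remote_flag_exists);

            if (Clock::now() - last_check >= check_period)
            {
                bool dirty = pending_check->get();  /// the only place that may block
                pending_check.reset();
                last_check = Clock::now();
                if (dirty)
                    throw std::runtime_error("Partition is dirty, cancel INSERT SELECT");
            }

            std::this_thread::sleep_for(std::chrono::milliseconds(200));  /// simulate writing one block
        }
        std::cout << "all blocks written\n";
    }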
......
......@@ -89,7 +89,7 @@ void DistributedBlockOutputStream::writeAsync(const Block & block)
return writeSplitAsync(block);
writeAsyncImpl(block);
++blocks_inserted;
++inserted_blocks;
}
......@@ -100,17 +100,17 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
buffer << "Insertion status:\n";
for (auto & shard_jobs : per_shard_jobs)
for (JobInfo & job : shard_jobs)
for (JobReplica & job : shard_jobs.replicas_jobs)
{
buffer << "Wrote " << job.blocks_written << " blocks and " << job.rows_written << " rows"
<< " on shard " << job.shard_index << " replica " << job.replica_index
<< ", " << addresses[job.shard_index][job.replica_index].readableString();
/// Performance statistics
if (job.bloks_started > 0)
if (job.blocks_started > 0)
{
buffer << " (average " << job.elapsed_time_ms / job.bloks_started << " ms per block";
if (job.bloks_started > 1)
buffer << " (average " << job.elapsed_time_ms / job.blocks_started << " ms per block";
if (job.blocks_started > 1)
buffer << ", the slowest block " << job.max_elapsed_time_for_block_ms << " ms";
buffer << ")";
}
......@@ -122,10 +122,11 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription()
}
void DistributedBlockOutputStream::initWritingJobs()
void DistributedBlockOutputStream::initWritingJobs(const Block & first_block)
{
const auto & addresses_with_failovers = cluster->getShardsAddresses();
const auto & shards_info = cluster->getShardsInfo();
size_t num_shards = shards_info.size();
remote_jobs_count = 0;
local_jobs_count = 0;
......@@ -145,7 +146,7 @@ void DistributedBlockOutputStream::initWritingJobs()
{
if (!replicas[replica_index].is_local)
{
shard_jobs.emplace_back(shard_index, replica_index, false);
shard_jobs.replicas_jobs.emplace_back(shard_index, replica_index, false, first_block);
++remote_jobs_count;
if (shard_info.hasInternalReplication())
......@@ -156,9 +157,12 @@ void DistributedBlockOutputStream::initWritingJobs()
if (shard_info.isLocal())
{
shard_jobs.emplace_back(shard_index, 0, true);
shard_jobs.replicas_jobs.emplace_back(shard_index, 0, true, first_block);
++local_jobs_count;
}
if (num_shards > 1)
shard_jobs.shard_current_block_permuation.reserve(first_block.rows());
}
}
......@@ -184,18 +188,17 @@ void DistributedBlockOutputStream::waitForJobs()
}
ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobInfo & job)
ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block)
{
auto memory_tracker = current_memory_tracker;
return [this, memory_tracker, &job]()
return [this, memory_tracker, &job, &current_block]()
{
SCOPE_EXIT({++finished_jobs_count;});
Stopwatch watch;
++job.bloks_started;
++job.blocks_started;
SCOPE_EXIT({
UInt64 elapsed_time_for_block_ms = watch.elapsedMilliseconds();
++finished_jobs_count;
UInt64 elapsed_time_for_block_ms = watch_current_block.elapsedMilliseconds();
job.elapsed_time_ms += elapsed_time_for_block_ms;
job.max_elapsed_time_for_block_ms = std::max(job.max_elapsed_time_for_block_ms, elapsed_time_for_block_ms);
});
......@@ -207,8 +210,30 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
}
const auto & shard_info = cluster->getShardsInfo()[job.shard_index];
size_t num_shards = cluster->getShardsInfo().size();
auto & shard_job = per_shard_jobs[job.shard_index];
const auto & addresses = cluster->getShardsAddresses();
Block & block = current_blocks.at(job.shard_index);
/// Generate current shard block
if (num_shards > 1)
{
auto & shard_permutation = shard_job.shard_current_block_permuation;
size_t num_shard_rows = shard_permutation.size();
for (size_t j = 0; j < current_block.columns(); ++j)
{
auto & src_column = current_block.getByPosition(j).column;
auto & dst_column = job.current_shard_block.getByPosition(j).column;
/// Zero permutation size has special meaning in IColumn::permute
if (num_shard_rows)
dst_column = src_column->permute(shard_permutation, num_shard_rows);
else
dst_column = src_column->cloneEmpty();
}
}
const Block & shard_block = (num_shards > 1) ? job.current_shard_block : current_block;
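Editor's note: a sync insert no longer pre-splits the block into per-shard copies on the calling thread; each writing job builds its shard's block itself by permuting the source columns with the row indices collected for that shard, and an empty permutation is special-cased (per the comment in the hunk) so the shard gets empty columns. A hedged, illustrative sketch of that step, using only the Block/IColumn calls visible in this hunk; the helper name extractShardBlock is hypothetical and the snippet is not part of the commit.

    /// Illustrative only: build the block for one shard by permuting each source
    /// column with that shard's row-index list (same calls as in the hunk above).
    Block extractShardBlock(const Block & source, const IColumn::Permutation & shard_rows)
    {
        Block shard_block = source.cloneEmpty();  /// same structure as the source, zero rows
        for (size_t i = 0; i < source.columns(); ++i)
        {
            const auto & src_column = source.getByPosition(i).column;
            auto & dst_column = shard_block.getByPosition(i).column;

            /// An empty permutation has special meaning in IColumn::permute,
            /// so the shard simply gets an empty column in that case.
            if (!shard_rows.empty())
                dst_column = src_column->permute(shard_rows, shard_rows.size());
            else
                dst_column = src_column->cloneEmpty();
        }
        return shard_block;
    }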
if (!job.is_local_job)
{
......@@ -217,7 +242,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
if (shard_info.hasInternalReplication())
{
/// Skip replica_index in case of internal replication
if (per_shard_jobs[job.shard_index].size() != 1)
if (shard_job.replicas_jobs.size() != 1)
throw Exception("There are several writing job for an automatically replicated shard", ErrorCodes::LOGICAL_ERROR);
/// TODO: it make sense to rewrite skip_unavailable_shards and max_parallel_replicas here
......@@ -248,7 +273,7 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
}
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
job.stream->write(block);
job.stream->write(shard_block);
}
else
{
......@@ -265,21 +290,25 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp
size_t num_repetitions = shard_info.getLocalNodeCount();
for (size_t i = 0; i < num_repetitions; ++i)
job.stream->write(block);
job.stream->write(shard_block);
}
job.blocks_written += 1;
job.rows_written += block.rows();
job.rows_written += shard_block.rows();
};
}
void DistributedBlockOutputStream::writeSync(const Block & block)
{
const auto & shards_info = cluster->getShardsInfo();
size_t num_shards = shards_info.size();
if (!pool)
{
/// Deferred initialization. Only for sync insertion.
initWritingJobs();
initWritingJobs(block);
pool.emplace(remote_jobs_count + local_jobs_count);
query_string = queryToString(query_ast);
......@@ -292,14 +321,25 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
watch.restart();
}
const auto & shards_info = cluster->getShardsInfo();
current_blocks = shards_info.size() > 1 ? splitBlock(block) : Blocks({block});
watch_current_block.restart();
if (num_shards > 1)
{
auto current_selector = createSelector(block);
/// Prepare row numbers for each shard
for (size_t shard_index : ext::range(0, num_shards))
per_shard_jobs[shard_index].shard_current_block_permuation.resize(0);
for (size_t i = 0; i < block.rows(); ++i)
per_shard_jobs[current_selector[i]].shard_current_block_permuation.push_back(i);
}
/// Run jobs in parallel for each block and wait them
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, current_blocks.size()))
for (JobInfo & job : per_shard_jobs.at(shard_index))
pool->schedule(runWritingJob(job));
for (size_t shard_index : ext::range(0, shards_info.size()))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->schedule(runWritingJob(job, block));
try
{
......@@ -311,7 +351,8 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
throw;
}
++blocks_inserted;
inserted_blocks += 1;
inserted_rows += block.rows();
}
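Editor's note: writeSync now evaluates the sharding key once per inserted block (createSelector), converts the resulting per-row shard indices into one row-number list per shard (shard_current_block_permuation), and schedules one job per replica that receives the whole block together with its shard's permutation. The toy program below, which is not part of the commit, shows how a selector is turned into per-shard permutations; plain std::vector stands in for IColumn::Selector and IColumn::Permutation.

    #include <cstddef>
    #include <iostream>
    #include <vector>

    int main()
    {
        /// Target shard of each row, as createSelector would produce it for a 5-row block.
        std::vector<size_t> selector = {1, 0, 1, 2, 0};
        const size_t num_shards = 3;

        /// Group row numbers by shard: shard 0 -> {1, 4}, shard 1 -> {0, 2}, shard 2 -> {3}.
        std::vector<std::vector<size_t>> permutation_per_shard(num_shards);
        for (size_t row = 0; row < selector.size(); ++row)
            permutation_per_shard[selector[row]].push_back(row);

        /// Each writing job later permutes the source columns with its shard's list,
        /// so the full block is materialized only once on the calling thread.
        for (size_t shard = 0; shard < num_shards; ++shard)
        {
            std::cout << "shard " << shard << ":";
            for (size_t row : permutation_per_shard[shard])
                std::cout << ' ' << row;
            std::cout << '\n';
        }
        return 0;
    }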
......@@ -321,7 +362,7 @@ void DistributedBlockOutputStream::writeSuffix()
{
finished_jobs_count = 0;
for (auto & shard_jobs : per_shard_jobs)
for (JobInfo & job : shard_jobs)
for (JobReplica & job : shard_jobs.replicas_jobs)
{
if (job.stream)
pool->schedule([&job] () { job.stream->writeSuffix(); });
......@@ -338,17 +379,19 @@ void DistributedBlockOutputStream::writeSuffix()
}
double elapsed = watch.elapsedSeconds();
LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << blocks_inserted << " blocks"
<< " (average " << std::fixed << std::setprecision(1) << elapsed / blocks_inserted * 1000 << " ms. per block)"
LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << inserted_blocks << " blocks"
<< ", " << std::fixed << std::setprecision(1) << inserted_rows / elapsed << " rows per second"
<< ". " << getCurrentStateDescription());
}
}
IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
IColumn::Selector DistributedBlockOutputStream::createSelector(const Block & source_block)
{
storage.getShardingKeyExpr()->execute(block);
const auto & key_column = block.getByName(storage.getShardingKeyColumnName());
Block current_block_with_sharding_key_expr = source_block;
storage.getShardingKeyExpr()->execute(current_block_with_sharding_key_expr);
const auto & key_column = current_block_with_sharding_key_expr.getByName(storage.getShardingKeyColumnName());
const auto & slot_to_shard = cluster->getSlotToShard();
#define CREATE_FOR_TYPE(TYPE) \
......@@ -372,12 +415,6 @@ IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
Blocks DistributedBlockOutputStream::splitBlock(const Block & block)
{
const auto num_cols = block.columns();
/// cache column pointers for later reuse
std::vector<const IColumn *> columns(num_cols);
for (size_t i = 0; i < columns.size(); ++i)
columns[i] = block.safeGetByPosition(i).column.get();
auto selector = createSelector(block);
/// Split block to num_shard smaller block, using 'selector'.
......@@ -409,7 +446,7 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
if (splitted_blocks[shard_idx].rows())
writeAsyncImpl(splitted_blocks[shard_idx], shard_idx);
++blocks_inserted;
++inserted_blocks;
}
......
......@@ -44,7 +44,7 @@ public:
private:
IColumn::Selector createSelector(Block block);
IColumn::Selector createSelector(const Block & source_block);
void writeAsync(const Block & block);
......@@ -65,10 +65,10 @@ private:
/// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws.
void writeSync(const Block & block);
void initWritingJobs();
void initWritingJobs(const Block & first_block);
struct JobInfo;
ThreadPool::Job runWritingJob(JobInfo & job);
struct JobReplica;
ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block);
void waitForJobs();
......@@ -80,27 +80,31 @@ private:
ASTPtr query_ast;
ClusterPtr cluster;
const Settings & settings;
size_t blocks_inserted = 0;
size_t inserted_blocks = 0;
size_t inserted_rows = 0;
bool insert_sync;
/// Sync-related stuff
UInt64 insert_timeout; // in seconds
Stopwatch watch;
Stopwatch watch_current_block;
std::optional<ThreadPool> pool;
ThrottlerPtr throttler;
String query_string;
struct JobInfo
struct JobReplica
{
JobInfo() = default;
JobInfo(size_t shard_index, size_t replica_index, bool is_local_job)
: shard_index(shard_index), replica_index(replica_index), is_local_job(is_local_job) {}
JobReplica() = default;
JobReplica(size_t shard_index, size_t replica_index, bool is_local_job, const Block & sample_block)
: shard_index(shard_index), replica_index(replica_index), is_local_job(is_local_job), current_shard_block(sample_block.cloneEmpty()) {}
size_t shard_index = 0;
size_t replica_index = 0;
bool is_local_job = false;
Block current_shard_block;
ConnectionPool::Entry connection_entry;
std::unique_ptr<Context> local_context;
BlockOutputStreamPtr stream;
......@@ -108,13 +112,18 @@ private:
UInt64 blocks_written = 0;
UInt64 rows_written = 0;
UInt64 bloks_started = 0;
UInt64 blocks_started = 0;
UInt64 elapsed_time_ms = 0;
UInt64 max_elapsed_time_for_block_ms = 0;
};
std::vector<std::list<JobInfo>> per_shard_jobs;
Blocks current_blocks;
struct JobShard
{
std::list<JobReplica> replicas_jobs;
IColumn::Permutation shard_current_block_permuation;
};
std::vector<JobShard> per_shard_jobs;
size_t remote_jobs_count = 0;
size_t local_jobs_count = 0;
......