提交 0046b9f1 编写于 作者: T tavplubix 提交者: Alexander Kuzmenkov

Wait for jobs to finish on exception (fixes rare segfaults) (#7350)

上级 b62c9e72
......@@ -274,15 +274,24 @@ private:
pcg64 generator(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
for (size_t i = 0; i < concurrency; ++i)
try
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());
for (size_t i = 0; i < concurrency; ++i)
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());
for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(
connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries));
}
}
catch (...)
{
pool.wait();
throw;
}
InterruptListener interrupt_listener;
......
......@@ -895,7 +895,7 @@ public:
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
thread_pool.wait();
......@@ -2038,7 +2038,7 @@ protected:
ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.schedule([=] { do_for_shard(shard_index); });
thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });
thread_pool.wait();
}
......
......@@ -565,7 +565,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
auto executor = pipeline.execute();
std::atomic_bool exception = false;
pool.schedule([&]()
pool.scheduleOrThrowOnError([&]()
{
/// ThreadStatus thread_status;
......
......@@ -121,13 +121,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
}
template <typename Thread>
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority)
{
scheduleImpl<void>(std::move(job), priority, std::nullopt);
}
template <typename Thread>
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds)
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept
{
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
}
......
......@@ -36,18 +36,23 @@ public:
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
/// Also throws an exception if cannot create thread.
/// Priority: greater is higher.
void schedule(Job job, int priority = 0);
/// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects,
/// located on stack of current thread, the stack must not be unwinded until all jobs finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor.
void scheduleOrThrowOnError(Job job, int priority = 0);
/// Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;
/// Wait for specified amount of time and schedule a job or throw an exception.
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitary order.
/// You may call schedule and wait many times in arbitrary order.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
void wait();
......@@ -140,7 +145,7 @@ public:
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
: state(std::make_shared<Poco::Event>())
{
/// NOTE: If this will throw an exception, the descructor won't be called.
/// NOTE: If this will throw an exception, the destructor won't be called.
GlobalThreadPool::instance().scheduleOrThrow([
state = state,
func = std::forward<Function>(func),
......
......@@ -21,14 +21,14 @@ TEST(ThreadPool, ConcurrentWait)
ThreadPool pool(num_threads);
for (size_t i = 0; i < num_jobs; ++i)
pool.schedule(worker);
pool.scheduleOrThrowOnError(worker);
constexpr size_t num_waiting_threads = 4;
ThreadPool waiting_pool(num_waiting_threads);
for (size_t i = 0; i < num_waiting_threads; ++i)
waiting_pool.schedule([&pool]{ pool.wait(); });
waiting_pool.scheduleOrThrowOnError([&pool] { pool.wait(); });
waiting_pool.wait();
}
......@@ -30,11 +30,11 @@ TEST(ThreadPool, GlobalFull1)
ThreadPool pool(num_jobs);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
pool.scheduleOrThrowOnError(func);
for (size_t i = capacity; i < num_jobs; ++i)
{
EXPECT_THROW(pool.schedule(func), DB::Exception);
EXPECT_THROW(pool.scheduleOrThrowOnError(func), DB::Exception);
++counter;
}
......@@ -67,10 +67,10 @@ TEST(ThreadPool, GlobalFull2)
ThreadPool pool(capacity, 0, capacity);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
pool.scheduleOrThrowOnError(func);
ThreadPool another_pool(1);
EXPECT_THROW(another_pool.schedule(func), DB::Exception);
EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception);
++counter;
......@@ -79,7 +79,7 @@ TEST(ThreadPool, GlobalFull2)
global_pool.wait();
for (size_t i = 0; i < capacity; ++i)
another_pool.schedule([&] { ++counter; });
another_pool.scheduleOrThrowOnError([&] { ++counter; });
another_pool.wait();
EXPECT_EQ(counter, capacity * 2 + 1);
......
......@@ -14,7 +14,7 @@ int test()
std::atomic<int> counter{0};
for (size_t i = 0; i < 10; ++i)
pool.schedule([&]{ ++counter; });
pool.scheduleOrThrowOnError([&]{ ++counter; });
pool.wait();
return counter;
......
......@@ -14,7 +14,7 @@ TEST(ThreadPool, Loop)
size_t threads = 16;
ThreadPool pool(threads);
for (size_t j = 0; j < threads; ++j)
pool.schedule([&]{ ++res; });
pool.scheduleOrThrowOnError([&] { ++res; });
pool.wait();
}
......
......@@ -9,12 +9,12 @@ bool check()
{
ThreadPool pool(10);
pool.schedule([]{ throw std::runtime_error("Hello, world!"); });
pool.scheduleOrThrowOnError([] { throw std::runtime_error("Hello, world!"); });
try
{
for (size_t i = 0; i < 100; ++i)
pool.schedule([]{}); /// An exception will be rethrown from this method.
pool.scheduleOrThrowOnError([] {}); /// An exception will be rethrown from this method.
}
catch (const std::runtime_error &)
{
......
......@@ -37,8 +37,8 @@ int main(int, char **)
ThreadPool tp(8);
for (size_t i = 0; i < n; ++i)
{
tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.schedule(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
tp.scheduleOrThrowOnError(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.scheduleOrThrowOnError(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
}
tp.wait();
......
......@@ -284,7 +284,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
......@@ -338,7 +338,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate12,
pool.scheduleOrThrowOnError(std::bind(aggregate12,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
......@@ -397,7 +397,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
......@@ -473,7 +473,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate2,
pool.scheduleOrThrowOnError(std::bind(aggregate2,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
......@@ -499,7 +499,7 @@ int main(int argc, char ** argv)
watch.restart();
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2,
pool.scheduleOrThrowOnError(std::bind(merge2,
maps.data(), num_threads, i));
pool.wait();
......@@ -527,7 +527,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate22,
pool.scheduleOrThrowOnError(std::bind(aggregate22,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
......@@ -553,7 +553,7 @@ int main(int argc, char ** argv)
watch.restart();
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2, maps.data(), num_threads, i));
pool.scheduleOrThrowOnError(std::bind(merge2, maps.data(), num_threads, i));
pool.wait();
......@@ -592,7 +592,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate3,
pool.scheduleOrThrowOnError(std::bind(aggregate3,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
......@@ -658,7 +658,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate33,
pool.scheduleOrThrowOnError(std::bind(aggregate33,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
......@@ -727,7 +727,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate4,
pool.scheduleOrThrowOnError(std::bind(aggregate4,
std::ref(local_maps[i]),
std::ref(global_map),
mutexes.data(),
......@@ -797,7 +797,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate5,
pool.scheduleOrThrowOnError(std::bind(aggregate5,
std::ref(local_maps[i]),
std::ref(global_map),
data.begin() + (data.size() * i) / num_threads,
......@@ -860,7 +860,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
......
......@@ -42,7 +42,7 @@ struct AggregateIndependent
auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
auto & map = *results[i];
pool.schedule([&, begin, end]()
pool.scheduleOrThrowOnError([&, begin, end]()
{
for (auto it = begin; it != end; ++it)
{
......@@ -85,7 +85,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
auto & map = *results[i];
pool.schedule([&, begin, end]()
pool.scheduleOrThrowOnError([&, begin, end]()
{
typename Map::LookupResult place = nullptr;
Key prev_key {};
......@@ -180,7 +180,7 @@ struct MergeParallelForTwoLevelTable
ThreadPool & pool)
{
for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
pool.schedule([&, bucket, num_maps]
pool.scheduleOrThrowOnError([&, bucket, num_maps]
{
std::vector<typename Map::Impl *> section(num_maps);
for (size_t i = 0; i < num_maps; ++i)
......
......@@ -66,7 +66,7 @@ int main(int argc, char ** argv)
test(n, "Create and destroy ThreadPool each iteration", []
{
ThreadPool tp(1);
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
......@@ -90,7 +90,7 @@ int main(int argc, char ** argv)
test(n, "Schedule job for Threadpool each iteration", [&tp]
{
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
}
......@@ -100,7 +100,7 @@ int main(int argc, char ** argv)
test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
{
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
}
......
......@@ -36,7 +36,7 @@ void AsynchronousBlockInputStream::next()
{
ready.reset();
pool.schedule([this, thread_group = CurrentThread::getGroup()] ()
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
......
......@@ -168,21 +168,28 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
else
{
size_t num_children = children.size();
for (size_t i = 0; i < num_children; ++i)
try
{
auto & child = children[i];
auto thread_group = CurrentThread::getGroup();
reading_pool->schedule([&child, thread_group]
for (size_t i = 0; i < num_children; ++i)
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
}
auto & child = children[i];
auto thread_group = CurrentThread::getGroup();
reading_pool->scheduleOrThrowOnError([&child, thread_group]
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
child->readPrefix();
});
}
}
catch (...)
{
reading_pool->wait();
throw;
}
reading_pool->wait();
}
......@@ -194,7 +201,7 @@ void MergingAggregatedMemoryEfficientBlockInputStream::start()
*/
for (size_t i = 0; i < merging_threads; ++i)
pool.schedule([this, thread_group = CurrentThread::getGroup()] () { mergeThread(thread_group); });
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]() { mergeThread(thread_group); });
}
}
......@@ -475,22 +482,29 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
}
else
{
for (auto & input : inputs)
try
{
if (need_that_input(input))
for (auto & input : inputs)
{
auto thread_group = CurrentThread::getGroup();
reading_pool->schedule([&input, &read_from_input, thread_group]
if (need_that_input(input))
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});
auto thread_group = CurrentThread::getGroup();
reading_pool->scheduleOrThrowOnError([&input, &read_from_input, thread_group]
{
setThreadName("MergeAggReadThr");
if (thread_group)
CurrentThread::attachToIfDetached(thread_group);
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
read_from_input(input);
});
}
}
}
catch (...)
{
reading_pool->wait();
throw;
}
reading_pool->wait();
}
......
......@@ -129,7 +129,7 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
for (size_t view_num = 0; view_num < views.size(); ++view_num)
{
auto thread_group = CurrentThread::getGroup();
pool.schedule([=]
pool.scheduleOrThrowOnError([=]
{
setThreadName("PushingToViews");
if (thread_group)
......
......@@ -141,7 +141,7 @@ void DatabaseOrdinary::loadTables(
for (const auto & file_name : file_names)
{
pool.schedule([&]() { loadOneTable(file_name); });
pool.scheduleOrThrowOnError([&]() { loadOneTable(file_name); });
}
pool.wait();
......@@ -174,11 +174,16 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
}
};
for (const auto & table : tables)
try
{
thread_pool.schedule([&]() { startupOneTable(table.second); });
for (const auto & table : tables)
thread_pool.scheduleOrThrowOnError([&]() { startupOneTable(table.second); });
}
catch (...)
{
thread_pool.wait();
throw;
}
thread_pool.wait();
}
......
......@@ -41,7 +41,7 @@ private:
swapBuffers();
/// The data will be written in separate stream.
pool.schedule([this] { thread(); });
pool.scheduleOrThrowOnError([this] { thread(); });
}
public:
......
......@@ -1158,7 +1158,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
tasks[bucket] = std::packaged_task<Block()>(std::bind(converter, bucket, CurrentThread::getGroup()));
if (thread_pool)
thread_pool->schedule([bucket, &tasks] { tasks[bucket](); });
thread_pool->scheduleOrThrowOnError([bucket, &tasks] { tasks[bucket](); });
else
tasks[bucket]();
}
......@@ -1614,7 +1614,7 @@ private:
if (max_scheduled_bucket_num >= NUM_BUCKETS)
return;
parallel_merge_data->pool.schedule(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
parallel_merge_data->pool.scheduleOrThrowOnError(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
max_scheduled_bucket_num, CurrentThread::getGroup()));
}
......@@ -1968,7 +1968,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
auto task = std::bind(merge_bucket, bucket, aggregates_pool, CurrentThread::getGroup());
if (thread_pool)
thread_pool->schedule(task);
thread_pool->scheduleOrThrowOnError(task);
else
task();
}
......
......@@ -327,7 +327,7 @@ void InterpreterSystemQuery::restartReplicas(Context & system_context)
ThreadPool pool(std::min(size_t(getNumberOfPhysicalCPUCores()), replica_names.size()));
for (auto & table : replica_names)
pool.schedule([&] () { tryRestartReplica(table.first, table.second, system_context); });
pool.scheduleOrThrowOnError([&]() { tryRestartReplica(table.first, table.second, system_context); });
pool.wait();
}
......
......@@ -140,7 +140,7 @@ try
size_t num_threads = 2;
ThreadPool pool(num_threads);
for (size_t i = 0; i < num_threads; ++i)
pool.schedule([i]() { do_io(i); });
pool.scheduleOrThrowOnError([i]() { do_io(i); });
pool.wait();
test_perf();
......
......@@ -85,7 +85,7 @@ namespace DB
// active_processors.insert(current_processor);
// }
//
// pool.schedule([processor = current_processor, &watch, this]
// pool.scheduleOrThrowOnError([processor = current_processor, &watch, this]
// {
// processor->work();
// {
......
......@@ -88,7 +88,7 @@ public:
void schedule(EventCounter & watch) override
{
active = true;
pool.schedule([&watch, this]
pool.scheduleOrThrowOnError([&watch, this]
{
usleep(sleep_useconds);
current_chunk = generate();
......
......@@ -339,11 +339,19 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
per_shard_jobs[current_selector[i]].shard_current_block_permuation.push_back(i);
}
/// Run jobs in parallel for each block and wait them
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, shards_info.size()))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->schedule(runWritingJob(job, block));
try
{
/// Run jobs in parallel for each block and wait them
finished_jobs_count = 0;
for (size_t shard_index : ext::range(0, shards_info.size()))
for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs)
pool->scheduleOrThrowOnError(runWritingJob(job, block));
}
catch (...)
{
pool->wait();
throw;
}
try
{
......@@ -373,17 +381,27 @@ void DistributedBlockOutputStream::writeSuffix()
if (insert_sync && pool)
{
finished_jobs_count = 0;
for (auto & shard_jobs : per_shard_jobs)
for (JobReplica & job : shard_jobs.replicas_jobs)
try
{
for (auto & shard_jobs : per_shard_jobs)
{
if (job.stream)
for (JobReplica & job : shard_jobs.replicas_jobs)
{
pool->schedule([&job] ()
if (job.stream)
{
job.stream->writeSuffix();
});
pool->scheduleOrThrowOnError([&job]()
{
job.stream->writeSuffix();
});
}
}
}
}
catch (...)
{
pool->wait();
throw;
}
try
{
......
......@@ -802,7 +802,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (size_t i = 0; i < part_names_with_disks.size(); ++i)
{
pool.schedule([&, i]
pool.scheduleOrThrowOnError([&, i]
{
const auto & part_name = part_names_with_disks[i].first;
const auto part_disk_ptr = part_names_with_disks[i].second;
......@@ -1155,7 +1155,7 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
for (const DataPartPtr & part : parts_to_remove)
{
pool.schedule([&]
pool.scheduleOrThrowOnError([&]
{
LOG_DEBUG(log, "Removing part from filesystem " << part->name);
part->remove();
......
......@@ -216,7 +216,7 @@ std::vector<MergeTreeData::AlterDataPartTransactionPtr> StorageMergeTree::prepar
};
if (thread_pool)
thread_pool->schedule(job);
thread_pool->scheduleOrThrowOnError(job);
else
job();
}
......
......@@ -150,7 +150,7 @@ int mainImpl(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < threads; ++i)
pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, count));
pool.scheduleOrThrowOnError(std::bind(thread, fd, mode, min_offset, max_offset, block_size, count));
pool.wait();
fsync(fd);
......
......@@ -175,7 +175,7 @@ int mainImpl(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < threads_count; ++i)
pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count));
pool.scheduleOrThrowOnError(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count));
pool.wait();
watch.stop();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册