diff --git a/dbms/src/DataStreams/NullAndDoCopyBlockInputStream.h b/dbms/src/DataStreams/NullAndDoCopyBlockInputStream.h index e3a0d923747dcd3b559a0a3bb77da27095b00b3b..8fe05c387a39386fd58eb9898b69660d2a45be78 100644 --- a/dbms/src/DataStreams/NullAndDoCopyBlockInputStream.h +++ b/dbms/src/DataStreams/NullAndDoCopyBlockInputStream.h @@ -49,6 +49,10 @@ public: protected: Block readImpl() override { + /// We do not use cancel flag here. + /// If query was cancelled, it will be processed by child streams. + /// Part of the data will be processed. + if (input_streams.size() == 1 && output_streams.size() == 1) copyData(*input_streams.at(0), *output_streams.at(0)); else diff --git a/dbms/src/DataStreams/copyData.cpp b/dbms/src/DataStreams/copyData.cpp index b3820828977c5a3f784a73eb09855ba0e89487a5..0a8e0521950e788488216faeabc385e94419bfaf 100644 --- a/dbms/src/DataStreams/copyData.cpp +++ b/dbms/src/DataStreams/copyData.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -54,69 +55,77 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall inline void doNothing(const Block &) {} +namespace +{ -static void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos) + +struct ParallelInsertsHandler { - if (froms.size() == tos.size()) + using CencellationHook = std::function; + + explicit ParallelInsertsHandler(BlockOutputStreams & output_streams, CencellationHook cancellation_hook_) + : outputs(output_streams.size()), cancellation_hook(std::move(cancellation_hook_)) + { + exceptions.resize(output_streams.size()); + + for (auto & output : output_streams) + outputs.push(output.get()); + } + + void onBlock(Block & block, size_t /*thread_num*/) + { + IBlockOutputStream * out = nullptr; + + outputs.pop(out); + out->write(block); + outputs.push(out); + } + + void onFinishThread(size_t /*thread_num*/) {} + void onFinish() {} + + void onException(std::exception_ptr & exception, size_t thread_num) { - std::vector threads; - threads.reserve(froms.size()); - for (size_t i = 0; i < froms.size(); i++) - { - threads.emplace_back([from = froms.at(i), to = tos.at(i)]() - { - from->readPrefix(); - to->writePrefix(); - while (Block block = from->read()) - to->write(block); - from->readSuffix(); - to->writeSuffix(); - }); - } - for (auto & thread : threads) - thread.join(); + exceptions[thread_num] = std::move(exception); + cancellation_hook(); } - else + + void rethrowFirstException() { - ConcurrentBoundedQueue queue(froms.size()); - std::vector to_threads; - for (auto & to : tos) - { - to_threads.emplace_back([&queue, to]() - { - to->writePrefix(); - Block block; - while (true) - { - queue.pop(block); - if (!block) - break; - to->write(block); - } - to->writeSuffix(); - }); - } - - std::vector from_threads_; - from_threads_.reserve(froms.size()); - for (auto & from : froms) - { - from_threads_.emplace_back([&queue, from]() - { - from->readPrefix(); - while (Block block = from->read()) - queue.push(block); - from->readSuffix(); - }); - } - - for (auto & thread : from_threads_) - thread.join(); - for (size_t i = 0; i < tos.size(); i++) - queue.push({}); - for (auto & thread : to_threads) - thread.join(); + for (auto & exception : exceptions) + if (exception) + std::rethrow_exception(exception); } + + ConcurrentBoundedQueue outputs; + std::vector exceptions; + CencellationHook cancellation_hook; +}; + +} + +static void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos) +{ + for (auto & output : tos) + output->writePrefix(); + + using Processor = ParallelInputsProcessor; + Processor * processor_ptr = nullptr; + + ParallelInsertsHandler handler(tos, [&processor_ptr]() { processor_ptr->cancel(false); }); + ParallelInputsProcessor processor(froms, nullptr, froms.size(), handler); + processor_ptr = &processor; + + processor.process(); + processor.wait(); + handler.rethrowFirstException(); + + /// readPrefix is called in ParallelInputsProcessor. + for (auto & input : froms) + input->readSuffix(); + + for (auto & output : tos) + output->writeSuffix(); } void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic * is_cancelled)