提交 33cff5ee 编写于 作者: A Alexey Milovidov

Merge remote-tracking branch 'origin/master' into sanych73-prepared_statements

......@@ -15,9 +15,14 @@ IF(CMAKE_COMPILER_IS_GNUCXX)
STRING(REGEX MATCHALL "[0-9]+" GCC_COMPILER_VERSION ${GCC_COMPILER_VERSION})
LIST(LENGTH GCC_COMPILER_VERSION GCC_COMPILER_VERSION_LENGTH)
LIST(GET GCC_COMPILER_VERSION 0 GCC_COMPILER_VERSION_MAJOR)
LIST(GET GCC_COMPILER_VERSION 0 GCC_COMPILER_VERSION_MINOR)
if (GCC_COMPILER_VERSION_LENGTH GREATER 1)
LIST(GET GCC_COMPILER_VERSION 1 GCC_COMPILER_VERSION_MINOR)
else ()
set (GCC_COMPILER_VERSION_MINOR 0)
endif ()
SET(GCC_COMPILER_VERSION_MAJOR ${GCC_COMPILER_VERSION_MAJOR} CACHE INTERNAL "gcc major version")
SET(GCC_COMPILER_VERSION_MINOR ${GCC_COMPILER_VERSION_MINOR} CACHE INTERNAL "gcc minor version")
......
......@@ -30,10 +30,18 @@ template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
{
auto on_error = []
auto on_error = [&]
{
if constexpr (std::is_same_v<ReturnType, void>)
{
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
throw DB::Exception("Cannot schedule a task", DB::ErrorCodes::CANNOT_SCHEDULE_TASK);
}
else
return false;
};
......
......@@ -41,9 +41,6 @@ target_link_libraries (compact_array PRIVATE clickhouse_common_io ${Boost_FILESY
add_executable (radix_sort radix_sort.cpp)
target_link_libraries (radix_sort PRIVATE clickhouse_common_io)
add_executable (shell_command_test shell_command_test.cpp)
target_link_libraries (shell_command_test PRIVATE clickhouse_common_io)
add_executable (arena_with_free_lists arena_with_free_lists.cpp)
target_link_libraries (arena_with_free_lists PRIVATE clickhouse_compression clickhouse_common_io)
......@@ -53,15 +50,6 @@ target_link_libraries (pod_array PRIVATE clickhouse_common_io)
add_executable (thread_creation_latency thread_creation_latency.cpp)
target_link_libraries (thread_creation_latency PRIVATE clickhouse_common_io)
add_executable (thread_pool thread_pool.cpp)
target_link_libraries (thread_pool PRIVATE clickhouse_common_io)
add_executable (thread_pool_2 thread_pool_2.cpp)
target_link_libraries (thread_pool_2 PRIVATE clickhouse_common_io)
add_executable (thread_pool_3 thread_pool_3.cpp)
target_link_libraries (thread_pool_3 PRIVATE clickhouse_common_io)
add_executable (multi_version multi_version.cpp)
target_link_libraries (multi_version PRIVATE clickhouse_common_io)
add_check(multi_version)
......
......@@ -4,48 +4,62 @@
#include <IO/copyData.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <chrono>
#include <thread>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
using namespace DB;
int main(int, char **)
try
TEST(ShellCommand, Execute)
{
{
auto command = ShellCommand::execute("echo 'Hello, world!'");
auto command = ShellCommand::execute("echo 'Hello, world!'");
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
std::string res;
readStringUntilEOF(res, command->out);
command->wait();
command->wait();
}
EXPECT_EQ(res, "Hello, world!\n");
}
{
auto command = ShellCommand::executeDirect("/bin/echo", {"Hello, world!"});
TEST(ShellCommand, ExecuteDirect)
{
auto command = ShellCommand::executeDirect("/bin/echo", {"Hello, world!"});
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
std::string res;
readStringUntilEOF(res, command->out);
command->wait();
command->wait();
}
EXPECT_EQ(res, "Hello, world!\n");
}
{
auto command = ShellCommand::execute("cat");
TEST(ShellCommand, ExecuteWithInput)
{
auto command = ShellCommand::execute("cat");
String in_str = "Hello, world!\n";
ReadBufferFromString in(in_str);
copyData(in, command->in);
command->in.close();
String in_str = "Hello, world!\n";
ReadBufferFromString in(in_str);
copyData(in, command->in);
command->in.close();
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
copyData(command->out, out);
std::string res;
readStringUntilEOF(res, command->out);
command->wait();
command->wait();
}
EXPECT_EQ(res, "Hello, world!\n");
}
TEST(ShellCommand, AutoWait)
{
// <defunct> hunting:
for (int i = 0; i < 1000; ++i)
{
......@@ -56,8 +70,3 @@ try
// std::cerr << "inspect me: ps auxwwf" << "\n";
// std::this_thread::sleep_for(std::chrono::seconds(100));
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false) << "\n";
return 1;
}
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
/** Reproduces bug in ThreadPool.
* It get stuck if we call 'wait' many times from many other threads simultaneously.
*/
int main(int, char **)
TEST(ThreadPool, ConcurrentWait)
{
auto worker = []
{
......@@ -29,6 +36,4 @@ int main(int, char **)
waiting_pool.schedule([&pool]{ pool.wait(); });
waiting_pool.wait();
return 0;
}
#include <mutex>
#include <atomic>
#include <iostream>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
/// Test for thread self-removal when number of free threads in pool is too large.
/// Just checks that nothing weird happens.
template <typename Pool>
void test()
int test()
{
Pool pool(10, 2, 10);
std::mutex mutex;
std::atomic<int> counter{0};
for (size_t i = 0; i < 10; ++i)
pool.schedule([&]{ std::lock_guard lock(mutex); std::cerr << '.'; });
pool.schedule([&]{ ++counter; });
pool.wait();
return counter;
}
int main(int, char **)
TEST(ThreadPool, ThreadRemoval)
{
test<FreeThreadPool>();
std::cerr << '\n';
test<ThreadPool>();
std::cerr << '\n';
return 0;
EXPECT_EQ(test<FreeThreadPool>(), 10);
EXPECT_EQ(test<ThreadPool>(), 10);
}
......@@ -2,10 +2,17 @@
#include <iostream>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
int main(int, char **)
TEST(ThreadPool, Loop)
{
std::atomic<size_t> res{0};
std::atomic<int> res{0};
for (size_t i = 0; i < 1000; ++i)
{
......@@ -16,6 +23,5 @@ int main(int, char **)
pool.wait();
}
std::cerr << res << "\n";
return 0;
EXPECT_EQ(res, 16000);
}
#include <iostream>
#include <stdexcept>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
bool check()
{
ThreadPool pool(10);
pool.schedule([]{ throw std::runtime_error("Hello, world!"); });
try
{
for (size_t i = 0; i < 100; ++i)
pool.schedule([]{}); /// An exception will be rethrown from this method.
}
catch (const std::runtime_error &)
{
return true;
}
pool.wait();
return false;
}
TEST(ThreadPool, ExceptionFromSchedule)
{
EXPECT_TRUE(check());
}
......@@ -277,6 +277,7 @@ struct ContextShared
/// Preemptive destruction is important, because these objects may have a refcount to ContextShared (cyclic reference).
/// TODO: Get rid of this.
system_logs.reset();
embedded_dictionaries.reset();
external_dictionaries.reset();
external_models.reset();
......
......@@ -87,10 +87,9 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
{
std::unique_lock lock(mutex);
const auto max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
if (!is_unlimited_query && max_size && processes.size() >= max_size)
{
auto max_wait_ms = settings.queue_max_wait_ms.totalMilliseconds();
if (!max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(max_wait_ms), [&]{ return processes.size() < max_size; }))
throw Exception("Too many simultaneous queries. Maximum: " + toString(max_size), ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
}
......@@ -117,20 +116,41 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
+ ", maximum: " + settings.max_concurrent_queries_for_user.toString(),
ErrorCodes::TOO_MANY_SIMULTANEOUS_QUERIES);
auto range = user_process_list->second.queries.equal_range(client_info.current_query_id);
if (range.first != range.second)
auto running_query = user_process_list->second.queries.find(client_info.current_query_id);
if (running_query != user_process_list->second.queries.end())
{
if (!settings.replace_running_query)
throw Exception("Query with id = " + client_info.current_query_id + " is already running.",
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
/// Ask queries to cancel. They will check this flag.
for (auto it = range.first; it != range.second; ++it)
it->second->is_killed.store(true, std::memory_order_relaxed);
}
running_query->second->is_killed.store(true, std::memory_order_relaxed);
if (!max_wait_ms || !have_space.wait_for(lock, std::chrono::milliseconds(max_wait_ms), [&]
{
running_query = user_process_list->second.queries.find(client_info.current_query_id);
if (running_query == user_process_list->second.queries.end())
return true;
running_query->second->is_killed.store(true, std::memory_order_relaxed);
return false;
}))
throw Exception("Query with id = " + client_info.current_query_id + " is already running and can't be stopped",
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
}
}
}
/// Check other users running query with our query_id
for (const auto & user_process_list : user_to_queries)
{
if (user_process_list.first == client_info.current_user)
continue;
if (auto running_query = user_process_list.second.queries.find(client_info.current_query_id); running_query != user_process_list.second.queries.end())
throw Exception("Query with id = " + client_info.current_query_id + " is already running by user " + user_process_list.first,
ErrorCodes::QUERY_WITH_SAME_ID_IS_ALREADY_RUNNING);
}
auto process_it = processes.emplace(processes.end(),
query_, client_info, settings.max_memory_usage, settings.memory_tracker_fault_probability, priorities.insert(settings.priority));
......@@ -226,17 +246,12 @@ ProcessListEntry::~ProcessListEntry()
bool found = false;
auto range = user_process_list.queries.equal_range(query_id);
if (range.first != range.second)
if (auto running_query = user_process_list.queries.find(query_id); running_query != user_process_list.queries.end())
{
for (auto jt = range.first; jt != range.second; ++jt)
if (running_query->second == process_list_element_ptr)
{
if (jt->second == process_list_element_ptr)
{
user_process_list.queries.erase(jt);
found = true;
break;
}
user_process_list.queries.erase(running_query->first);
found = true;
}
}
......@@ -245,8 +260,7 @@ ProcessListEntry::~ProcessListEntry()
LOG_ERROR(&Logger::get("ProcessList"), "Logical error: cannot find query by query_id and pointer to ProcessListElement in ProcessListForUser");
std::terminate();
}
parent.have_space.notify_one();
parent.have_space.notify_all();
/// If there are no more queries for the user, then we will reset memory tracker and network throttler.
if (user_process_list.queries.empty())
......
......@@ -203,7 +203,7 @@ struct ProcessListForUser
ProcessListForUser();
/// query_id -> ProcessListElement(s). There can be multiple queries with the same query_id as long as all queries except one are cancelled.
using QueryToElement = std::unordered_multimap<String, QueryStatus *>;
using QueryToElement = std::unordered_map<String, QueryStatus *>;
QueryToElement queries;
ProfileEvents::Counters user_performance_counters{VariableContext::User, &ProfileEvents::global_counters};
......
......@@ -9,3 +9,16 @@ $CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d
sleep 0.1 # First query (usually) should be received by the server after this sleep.
$CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL?query_id=hello&replace_running_query=1" -d 'SELECT 0'
wait
${CLICKHOUSE_CLIENT} --user=readonly --query_id=42 --query='SELECT 1, sleep(1)' &
sleep 0.1
( ${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 43' ||: ) 2>&1 | grep -F 'is already running by user' > /dev/null
wait
${CLICKHOUSE_CLIENT} --query='SELECT 3, sleep(1)' &
sleep 0.1
${CLICKHOUSE_CLIENT} --query_id=42 --query='SELECT 2, sleep(1)' &
sleep 0.1
( ${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --queue_max_wait_ms=500 --query='SELECT 43' ||: ) 2>&1 | grep -F 'cant be stopped' > /dev/null
${CLICKHOUSE_CLIENT} --query_id=42 --replace_running_query=1 --query='SELECT 44'
wait
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册