提交 ee873c8e 编写于 作者: C chertus

Merge branch 'master' into decimal

......@@ -71,7 +71,7 @@ if (NOT MAKE_STATIC_LIBRARIES)
endif ()
if (SPLIT_SHARED_LIBRARIES)
set (SPLIT_SHARED SHARED)
set (LINK_MODE SHARED)
endif ()
if (USE_STATIC_LIBRARIES)
......
......@@ -26,9 +26,9 @@ endif ()
if (NOT Boost_SYSTEM_LIBRARY)
set (USE_INTERNAL_BOOST_LIBRARY 1)
set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal)
set (Boost_SYSTEM_LIBRARY boost_system_internal)
set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal)
set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal)
set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal ${Boost_SYSTEM_LIBRARY})
set (Boost_INCLUDE_DIRS)
......
option (USE_INTERNAL_SSL_LIBRARY "Set to FALSE to use system *ssl library instead of bundled" ${OS_LINUX})
#if (OS_LINUX)
option (USE_INTERNAL_SSL_LIBRARY "Set to FALSE to use system *ssl library instead of bundled" ${NOT_UNBUNDLED})
#endif ()
set (OPENSSL_USE_STATIC_LIBS ${USE_STATIC_LIBRARIES})
......
if (NOT OS_FREEBSD AND NOT APPLE)
option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library instead of bundled" ${NOT_UNBUNDLED})
endif ()
option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library instead of bundled" ${NOT_UNBUNDLED})
if (NOT USE_INTERNAL_ZLIB_LIBRARY)
find_package (ZLIB)
......
......@@ -60,8 +60,8 @@ if (USE_INTERNAL_ZLIB_LIBRARY)
target_compile_definitions (zlib PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
target_compile_definitions (zlibstatic PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
if(CMAKE_SYSTEM_PROCESSOR MATCHES "x86_64" OR CMAKE_SYSTEM_PROCESSOR MATCHES "AMD64")
target_compile_definitions (zlib PUBLIC X86_64)
target_compile_definitions (zlibstatic PUBLIC X86_64)
target_compile_definitions (zlib PUBLIC X86_64 X86_NOCHECK_SSE2 UNALIGNED_OK)
target_compile_definitions (zlibstatic PUBLIC X86_64 X86_NOCHECK_SSE2 UNALIGNED_OK)
endif ()
set_target_properties(example PROPERTIES EXCLUDE_FROM_ALL 1)
......@@ -104,14 +104,14 @@ endif ()
if (ENABLE_MYSQL AND USE_INTERNAL_MYSQL_LIBRARY)
add_subdirectory (mariadb-connector-c-cmake)
target_include_directories(mysqlclient PRIVATE BEFORE ${ZLIB_INCLUDE_DIR})
target_include_directories(mysqlclient PRIVATE BEFORE ${OPENSSL_INCLUDE_DIR})
target_include_directories(mysqlclient BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})
target_include_directories(mysqlclient BEFORE PRIVATE ${OPENSSL_INCLUDE_DIR})
endif ()
if (USE_INTERNAL_RDKAFKA_LIBRARY)
add_subdirectory (librdkafka-cmake)
target_include_directories(rdkafka PRIVATE BEFORE ${ZLIB_INCLUDE_DIR})
target_include_directories(rdkafka PRIVATE BEFORE ${OPENSSL_INCLUDE_DIR})
target_include_directories(rdkafka BEFORE PRIVATE ${ZLIB_INCLUDE_DIR})
target_include_directories(rdkafka BEFORE PRIVATE ${OPENSSL_INCLUDE_DIR})
endif ()
if (ENABLE_ODBC AND USE_INTERNAL_ODBC_LIBRARY)
......
......@@ -16,7 +16,7 @@ if (NOT MSVC)
add_definitions(-Wno-unused-variable -Wno-deprecated-declarations)
endif ()
add_library(boost_program_options_internal
add_library(boost_program_options_internal ${LINK_MODE}
${LIBRARY_DIR}/libs/program_options/src/cmdline.cpp
${LIBRARY_DIR}/libs/program_options/src/config_file.cpp
${LIBRARY_DIR}/libs/program_options/src/convert.cpp
......@@ -29,7 +29,7 @@ ${LIBRARY_DIR}/libs/program_options/src/value_semantic.cpp
${LIBRARY_DIR}/libs/program_options/src/variables_map.cpp
${LIBRARY_DIR}/libs/program_options/src/winmain.cpp)
add_library(boost_filesystem_internal
add_library(boost_filesystem_internal ${LINK_MODE}
${LIBRARY_DIR}/libs/filesystem/src/codecvt_error_category.cpp
${LIBRARY_DIR}/libs/filesystem/src/operations.cpp
${LIBRARY_DIR}/libs/filesystem/src/path.cpp
......@@ -39,9 +39,11 @@ ${LIBRARY_DIR}/libs/filesystem/src/unique_path.cpp
${LIBRARY_DIR}/libs/filesystem/src/utf8_codecvt_facet.cpp
${LIBRARY_DIR}/libs/filesystem/src/windows_file_codecvt.cpp)
add_library(boost_system_internal
add_library(boost_system_internal ${LINK_MODE}
${LIBRARY_DIR}/libs/system/src/error_code.cpp)
target_link_libraries (boost_filesystem_internal PUBLIC boost_system_internal)
target_include_directories (boost_program_options_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_filesystem_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_system_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
......
SET(LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/cctz)
add_library(cctz
add_library(cctz ${LINK_MODE}
${LIBRARY_DIR}/src/civil_time_detail.cc
${LIBRARY_DIR}/src/time_zone_fixed.cc
${LIBRARY_DIR}/src/time_zone_format.cc
......
......@@ -54,7 +54,7 @@ ${RDKAFKA_SOURCE_DIR}/lz4hc.c
${RDKAFKA_SOURCE_DIR}/rdgz.c
)
add_library(rdkafka STATIC ${SRCS})
add_library(rdkafka ${LINK_MODE} ${SRCS})
target_include_directories(rdkafka PRIVATE include)
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR})
target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})
......@@ -23,7 +23,7 @@ ${ODBC_SOURCE_DIR}/libltdl/loaders/preopen.c
${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/libltdl/libltdlcS.c
)
add_library(ltdl STATIC ${SRCS})
add_library(ltdl ${LINK_MODE} ${SRCS})
target_include_directories(ltdl PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/libltdl)
target_include_directories(ltdl PUBLIC ${ODBC_SOURCE_DIR}/libltdl)
......@@ -273,7 +273,7 @@ ${ODBC_SOURCE_DIR}/lst/lstSetFreeFunc.c
${ODBC_SOURCE_DIR}/lst/_lstVisible.c
)
add_library(unixodbc STATIC ${SRCS})
add_library(unixodbc ${LINK_MODE} ${SRCS})
target_link_libraries(unixodbc ltdl)
......
......@@ -125,6 +125,6 @@ IF (ZSTD_LEGACY_SUPPORT)
${LIBRARY_LEGACY_DIR}/zstd_v07.h)
ENDIF (ZSTD_LEGACY_SUPPORT)
ADD_LIBRARY(zstd ${Sources} ${Headers})
ADD_LIBRARY(zstd ${LINK_MODE} ${Sources} ${Headers})
target_include_directories (zstd PUBLIC ${LIBRARY_DIR})
......@@ -83,7 +83,7 @@ list (APPEND dbms_headers
list (APPEND dbms_sources src/TableFunctions/ITableFunction.cpp src/TableFunctions/TableFunctionFactory.cpp)
list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctions/TableFunctionFactory.h)
add_library(clickhouse_common_io ${SPLIT_SHARED} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
add_library(clickhouse_common_io ${LINK_MODE} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
if (OS_FREEBSD)
target_compile_definitions (clickhouse_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST)
......@@ -96,7 +96,7 @@ if (MAKE_STATIC_LIBRARIES)
add_library(dbms ${dbms_headers} ${dbms_sources})
else ()
add_library(dbms SHARED ${dbms_headers} ${dbms_sources})
set_target_properties (dbms PROPERTIES SOVERSION ${VERSION_MAJOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse)
set_target_properties (dbms PROPERTIES SOVERSION ${VERSION_MAJOR}.${VERSION_MINOR} VERSION ${VERSION_SO} OUTPUT_NAME clickhouse)
install (TARGETS dbms LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT clickhouse)
endif ()
......
add_library (clickhouse-benchmark-lib ${SPLIT_SHARED} Benchmark.cpp)
add_library (clickhouse-benchmark-lib ${LINK_MODE} Benchmark.cpp)
target_link_libraries (clickhouse-benchmark-lib clickhouse-client-lib clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_include_directories (clickhouse-benchmark-lib SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
......
#include "TestHint.h"
#include "ConnectionParameters.h"
#include <port/unistd.h>
#include <stdlib.h>
......@@ -13,11 +14,11 @@
#include <optional>
#include <boost/program_options.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <Poco/String.h>
#include <Poco/File.h>
#include <Poco/Util/Application.h>
#include <common/readline_use.h>
#include <common/find_first_symbols.h>
#include <common/SetTerminalEcho.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
......@@ -57,7 +58,10 @@
#include <Common/InterruptListener.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <ext/scope_guard.h>
#if USE_READLINE
#include "Suggest.h"
#endif
/// http://en.wikipedia.org/wiki/ANSI_escape_code
......@@ -72,7 +76,6 @@
#define DISABLE_LINE_WRAPPING "\033[?7l"
#define ENABLE_LINE_WRAPPING "\033[?7h"
namespace DB
{
......@@ -173,6 +176,8 @@ private:
int expected_client_error = 0;
int actual_server_error = 0;
int actual_client_error = 0;
UInt64 server_revision = 0;
String server_version;
String server_display_name;
......@@ -188,65 +193,6 @@ private:
/// External tables info.
std::list<ExternalTable> external_tables;
struct ConnectionParameters
{
String host;
UInt16 port;
String default_database;
String user;
String password;
Protocol::Secure security;
Protocol::Compression compression;
ConnectionTimeouts timeouts;
ConnectionParameters() {}
ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
{
bool is_secure = config.getBool("secure", false);
security = is_secure
? Protocol::Secure::Enable
: Protocol::Secure::Disable;
host = config.getString("host", "localhost");
port = config.getInt("port",
config.getInt(is_secure ? "tcp_port_secure" : "tcp_port",
is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
default_database = config.getString("database", "");
user = config.getString("user", "");
if (config.getBool("ask-password", false))
{
if (config.has("password"))
throw Exception("Specified both --password and --ask-password. Remove one of them", ErrorCodes::BAD_ARGUMENTS);
std::cout << "Password for user " << user << ": ";
SetTerminalEcho(false);
SCOPE_EXIT({
SetTerminalEcho(true);
});
std::getline(std::cin, password);
std::cout << std::endl;
}
else
{
password = config.getString("password", "");
}
compression = config.getBool("compression", true)
? Protocol::Compression::Enable
: Protocol::Compression::Disable;
timeouts = ConnectionTimeouts(
Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
}
};
ConnectionParameters connection_parameters;
......@@ -343,7 +289,6 @@ private:
|| (now.month() == 1 && now.day() <= 5);
}
int mainImpl()
{
registerFunctions();
......@@ -459,9 +404,26 @@ private:
if (print_time_to_stderr)
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
#if USE_READLINE
if (server_revision >= Suggest::MIN_SERVER_REVISION
&& !config().getBool("disable_suggestion", false))
{
/// Load suggestion data from the server.
Suggest::instance().load(connection_parameters, config().getInt("suggestion_limit"));
/// Added '.' to the default list. Because it is used to separate database and table.
rl_basic_word_break_characters = " \t\n\r\"\\'`@$><=;|&{(.";
/// Not append whitespace after single suggestion. Because whitespace after function name is meaningless.
rl_completion_append_character = '\0';
rl_completion_entry_function = Suggest::generator;
}
else
#else
/// Turn tab completion off.
rl_bind_key('\t', rl_insert);
#endif
/// Load command history if present.
if (config().has("history_file"))
history_file = config().getString("history_file");
......@@ -516,7 +478,6 @@ private:
loop();
std::cout << (isNewYearMode() ? "Happy new year." : "Bye.") << std::endl;
return 0;
}
else
......@@ -559,7 +520,6 @@ private:
UInt64 server_version_major = 0;
UInt64 server_version_minor = 0;
UInt64 server_version_patch = 0;
UInt64 server_revision = 0;
if (max_client_network_bandwidth)
{
......@@ -1570,7 +1530,7 @@ public:
("config-file,c", po::value<std::string>(), "config-file path")
("host,h", po::value<std::string>()->default_value("localhost"), "server host")
("port", po::value<int>()->default_value(9000), "server port")
("secure,s", "secure")
("secure,s", "Use TLS connection")
("user,u", po::value<std::string>()->default_value("default"), "user")
("password", po::value<std::string>(), "password")
("ask-password", "ask-password")
......@@ -1578,6 +1538,9 @@ public:
("query,q", po::value<std::string>(), "query")
("database,d", po::value<std::string>(), "database")
("pager", po::value<std::string>(), "pager")
("disable_suggestion,A", "Disable loading suggestion data. Note that suggestion data is loaded asynchronously through a second connection to ClickHouse server. Also it is reasonable to disable suggestion if you want to paste a query with TAB characters. Shorthand option -A is for those who get used to mysql client.")
("suggestion_limit", po::value<int>()->default_value(10000),
"Suggestion limit for how many databases, tables and columns to fetch.")
("multiline,m", "multiline")
("multiquery,n", "multiquery")
("format,f", po::value<std::string>(), "default output format")
......@@ -1721,12 +1684,15 @@ public:
config().setBool("compression", options["compression"].as<bool>());
if (options.count("server_logs_file"))
server_logs_file = options["server_logs_file"].as<std::string>();
if (options.count("disable_suggestion"))
config().setBool("disable_suggestion", true);
if (options.count("suggestion_limit"))
config().setInt("suggestion_limit", options["suggestion_limit"].as<int>());
}
};
}
int mainEntryClickHouseClient(int argc, char ** argv)
{
DB::Client client;
......
#pragma once
#include <iostream>
#include <Core/Types.h>
#include <Core/Protocol.h>
#include <Core/Defines.h>
#include <Common/Exception.h>
#include <IO/ConnectionTimeouts.h>
#include <common/SetTerminalEcho.h>
#include <ext/scope_guard.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
struct ConnectionParameters
{
String host;
UInt16 port;
String default_database;
String user;
String password;
Protocol::Secure security;
Protocol::Compression compression;
ConnectionTimeouts timeouts;
ConnectionParameters() {}
ConnectionParameters(const Poco::Util::AbstractConfiguration & config)
{
bool is_secure = config.getBool("secure", false);
security = is_secure
? Protocol::Secure::Enable
: Protocol::Secure::Disable;
host = config.getString("host", "localhost");
port = config.getInt("port",
config.getInt(is_secure ? "tcp_port_secure" : "tcp_port",
is_secure ? DBMS_DEFAULT_SECURE_PORT : DBMS_DEFAULT_PORT));
default_database = config.getString("database", "");
user = config.getString("user", "");
if (config.getBool("ask-password", false))
{
if (config.has("password"))
throw Exception("Specified both --password and --ask-password. Remove one of them", ErrorCodes::BAD_ARGUMENTS);
std::cout << "Password for user " << user << ": ";
SetTerminalEcho(false);
SCOPE_EXIT({
SetTerminalEcho(true);
});
std::getline(std::cin, password);
std::cout << std::endl;
}
else
{
password = config.getString("password", "");
}
compression = config.getBool("compression", true)
? Protocol::Compression::Enable
: Protocol::Compression::Disable;
timeouts = ConnectionTimeouts(
Poco::Timespan(config.getInt("connect_timeout", DBMS_DEFAULT_CONNECT_TIMEOUT_SEC), 0),
Poco::Timespan(config.getInt("receive_timeout", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0),
Poco::Timespan(config.getInt("send_timeout", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
}
};
}
#pragma once
#include "ConnectionParameters.h"
#include <string>
#include <sstream>
#include <string.h>
#include <vector>
#include <algorithm>
#include <ext/singleton.h>
#include <common/readline_use.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnString.h>
#include <Client/Connection.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_PACKET_FROM_SERVER;
}
class Suggest : public ext::singleton<Suggest>
{
private:
/// The vector will be filled with completion words from the server and sorted.
using Words = std::vector<std::string>;
/// Keywords may be not up to date with ClickHouse parser.
Words words
{
"CREATE", "DATABASE", "IF", "NOT", "EXISTS", "TEMPORARY", "TABLE", "ON", "CLUSTER", "DEFAULT", "MATERIALIZED", "ALIAS", "ENGINE",
"AS", "VIEW", "POPULATE", "SETTINGS", "ATTACH", "DETACH", "DROP", "RENAME", "TO", "ALTER", "ADD", "MODIFY", "CLEAR", "COLUMN", "AFTER",
"COPY", "PROJECT", "PRIMARY", "KEY", "CHECK", "PARTITION", "PART", "FREEZE", "FETCH", "FROM", "SHOW", "INTO", "OUTFILE", "FORMAT", "TABLES",
"DATABASES", "LIKE", "PROCESSLIST", "CASE", "WHEN", "THEN", "ELSE", "END", "DESCRIBE", "DESC", "USE", "SET", "OPTIMIZE", "FINAL", "DEDUPLICATE",
"INSERT", "VALUES", "SELECT", "DISTINCT", "SAMPLE", "ARRAY", "JOIN", "GLOBAL", "LOCAL", "ANY", "ALL", "INNER", "LEFT", "RIGHT", "FULL", "OUTER",
"CROSS", "USING", "PREWHERE", "WHERE", "GROUP", "BY", "WITH", "TOTALS", "HAVING", "ORDER", "COLLATE", "LIMIT", "UNION", "AND", "OR", "ASC", "IN",
"KILL", "QUERY", "SYNC", "ASYNC", "TEST"
};
/// Words are fetched asynchonously.
std::thread loading_thread;
std::atomic<bool> ready{false};
/// Points to current word to suggest.
Words::const_iterator pos;
/// Points after the last possible match.
Words::const_iterator end;
/// Set iterators to the matched range of words if any.
void findRange(const char * prefix, size_t prefix_length)
{
std::string prefix_str(prefix);
std::tie(pos, end) = std::equal_range(words.begin(), words.end(), prefix_str,
[prefix_length](const std::string & s, const std::string & prefix) { return strncmp(s.c_str(), prefix.c_str(), prefix_length) < 0; });
}
/// Iterates through matched range.
char * nextMatch()
{
if (pos >= end)
return nullptr;
/// readline will free memory by itself.
char * word = strdup(pos->c_str());
++pos;
return word;
}
void loadImpl(Connection & connection, size_t suggestion_limit)
{
std::stringstream query;
query << "SELECT DISTINCT arrayJoin(extractAll(name, '[\\\\w_]{2,}')) AS res FROM ("
"SELECT name FROM system.functions"
" UNION ALL "
"SELECT name FROM system.table_engines"
" UNION ALL "
"SELECT name FROM system.formats"
" UNION ALL "
"SELECT name FROM system.table_functions"
" UNION ALL "
"SELECT name FROM system.data_type_families"
" UNION ALL "
"SELECT concat(func.name, comb.name) FROM system.functions AS func CROSS JOIN system.aggregate_function_combinators AS comb WHERE is_aggregate";
/// The user may disable loading of databases, tables, columns by setting suggestion_limit to zero.
if (suggestion_limit > 0)
{
String limit_str = toString(suggestion_limit);
query <<
" UNION ALL "
"SELECT name FROM system.databases LIMIT " << limit_str
<< " UNION ALL "
"SELECT DISTINCT name FROM system.tables LIMIT " << limit_str
<< " UNION ALL "
"SELECT DISTINCT name FROM system.columns LIMIT " << limit_str;
}
query << ") WHERE notEmpty(res)";
fetch(connection, query.str());
}
void fetch(Connection & connection, const std::string & query)
{
connection.sendQuery(query);
while (true)
{
Connection::Packet packet = connection.receivePacket();
switch (packet.type)
{
case Protocol::Server::Data:
fillWordsFromBlock(packet.block);
continue;
case Protocol::Server::Progress:
continue;
case Protocol::Server::ProfileInfo:
continue;
case Protocol::Server::Totals:
continue;
case Protocol::Server::Extremes:
continue;
case Protocol::Server::Log:
continue;
case Protocol::Server::Exception:
packet.exception->rethrow();
return;
case Protocol::Server::EndOfStream:
return;
default:
throw Exception("Unknown packet from server", ErrorCodes::UNKNOWN_PACKET_FROM_SERVER);
}
}
}
void fillWordsFromBlock(const Block & block)
{
if (!block)
return;
if (block.columns() != 1)
throw Exception("Wrong number of columns received for query to read words for suggestion", ErrorCodes::LOGICAL_ERROR);
const ColumnString & column = typeid_cast<const ColumnString &>(*block.getByPosition(0).column);
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
words.emplace_back(column.getDataAt(i).toString());
}
public:
/// More old server versions cannot execute the query above.
static constexpr int MIN_SERVER_REVISION = 54406;
void load(const ConnectionParameters & connection_parameters, size_t suggestion_limit)
{
loading_thread = std::thread([connection_parameters, suggestion_limit, this]
{
try
{
Connection connection(
connection_parameters.host,
connection_parameters.port,
connection_parameters.default_database,
connection_parameters.user,
connection_parameters.password,
connection_parameters.timeouts,
"client",
connection_parameters.compression,
connection_parameters.security);
loadImpl(connection, suggestion_limit);
}
catch (...)
{
std::cerr << "Cannot load data for command line suggestions: " << getCurrentExceptionMessage(false) << "\n";
}
/// Note that keyword suggestions are available even if we cannot load data from server.
std::sort(words.begin(), words.end());
ready = true;
});
}
/// A function for readline.
static char * generator(const char * text, int state)
{
Suggest & suggest = Suggest::instance();
if (!suggest.ready)
return nullptr;
if (state == 0)
suggest.findRange(text, strlen(text));
/// Do not append whitespace after word. For unknown reason, rl_completion_append_character = '\0' does not work.
rl_completion_suppress_append = 1;
return suggest.nextMatch();
}
~Suggest()
{
if (loading_thread.joinable())
loading_thread.join();
}
};
}
add_library (clickhouse-compressor-lib ${SPLIT_SHARED} Compressor.cpp)
add_library (clickhouse-compressor-lib ${LINK_MODE} Compressor.cpp)
target_link_libraries (clickhouse-compressor-lib clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
if (CLICKHOUSE_SPLIT_BINARY)
......
......@@ -363,8 +363,8 @@ struct TaskCluster
struct MultiTransactionInfo
{
int32_t code;
zkutil::Requests requests;
zkutil::Responses responses;
Coordination::Requests requests;
Coordination::Responses responses;
};
......@@ -373,7 +373,7 @@ struct MultiTransactionInfo
static MultiTransactionInfo checkNoNodeAndCommit(
const zkutil::ZooKeeperPtr & zookeeper,
const String & checking_node_path,
zkutil::RequestPtr && op)
Coordination::RequestPtr && op)
{
MultiTransactionInfo info;
info.requests.emplace_back(zkutil::makeCreateRequest(checking_node_path, "", zkutil::CreateMode::Persistent));
......@@ -742,7 +742,7 @@ public:
{
auto zookeeper = context.getZooKeeper();
task_description_watch_callback = [this] (const ZooKeeperImpl::ZooKeeper::WatchResponse &)
task_description_watch_callback = [this] (const Coordination::WatchResponse &)
{
UInt64 version = ++task_descprtion_version;
LOG_DEBUG(log, "Task description should be updated, local version " << version);
......@@ -902,7 +902,7 @@ public:
task_description_watch_zookeeper = zookeeper;
String task_config_str;
zkutil::Stat stat;
Coordination::Stat stat;
int code;
zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
......@@ -1052,7 +1052,7 @@ protected:
{
updateConfigIfNeeded();
zkutil::Stat stat;
Coordination::Stat stat;
zookeeper->get(workers_version_path, &stat);
auto version = stat.version;
zookeeper->get(workers_path, &stat);
......@@ -1070,16 +1070,16 @@ protected:
}
else
{
zkutil::Requests ops;
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(workers_version_path, description, version));
ops.emplace_back(zkutil::makeCreateRequest(current_worker_path, description, zkutil::CreateMode::Ephemeral));
zkutil::Responses responses;
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code == ZooKeeperImpl::ZooKeeper::ZOK || code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
if (code == Coordination::ZOK || code == Coordination::ZNODEEXISTS)
return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
if (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION)
if (code == Coordination::ZBADVERSION)
{
++num_bad_version_errors;
......@@ -1093,7 +1093,7 @@ protected:
}
}
else
throw zkutil::KeeperException(code);
throw Coordination::Exception(code);
}
}
}
......@@ -1157,7 +1157,7 @@ protected:
zxid2.push_back(res.stat.pzxid);
}
}
catch (const zkutil::KeeperException & e)
catch (const Coordination::Exception & e)
{
LOG_INFO(log, "A ZooKeeper error occurred while checking partition " << partition_name
<< ". Will recheck the partition. Error: " << e.displayText());
......@@ -1242,9 +1242,9 @@ protected:
{
cleaner_holder = zkutil::EphemeralNodeHolder::create(dirt_cleaner_path, *zookeeper, host_id);
}
catch (const zkutil::KeeperException & e)
catch (const Coordination::Exception & e)
{
if (e.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
if (e.code == Coordination::ZNODEEXISTS)
{
LOG_DEBUG(log, "Partition " << task_partition.name << " is cleaning now by somebody, sleep");
std::this_thread::sleep_for(default_sleep_time);
......@@ -1254,7 +1254,7 @@ protected:
throw;
}
zkutil::Stat stat;
Coordination::Stat stat;
if (zookeeper->exists(current_partition_active_workers_dir, &stat))
{
if (stat.numChildren != 0)
......@@ -1291,7 +1291,7 @@ protected:
}
/// Remove the locking node
zkutil::Requests requests;
Coordination::Requests requests;
requests.emplace_back(zkutil::makeRemoveRequest(dirt_cleaner_path, -1));
requests.emplace_back(zkutil::makeRemoveRequest(is_dirty_flag_path, -1));
zookeeper->multi(requests);
......@@ -1503,8 +1503,8 @@ protected:
auto create_is_dirty_node = [&] ()
{
auto code = zookeeper->tryCreate(is_dirty_flag_path, current_task_status_path, zkutil::CreateMode::Persistent);
if (code && code != ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
throw zkutil::KeeperException(code, is_dirty_flag_path);
if (code && code != Coordination::ZNODEEXISTS)
throw Coordination::Exception(code, is_dirty_flag_path);
};
/// Returns SELECT query filtering current partition and applying user filter
......@@ -1552,9 +1552,9 @@ protected:
{
partition_task_node_holder = zkutil::EphemeralNodeHolder::create(current_task_is_active_path, *zookeeper, host_id);
}
catch (const zkutil::KeeperException & e)
catch (const Coordination::Exception & e)
{
if (e.code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
if (e.code == Coordination::ZNODEEXISTS)
{
LOG_DEBUG(log, "Someone is already processing " << current_task_is_active_path);
return PartitionTaskStatus::Active;
......@@ -1605,7 +1605,7 @@ protected:
if (count != 0)
{
zkutil::Stat stat_shards;
Coordination::Stat stat_shards;
zookeeper->get(task_partition.getPartitionShardsPath(), &stat_shards);
if (stat_shards.numChildren == 0)
......@@ -1698,7 +1698,7 @@ protected:
output = io_insert.out;
}
std::future<zkutil::ExistsResponse> future_is_dirty_checker;
std::future<Coordination::ExistsResponse> future_is_dirty_checker;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
constexpr size_t check_period_milliseconds = 500;
......@@ -1716,9 +1716,9 @@ protected:
/// Otherwise, the insertion will slow a little bit
if (watch.elapsedMilliseconds() >= check_period_milliseconds)
{
zkutil::ExistsResponse status = future_is_dirty_checker.get();
Coordination::ExistsResponse status = future_is_dirty_checker.get();
if (status.error != ZooKeeperImpl::ZooKeeper::ZNONODE)
if (status.error != Coordination::ZNONODE)
throw Exception("Partition is dirty, cancel INSERT SELECT", ErrorCodes::UNFINISHED);
}
......@@ -2023,13 +2023,13 @@ private:
/// Auto update config stuff
UInt64 task_descprtion_current_version = 1;
std::atomic<UInt64> task_descprtion_version{1};
zkutil::WatchCallback task_description_watch_callback;
Coordination::WatchCallback task_description_watch_callback;
/// ZooKeeper session used to set the callback
zkutil::ZooKeeperPtr task_description_watch_zookeeper;
ConfigurationPtr task_cluster_initial_config;
ConfigurationPtr task_cluster_current_config;
zkutil::Stat task_descprtion_current_stat;
Coordination::Stat task_descprtion_current_stat;
std::unique_ptr<TaskCluster> task_cluster;
......
add_library (clickhouse-extract-from-config-lib ${SPLIT_SHARED} ExtractFromConfig.cpp)
add_library (clickhouse-extract-from-config-lib ${LINK_MODE} ExtractFromConfig.cpp)
target_link_libraries (clickhouse-extract-from-config-lib clickhouse_common_config clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
if (CLICKHOUSE_SPLIT_BINARY)
......
add_library (clickhouse-format-lib ${SPLIT_SHARED} Format.cpp)
add_library (clickhouse-format-lib ${LINK_MODE} Format.cpp)
target_link_libraries (clickhouse-format-lib dbms clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
if (CLICKHOUSE_SPLIT_BINARY)
add_executable (clickhouse-format clickhouse-format.cpp)
......
......@@ -104,8 +104,8 @@ try
if (!config().has("query") && !config().has("table-structure")) /// Nothing to process
{
if (!config().hasOption("silent"))
std::cerr << "There are no queries to process." << std::endl;
if (config().hasOption("verbose"))
std::cerr << "There are no queries to process." << '\n';
return Application::EXIT_OK;
}
......@@ -200,8 +200,7 @@ try
}
catch (const Exception & e)
{
if (!config().hasOption("silent"))
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace"));
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n';
/// If exception code isn't zero, we should return non-zero return code anyway.
return e.code() ? e.code() : -1;
......@@ -274,7 +273,7 @@ void LocalServer::processQueries()
/// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(*context);
bool echo_query = config().hasOption("echo") || config().hasOption("verbose");
bool echo_queries = config().hasOption("echo") || config().hasOption("verbose");
std::exception_ptr exception;
for (const auto & query : queries)
......@@ -282,8 +281,12 @@ void LocalServer::processQueries()
ReadBufferFromString read_buf(query);
WriteBufferFromFileDescriptor write_buf(STDOUT_FILENO);
if (echo_query)
std::cerr << query << "\n";
if (echo_queries)
{
writeString(query, write_buf);
writeChar('\n', write_buf);
write_buf.next();
}
try
{
......@@ -297,8 +300,7 @@ void LocalServer::processQueries()
if (!exception)
exception = std::current_exception();
if (!config().has("silent"))
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace"));
std::cerr << getCurrentExceptionMessage(config().hasOption("stacktrace")) << '\n';
}
}
......@@ -360,7 +362,7 @@ void LocalServer::setupUsers()
static void showClientVersion()
{
std::cout << DBMS_NAME << " client version " << VERSION_STRING << "." << std::endl;
std::cout << DBMS_NAME << " client version " << VERSION_STRING << "." << '\n';
}
std::string LocalServer::getHelpHeader() const
......@@ -421,7 +423,6 @@ void LocalServer::init(int argc, char ** argv)
("format,f", po::value<std::string>(), "default output format (clickhouse-client compatibility)")
("output-format", po::value<std::string>(), "default output format")
("silent,s", "quiet mode, do not print errors")
("stacktrace", "print stack traces of exceptions")
("echo", "print query before execution")
("verbose", "print query and other debugging info")
......@@ -477,8 +478,6 @@ void LocalServer::init(int argc, char ** argv)
if (options.count("output-format"))
config().setString("output-format", options["output-format"].as<std::string>());
if (options.count("silent"))
config().setBool("silent", true);
if (options.count("stacktrace"))
config().setBool("stacktrace", true);
if (options.count("echo"))
......@@ -507,7 +506,7 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
......
......@@ -10,6 +10,9 @@
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <Common/HTMLForm.h>
#include <Parsers/ParserQueryWithOutput.h>
#include <Parsers/parseQuery.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include "validateODBCConnectionString.h"
......@@ -51,7 +54,8 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: " + request.getURI());
auto process_error = [&response, this](const std::string & message) {
auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
......@@ -68,9 +72,15 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
process_error("No 'connection_string' in request URL");
return;
}
std::string schema_name = "";
std::string table_name = params.get("table");
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Will fetch info for table '" << table_name << "'");
if (params.has("schema"))
{
schema_name = params.get("schema");
LOG_TRACE(log, "Will fetch info for table '" << schema_name + "." + table_name << "'");
} else
LOG_TRACE(log, "Will fetch info for table '" << table_name << "'");
LOG_TRACE(log, "Got connection str '" << connection_string << "'");
try
......@@ -86,7 +96,18 @@ void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & reques
SCOPE_EXIT(SQLFreeStmt(hstmt, SQL_DROP));
/// TODO Why not do SQLColumns instead?
std::string query = "SELECT * FROM " + table_name + " WHERE 1 = 0";
std::string name = schema_name.empty() ? table_name : schema_name + "." + table_name;
std::stringstream ss;
std::string input = "SELECT * FROM " + name + " WHERE 1 = 0";
ParserQueryWithOutput parser;
ASTPtr select = parseQuery(parser, input.data(), input.data() + input.size(), "", 0);
IAST::FormatSettings settings(ss, true);
settings.always_quote_identifiers = true;
settings.identifier_quoting_style = IdentifierQuotingStyle::DoubleQuotes;
select->format(settings);
std::string query = ss.str();
if (Poco::Data::ODBC::Utility::isError(Poco::Data::ODBC::SQLPrepare(hstmt, reinterpret_cast<SQLCHAR *>(&query[0]), query.size())))
throw Poco::Data::ODBC::DescriptorException(session.dbc());
......
......@@ -49,7 +49,8 @@ void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Ne
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: " + request.getURI());
auto process_error = [&response, this](const std::string & message) {
auto process_error = [&response, this](const std::string & message)
{
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
......
add_library (clickhouse-performance-test-lib ${SPLIT_SHARED} PerformanceTest.cpp)
add_library (clickhouse-performance-test-lib ${LINK_MODE} PerformanceTest.cpp)
target_link_libraries (clickhouse-performance-test-lib clickhouse_common_io dbms ${Boost_PROGRAM_OPTIONS_LIBRARY})
target_include_directories (clickhouse-performance-test-lib SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
......
......@@ -88,7 +88,7 @@ void TCPHandler::runImpl()
try
{
/// We try to send error information to the client.
sendException(e);
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
}
catch (...) {}
......@@ -103,7 +103,7 @@ void TCPHandler::runImpl()
Exception e("Database " + default_database + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
LOG_ERROR(log, "Code: " << e.code() << ", e.displayText() = " << e.displayText()
<< ", Stack trace:\n\n" << e.getStackTrace().toString());
sendException(e);
sendException(e, connection_context.getSettingsRef().calculate_text_stack_trace);
return;
}
......@@ -133,6 +133,8 @@ void TCPHandler::runImpl()
std::unique_ptr<Exception> exception;
bool network_error = false;
bool send_exception_with_stack_trace = connection_context.getSettingsRef().calculate_text_stack_trace;
try
{
/// Restore context of request.
......@@ -149,6 +151,8 @@ void TCPHandler::runImpl()
CurrentThread::initializeQuery();
send_exception_with_stack_trace = query_context.getSettingsRef().calculate_text_stack_trace;
/// Should we send internal logs to client?
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& query_context.getSettingsRef().send_logs_level.value != "none")
......@@ -158,7 +162,8 @@ void TCPHandler::runImpl()
CurrentThread::attachInternalTextLogsQueue(state.logs_queue);
}
query_context.setExternalTablesInitializer([&global_settings, this] (Context & context) {
query_context.setExternalTablesInitializer([&global_settings, this] (Context & context)
{
if (&context != &query_context)
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
......@@ -245,7 +250,7 @@ void TCPHandler::runImpl()
tryLogCurrentException(log, "Can't send logs to client");
}
sendException(*exception);
sendException(*exception, send_exception_with_stack_trace);
}
}
catch (...)
......@@ -829,10 +834,10 @@ void TCPHandler::sendLogData(const Block & block)
}
void TCPHandler::sendException(const Exception & e)
void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
{
writeVarUInt(Protocol::Server::Exception, *out);
writeException(e, *out);
writeException(e, *out, with_stack_trace);
out->next();
}
......
......@@ -146,7 +146,7 @@ private:
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block);
void sendException(const Exception & e);
void sendException(const Exception & e, bool with_stack_trace);
void sendProgress();
void sendLogs();
void sendEndOfStream();
......
......@@ -48,7 +48,7 @@ private:
Mean mean;
Weight weight;
WeightedValue operator+ (const WeightedValue& other)
WeightedValue operator+ (const WeightedValue & other)
{
return {mean + other.weight * (other.mean - mean) / (other.weight + weight), other.weight + weight};
}
......@@ -263,7 +263,7 @@ public:
compress(max_bins);
}
void merge(const AggregateFunctionHistogramData& other, UInt32 max_bins)
void merge(const AggregateFunctionHistogramData & other, UInt32 max_bins)
{
lower_bound = std::min(lower_bound, other.lower_bound);
upper_bound = std::max(lower_bound, other.upper_bound);
......@@ -354,7 +354,7 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto& data = this->data(const_cast<AggregateDataPtr>(place));
auto & data = this->data(const_cast<AggregateDataPtr>(place));
auto & to_array = static_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = to_array.getOffsets();
......
......@@ -14,7 +14,10 @@ AggregateFunctionPtr createAggregateFunctionRetention(const std::string & name,
{
assertNoParameters(name, params);
if (arguments.size() > AggregateFunctionRetentionData::max_events )
if (arguments.size() < 2)
throw Exception("Not enough event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (arguments.size() > AggregateFunctionRetentionData::max_events)
throw Exception("Too many event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<AggregateFunctionRetention>(arguments);
......
......@@ -126,19 +126,23 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto & data_to = static_cast<ColumnArray &>(to).getData();
auto & data_to = static_cast<ColumnUInt8 &>(static_cast<ColumnArray &>(to).getData()).getData();
auto & offsets_to = static_cast<ColumnArray &>(to).getOffsets();
ColumnArray::Offset current_offset = data_to.size();
data_to.resize(current_offset + events_size);
const bool first_flag = this->data(place).events.test(0);
data_to.insert(first_flag ? Field(static_cast<UInt64>(1)) : Field(static_cast<UInt64>(0)));
for (const auto i : ext::range(1, events_size))
data_to[current_offset] = first_flag;
++current_offset;
for (size_t i = 1; i < events_size; ++i)
{
if (first_flag && this->data(place).events.test(i))
data_to.insert(Field(static_cast<UInt64>(1)));
else
data_to.insert(Field(static_cast<UInt64>(0)));
data_to[current_offset] = (first_flag && this->data(place).events.test(i));
++current_offset;
}
offsets_to.push_back(offsets_to.size() == 0 ? events_size : offsets_to.back() + events_size);
offsets_to.push_back(current_offset);
}
const char * getHeaderFilePath() const override
......
# TODO: make separate lib datastream, block, ...
#include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
#add_headers_and_sources(clickhouse_client .)
#add_library(clickhouse_client ${SPLIT_SHARED} ${clickhouse_client_headers} ${clickhouse_client_sources})
#add_library(clickhouse_client ${LINK_MODE} ${clickhouse_client_headers} ${clickhouse_client_sources})
#target_link_libraries (clickhouse_client clickhouse_common_io ${Poco_Net_LIBRARY})
#target_include_directories (clickhouse_client PRIVATE ${DBMS_INCLUDE_DIR})
......@@ -68,7 +68,7 @@ public:
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override
{
auto & column_unique = static_cast<const IColumnUnique&>(rhs);
auto & column_unique = static_cast<const IColumnUnique &>(rhs);
return getNestedColumn()->compareAt(n, m, *column_unique.getNestedColumn(), nan_direction_hint);
}
......
......@@ -112,8 +112,10 @@ public:
std::vector<MutableColumnPtr> scatter(ColumnIndex num_columns, const Selector & selector) const override;
void gather(ColumnGathererStream & gatherer_stream) override ;
void getExtremes(Field & min, Field & max) const override {
void gather(ColumnGathererStream & gatherer_stream) override;
void getExtremes(Field & min, Field & max) const override
{
return getDictionary().index(getIndexes(), 0)->getExtremes(min, max); /// TODO: optimize
}
......
......@@ -3,7 +3,7 @@ include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_common_config .)
add_library(clickhouse_common_config ${SPLIT_SHARED} ${clickhouse_common_config_headers} ${clickhouse_common_config_sources})
add_library(clickhouse_common_config ${LINK_MODE} ${clickhouse_common_config_headers} ${clickhouse_common_config_sources})
target_link_libraries (clickhouse_common_config clickhouse_common_zookeeper string_utils)
target_include_directories (clickhouse_common_config PRIVATE ${DBMS_INCLUDE_DIR})
......@@ -540,7 +540,7 @@ ConfigProcessor::LoadedConfig ConfigProcessor::loadConfigWithZooKeeperIncludes(
if (!fallback_to_preprocessed)
throw;
const auto * zk_exception = dynamic_cast<const zkutil::KeeperException *>(ex.nested());
const auto * zk_exception = dynamic_cast<const Coordination::Exception *>(ex.nested());
if (!zk_exception)
throw;
......
......@@ -3,6 +3,7 @@
#include <sstream>
#include <IO/ReadHelpers.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Path.h>
#include <Poco/Util/AbstractConfiguration.h>
......@@ -13,10 +14,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int EXTERNAL_SERVER_IS_NOT_RESPONDING;
}
ODBCBridgeHelper::ODBCBridgeHelper(
const Configuration & config_, const Poco::Timespan & http_timeout_, const std::string & connection_string_)
: config(config_), http_timeout(http_timeout_), connection_string(connection_string_)
......@@ -29,16 +32,17 @@ ODBCBridgeHelper::ODBCBridgeHelper(
ping_url.setScheme("http");
ping_url.setPath(PING_HANDLER);
}
void ODBCBridgeHelper::startODBCBridge() const
{
Poco::Path path{config.getString("application.dir", "")};
path.setFileName("clickhouse-odbc-bridge");
path.setFileName("clickhouse");
if (!path.isFile())
throw Exception("clickhouse-odbc-bridge is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
if (!Poco::File(path).exists())
throw Exception("clickhouse binary is not found", ErrorCodes::EXTERNAL_EXECUTABLE_NOT_FOUND);
std::stringstream command;
command << path.toString() << ' ';
command << path.toString() << " odbc-bridge ";
command << "--http-port " << config.getUInt("odbc_bridge.port", DEFAULT_PORT) << ' ';
command << "--listen-host " << config.getString("odbc_bridge.listen_host", DEFAULT_HOST) << ' ';
command << "--http-timeout " << http_timeout.totalMicroseconds() << ' ';
......
......@@ -7,8 +7,11 @@
#include <sstream>
#include <Common/StackTrace.h>
#include <Common/SimpleCache.h>
#include <common/demangle.h>
/// Arcadia compatibility DEVTOOLS-3976
#if defined(BACKTRACE_INCLUDE)
#include BACKTRACE_INCLUDE
......@@ -19,12 +22,16 @@
StackTrace::StackTrace()
{
frames_size = BACKTRACE_FUNC(frames, STACK_TRACE_MAX_DEPTH);
frames_size = BACKTRACE_FUNC(frames.data(), STACK_TRACE_MAX_DEPTH);
for (size_t i = frames_size; i < STACK_TRACE_MAX_DEPTH; ++i)
frames[i] = nullptr;
}
std::string StackTrace::toString() const
std::string StackTrace::toStringImpl(const Frames & frames, size_t frames_size)
{
char ** symbols = backtrace_symbols(frames, frames_size);
char ** symbols = backtrace_symbols(frames.data(), frames_size);
std::stringstream res;
if (!symbols)
......@@ -72,3 +79,13 @@ std::string StackTrace::toString() const
free(symbols);
return res.str();
}
std::string StackTrace::toString() const
{
/// Calculation of stack trace text is extremely slow.
/// We use simple cache because otherwise the server could be overloaded by trash queries.
static SimpleCache<decltype(StackTrace::toStringImpl), &StackTrace::toStringImpl> func_cached;
return func_cached(frames, frames_size);
}
#pragma once
#include <string>
#include <array>
#define STACK_TRACE_MAX_DEPTH 32
......@@ -17,6 +18,9 @@ public:
private:
using Frame = void*;
Frame frames[STACK_TRACE_MAX_DEPTH];
using Frames = std::array<Frame, STACK_TRACE_MAX_DEPTH>;
Frames frames;
size_t frames_size;
static std::string toStringImpl(const Frames & frames, size_t frames_size);
};
......@@ -5,9 +5,10 @@ StopwatchRUsage::Timestamp StopwatchRUsage::Timestamp::current()
{
StopwatchRUsage::Timestamp res;
::rusage rusage;
::rusage rusage {};
#if !defined(__APPLE__)
::getrusage(RUSAGE_THREAD, &rusage);
#endif
res.user_ns = rusage.ru_utime.tv_sec * 1000000000UL + rusage.ru_utime.tv_usec * 1000UL;
res.sys_ns = rusage.ru_stime.tv_sec * 1000000000UL + rusage.ru_stime.tv_usec * 1000UL;
return res;
......
......@@ -5,5 +5,5 @@ include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_common_stringutils .)
add_library(string_utils ${SPLIT_SHARED} ${clickhouse_common_stringutils_headers} ${clickhouse_common_stringutils_sources})
add_library(string_utils ${LINK_MODE} ${clickhouse_common_stringutils_headers} ${clickhouse_common_stringutils_sources})
target_include_directories (string_utils PRIVATE ${DBMS_INCLUDE_DIR})
#pragma once
#include <string_view>
using StringView = std::string_view;
......@@ -4,12 +4,7 @@
#include <Core/Types.h>
#include <boost/noncopyable.hpp>
#if defined(__linux__)
struct taskstats;
#else
struct taskstats {};
#endif
namespace DB
{
......
......@@ -8,6 +8,8 @@
#if defined(__linux__)
#include <linux/taskstats.h>
#else
struct taskstats {};
#endif
......@@ -87,8 +89,10 @@ struct RUsageCounters
static RUsageCounters current(UInt64 real_time_ = getCurrentTimeNanoseconds())
{
::rusage rusage;
::rusage rusage {};
#if !defined(__APPLE__)
::getrusage(RUSAGE_THREAD, &rusage);
#endif
return RUsageCounters(rusage, real_time_);
}
......@@ -154,7 +158,7 @@ struct TasksStatsCounters
{
::taskstats stat;
static TasksStatsCounters current() { return {}; }
static TasksStatsCounters current();
static void incrementProfileEvents(const TasksStatsCounters &, const TasksStatsCounters &, ProfileEvents::Counters &) {}
static void updateProfileEvents(TasksStatsCounters &, ProfileEvents::Counters &) {}
};
......
......@@ -2,7 +2,7 @@ include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_common_zookeeper .)
add_library(clickhouse_common_zookeeper ${SPLIT_SHARED} ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
add_library(clickhouse_common_zookeeper ${LINK_MODE} ${clickhouse_common_zookeeper_headers} ${clickhouse_common_zookeeper_sources})
target_link_libraries (clickhouse_common_zookeeper clickhouse_common_io)
......
#include <string.h>
#include <Common/ProfileEvents.h>
#include <Common/ZooKeeper/IKeeper.h>
namespace DB
{
namespace ErrorCodes
{
extern const int KEEPER_EXCEPTION;
}
}
namespace ProfileEvents
{
extern const Event ZooKeeperUserExceptions;
extern const Event ZooKeeperHardwareExceptions;
extern const Event ZooKeeperOtherExceptions;
}
namespace Coordination
{
Exception::Exception(const std::string & msg, const int32_t code, int)
: DB::Exception(msg, DB::ErrorCodes::KEEPER_EXCEPTION), code(code)
{
if (Coordination::isUserError(code))
ProfileEvents::increment(ProfileEvents::ZooKeeperUserExceptions);
else if (Coordination::isHardwareError(code))
ProfileEvents::increment(ProfileEvents::ZooKeeperHardwareExceptions);
else
ProfileEvents::increment(ProfileEvents::ZooKeeperOtherExceptions);
}
Exception::Exception(const std::string & msg, const int32_t code)
: Exception(msg + " (" + errorMessage(code) + ")", code, 0)
{
}
Exception::Exception(const int32_t code)
: Exception(errorMessage(code), code, 0)
{
}
Exception::Exception(const int32_t code, const std::string & path)
: Exception(std::string{errorMessage(code)} + ", path: " + path, code, 0)
{
}
Exception::Exception(const Exception & exc)
: DB::Exception(exc), code(exc.code)
{
}
using namespace DB;
void addRootPath(String & path, const String & root_path)
{
if (path.empty())
throw Exception("Path cannot be empty", ZBADARGUMENTS);
if (path[0] != '/')
throw Exception("Path must begin with /", ZBADARGUMENTS);
if (root_path.empty())
return;
if (path.size() == 1) /// "/"
path = root_path;
else
path = root_path + path;
}
void removeRootPath(String & path, const String & root_path)
{
if (root_path.empty())
return;
if (path.size() <= root_path.size())
throw Exception("Received path is not longer than root_path", ZDATAINCONSISTENCY);
path = path.substr(root_path.size());
}
const char * errorMessage(int32_t code)
{
switch (code)
{
case ZOK: return "Ok";
case ZSYSTEMERROR: return "System error";
case ZRUNTIMEINCONSISTENCY: return "Run time inconsistency";
case ZDATAINCONSISTENCY: return "Data inconsistency";
case ZCONNECTIONLOSS: return "Connection loss";
case ZMARSHALLINGERROR: return "Marshalling error";
case ZUNIMPLEMENTED: return "Unimplemented";
case ZOPERATIONTIMEOUT: return "Operation timeout";
case ZBADARGUMENTS: return "Bad arguments";
case ZINVALIDSTATE: return "Invalid zhandle state";
case ZAPIERROR: return "API error";
case ZNONODE: return "No node";
case ZNOAUTH: return "Not authenticated";
case ZBADVERSION: return "Bad version";
case ZNOCHILDRENFOREPHEMERALS: return "No children for ephemerals";
case ZNODEEXISTS: return "Node exists";
case ZNOTEMPTY: return "Not empty";
case ZSESSIONEXPIRED: return "Session expired";
case ZINVALIDCALLBACK: return "Invalid callback";
case ZINVALIDACL: return "Invalid ACL";
case ZAUTHFAILED: return "Authentication failed";
case ZCLOSING: return "ZooKeeper is closing";
case ZNOTHING: return "(not error) no server responses to process";
case ZSESSIONMOVED: return "Session moved to another server, so operation is ignored";
}
if (code > 0)
return strerror(code);
return "unknown error";
}
bool isHardwareError(int32_t zk_return_code)
{
return zk_return_code == ZINVALIDSTATE
|| zk_return_code == ZSESSIONEXPIRED
|| zk_return_code == ZSESSIONMOVED
|| zk_return_code == ZCONNECTIONLOSS
|| zk_return_code == ZMARSHALLINGERROR
|| zk_return_code == ZOPERATIONTIMEOUT;
}
bool isUserError(int32_t zk_return_code)
{
return zk_return_code == ZNONODE
|| zk_return_code == ZBADVERSION
|| zk_return_code == ZNOCHILDRENFOREPHEMERALS
|| zk_return_code == ZNODEEXISTS
|| zk_return_code == ZNOTEMPTY;
}
void CreateRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void RemoveRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void ExistsRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void GetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void SetRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void ListRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void CheckRequest::addRootPath(const String & root_path) { Coordination::addRootPath(path, root_path); }
void MultiRequest::addRootPath(const String & root_path)
{
for (auto & request : requests)
request->addRootPath(root_path);
}
void CreateResponse::removeRootPath(const String & root_path) { Coordination::removeRootPath(path_created, root_path); }
void WatchResponse::removeRootPath(const String & root_path) { Coordination::removeRootPath(path, root_path); }
void MultiResponse::removeRootPath(const String & root_path)
{
for (auto & response : responses)
response->removeRootPath(root_path);
}
}
#pragma once
#include <Core/Types.h>
#include <Common/Exception.h>
#include <vector>
#include <memory>
#include <cstdint>
#include <functional>
/** Generic interface for ZooKeeper-like services.
* Possible examples are:
* - ZooKeeper client itself;
* - fake ZooKeeper client for testing;
* - ZooKeeper emulation layer on top of Etcd, FoundationDB, whatever.
*/
namespace Coordination
{
using namespace DB;
struct ACL
{
static constexpr int32_t Read = 1;
static constexpr int32_t Write = 2;
static constexpr int32_t Create = 4;
static constexpr int32_t Delete = 8;
static constexpr int32_t Admin = 16;
static constexpr int32_t All = 0x1F;
int32_t permissions;
String scheme;
String id;
};
using ACLs = std::vector<ACL>;
struct Stat
{
int64_t czxid;
int64_t mzxid;
int64_t ctime;
int64_t mtime;
int32_t version;
int32_t cversion;
int32_t aversion;
int64_t ephemeralOwner;
int32_t dataLength;
int32_t numChildren;
int64_t pzxid;
};
struct Request;
using RequestPtr = std::shared_ptr<Request>;
using Requests = std::vector<RequestPtr>;
struct Request
{
virtual ~Request() {}
virtual String getPath() const = 0;
virtual void addRootPath(const String & /* root_path */) {}
};
struct Response;
using ResponsePtr = std::shared_ptr<Response>;
using Responses = std::vector<ResponsePtr>;
using ResponseCallback = std::function<void(const Response &)>;
struct Response
{
int32_t error = 0;
virtual ~Response() {}
virtual void removeRootPath(const String & /* root_path */) {}
};
struct WatchResponse : virtual Response
{
int32_t type = 0;
int32_t state = 0;
String path;
void removeRootPath(const String & root_path) override;
};
using WatchCallback = std::function<void(const WatchResponse &)>;
struct CreateRequest : virtual Request
{
String path;
String data;
bool is_ephemeral = false;
bool is_sequential = false;
ACLs acls;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct CreateResponse : virtual Response
{
String path_created;
void removeRootPath(const String & root_path) override;
};
struct RemoveRequest : virtual Request
{
String path;
int32_t version = -1;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct RemoveResponse : virtual Response
{
};
struct ExistsRequest : virtual Request
{
String path;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct ExistsResponse : virtual Response
{
Stat stat;
};
struct GetRequest : virtual Request
{
String path;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct GetResponse : virtual Response
{
String data;
Stat stat;
};
struct SetRequest : virtual Request
{
String path;
String data;
int32_t version = -1;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct SetResponse : virtual Response
{
Stat stat;
};
struct ListRequest : virtual Request
{
String path;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct ListResponse : virtual Response
{
std::vector<String> names;
Stat stat;
};
struct CheckRequest : virtual Request
{
String path;
int32_t version = -1;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct CheckResponse : virtual Response
{
};
struct MultiRequest : virtual Request
{
Requests requests;
void addRootPath(const String & root_path) override;
String getPath() const override { return {}; }
};
struct MultiResponse : virtual Response
{
Responses responses;
void removeRootPath(const String & root_path) override;
};
/// This response may be received only as an element of responses in MultiResponse.
struct ErrorResponse : virtual Response
{
};
using CreateCallback = std::function<void(const CreateResponse &)>;
using RemoveCallback = std::function<void(const RemoveResponse &)>;
using ExistsCallback = std::function<void(const ExistsResponse &)>;
using GetCallback = std::function<void(const GetResponse &)>;
using SetCallback = std::function<void(const SetResponse &)>;
using ListCallback = std::function<void(const ListResponse &)>;
using CheckCallback = std::function<void(const CheckResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>;
enum Error
{
ZOK = 0,
/** System and server-side errors.
* This is never thrown by the server, it shouldn't be used other than
* to indicate a range. Specifically error codes greater than this
* value, but lesser than ZAPIERROR, are system errors.
*/
ZSYSTEMERROR = -1,
ZRUNTIMEINCONSISTENCY = -2, /// A runtime inconsistency was found
ZDATAINCONSISTENCY = -3, /// A data inconsistency was found
ZCONNECTIONLOSS = -4, /// Connection to the server has been lost
ZMARSHALLINGERROR = -5, /// Error while marshalling or unmarshalling data
ZUNIMPLEMENTED = -6, /// Operation is unimplemented
ZOPERATIONTIMEOUT = -7, /// Operation timeout
ZBADARGUMENTS = -8, /// Invalid arguments
ZINVALIDSTATE = -9, /// Invliad zhandle state
/** API errors.
* This is never thrown by the server, it shouldn't be used other than
* to indicate a range. Specifically error codes greater than this
* value are API errors.
*/
ZAPIERROR = -100,
ZNONODE = -101, /// Node does not exist
ZNOAUTH = -102, /// Not authenticated
ZBADVERSION = -103, /// Version conflict
ZNOCHILDRENFOREPHEMERALS = -108, /// Ephemeral nodes may not have children
ZNODEEXISTS = -110, /// The node already exists
ZNOTEMPTY = -111, /// The node has children
ZSESSIONEXPIRED = -112, /// The session has been expired by the server
ZINVALIDCALLBACK = -113, /// Invalid callback specified
ZINVALIDACL = -114, /// Invalid ACL specified
ZAUTHFAILED = -115, /// Client authentication failed
ZCLOSING = -116, /// ZooKeeper is closing
ZNOTHING = -117, /// (not error) no server responses to process
ZSESSIONMOVED = -118 /// Session moved to another server, so operation is ignored
};
/// Network errors and similar. You should reinitialize ZooKeeper session in case of these errors
bool isHardwareError(int32_t code);
/// Valid errors sent from the server about database state (like "no node"). Logical and authentication errors (like "bad arguments") are not here.
bool isUserError(int32_t code);
const char * errorMessage(int32_t code);
/// For watches.
enum State
{
EXPIRED_SESSION = -112,
AUTH_FAILED = -113,
CONNECTING = 1,
ASSOCIATING = 2,
CONNECTED = 3,
NOTCONNECTED = 999
};
enum Event
{
CREATED = 1,
DELETED = 2,
CHANGED = 3,
CHILD = 4,
SESSION = -1,
NOTWATCHING = -2
};
class Exception : public DB::Exception
{
private:
/// Delegate constructor, used to minimize repetition; last parameter used for overload resolution.
Exception(const std::string & msg, const int32_t code, int);
public:
explicit Exception(const int32_t code);
Exception(const std::string & msg, const int32_t code);
Exception(const int32_t code, const std::string & path);
Exception(const Exception & exc);
const char * name() const throw() override { return "Coordination::Exception"; }
const char * className() const throw() override { return "Coordination::Exception"; }
Exception * clone() const override { return new Exception(*this); }
const int32_t code;
};
/** Usage scenario:
* - create an object and issue commands;
* - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
* for example, just signal a condvar / fulfull a promise.
* - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap.
* - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true,
* the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.
* - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event.
* - data for callbacks must be alive when ZooKeeper instance is alive.
*/
class IKeeper
{
public:
virtual ~IKeeper() {}
/// If expired, you can only destroy the object. All other methods will throw exception.
virtual bool isExpired() const = 0;
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;
/// If the method will throw an exception, callbacks won't be called.
///
/// After the method is executed successfully, you must wait for callbacks
/// (don't destroy callback data before it will be called).
///
/// All callbacks are executed sequentially (the execution of callbacks is serialized).
///
/// If an exception is thrown inside the callback, the session will expire,
/// and all other callbacks will be called with "Session expired" error.
virtual void create(
const String & path,
const String & data,
bool is_ephemeral,
bool is_sequential,
const ACLs & acls,
CreateCallback callback) = 0;
virtual void remove(
const String & path,
int32_t version,
RemoveCallback callback) = 0;
virtual void exists(
const String & path,
ExistsCallback callback,
WatchCallback watch) = 0;
virtual void get(
const String & path,
GetCallback callback,
WatchCallback watch) = 0;
virtual void set(
const String & path,
const String & data,
int32_t version,
SetCallback callback) = 0;
virtual void list(
const String & path,
ListCallback callback,
WatchCallback watch) = 0;
virtual void check(
const String & path,
int32_t version,
CheckCallback callback) = 0;
virtual void multi(
const Requests & requests,
MultiCallback callback) = 0;
};
}
......@@ -20,7 +20,7 @@ public:
size_t result = 0;
std::string result_str;
zkutil::Stat stat;
Coordination::Stat stat;
bool success = false;
auto zookeeper = zookeeper_holder->getZooKeeper();
......@@ -29,11 +29,11 @@ public:
if (zookeeper->tryGet(path, result_str, &stat))
{
result = std::stol(result_str) + 1;
success = zookeeper->trySet(path, std::to_string(result), stat.version) == ZooKeeperImpl::ZooKeeper::ZOK;
success = zookeeper->trySet(path, std::to_string(result), stat.version) == Coordination::ZOK;
}
else
{
success = zookeeper->tryCreate(path, std::to_string(result), zkutil::CreateMode::Persistent) == ZooKeeperImpl::ZooKeeper::ZOK;
success = zookeeper->tryCreate(path, std::to_string(result), zkutil::CreateMode::Persistent) == Coordination::ZOK;
}
}
while (!success);
......
#pragma once
#include "Types.h"
#include <Common/ZooKeeper/Types.h>
namespace zkutil
{
using KeeperException = ZooKeeperImpl::Exception;
using KeeperException = Coordination::Exception;
class KeeperMultiException : public KeeperException
{
public:
Requests requests;
Responses responses;
Coordination::Requests requests;
Coordination::Responses responses;
size_t failed_op_index = 0;
std::string getPathForFirstFailedOp() const;
/// If it is user error throws KeeperMultiException else throws ordinary KeeperException
/// If it is ZOK does nothing
static void check(int32_t code, const Requests & requests, const Responses & responses);
static void check(int32_t code, const Coordination::Requests & requests, const Coordination::Responses & responses);
KeeperMultiException(int32_t code, const Requests & requests, const Responses & responses);
KeeperMultiException(int32_t code, const Coordination::Requests & requests, const Coordination::Responses & responses);
private:
static size_t getFailedOpIndex(int32_t code, const Responses & responses);
static size_t getFailedOpIndex(int32_t code, const Coordination::Responses & responses);
};
}
......@@ -122,7 +122,7 @@ private:
{
DB::tryLogCurrentException(log);
if (e.code == ZooKeeperImpl::ZooKeeper::ZSESSIONEXPIRED)
if (e.code == Coordination::ZSESSIONEXPIRED)
return;
}
catch (...)
......
......@@ -18,17 +18,17 @@ bool Lock::tryLock()
std::string dummy;
int32_t code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy);
if (code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS)
if (code == Coordination::ZNODEEXISTS)
{
locked.reset(nullptr);
}
else if (code == ZooKeeperImpl::ZooKeeper::ZOK)
else if (code == Coordination::ZOK)
{
locked.reset(new ZooKeeperHandler(zookeeper));
}
else
{
throw zkutil::KeeperException(code);
throw Coordination::Exception(code);
}
}
return bool(locked);
......@@ -50,7 +50,7 @@ Lock::Status Lock::tryCheck() const
auto zookeeper = zookeeper_holder->getZooKeeper();
Status lock_status;
Stat stat;
Coordination::Stat stat;
std::string dummy;
bool result = zookeeper->tryGet(lock_path, dummy, &stat);
if (!result)
......
#pragma once
#include <common/Types.h>
#include <future>
#include <memory>
#include <vector>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Poco/Event.h>
namespace zkutil
{
using Stat = ZooKeeperImpl::ZooKeeper::Stat;
using Strings = std::vector<std::string>;
......@@ -24,44 +24,14 @@ namespace CreateMode
using EventPtr = std::shared_ptr<Poco::Event>;
/// Callback to call when the watch fires.
/// Because callbacks are called in the single "completion" thread internal to libzookeeper,
/// they must execute as quickly as possible (preferably just set some notification).
using WatchCallback = ZooKeeperImpl::ZooKeeper::WatchCallback;
using Request = ZooKeeperImpl::ZooKeeper::Request;
using Response = ZooKeeperImpl::ZooKeeper::Response;
using RequestPtr = ZooKeeperImpl::ZooKeeper::RequestPtr;
using ResponsePtr = ZooKeeperImpl::ZooKeeper::ResponsePtr;
using Requests = ZooKeeperImpl::ZooKeeper::Requests;
using Responses = ZooKeeperImpl::ZooKeeper::Responses;
using CreateRequest = ZooKeeperImpl::ZooKeeper::CreateRequest;
using RemoveRequest = ZooKeeperImpl::ZooKeeper::RemoveRequest;
using ExistsRequest = ZooKeeperImpl::ZooKeeper::ExistsRequest;
using GetRequest = ZooKeeperImpl::ZooKeeper::GetRequest;
using SetRequest = ZooKeeperImpl::ZooKeeper::SetRequest;
using ListRequest = ZooKeeperImpl::ZooKeeper::ListRequest;
using CheckRequest = ZooKeeperImpl::ZooKeeper::CheckRequest;
using CreateResponse = ZooKeeperImpl::ZooKeeper::CreateResponse;
using RemoveResponse = ZooKeeperImpl::ZooKeeper::RemoveResponse;
using ExistsResponse = ZooKeeperImpl::ZooKeeper::ExistsResponse;
using GetResponse = ZooKeeperImpl::ZooKeeper::GetResponse;
using SetResponse = ZooKeeperImpl::ZooKeeper::SetResponse;
using ListResponse = ZooKeeperImpl::ZooKeeper::ListResponse;
using CheckResponse = ZooKeeperImpl::ZooKeeper::CheckResponse;
/// Gets multiple asynchronous results
/// Each pair, the first is path, the second is response eg. CreateResponse, RemoveResponse
template <typename R>
using AsyncResponses = std::vector<std::pair<std::string, std::future<R>>>;
RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode);
RequestPtr makeRemoveRequest(const std::string & path, int version);
RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version);
RequestPtr makeCheckRequest(const std::string & path, int version);
Coordination::RequestPtr makeCreateRequest(const std::string & path, const std::string & data, int create_mode);
Coordination::RequestPtr makeRemoveRequest(const std::string & path, int version);
Coordination::RequestPtr makeSetRequest(const std::string & path, const std::string & data, int version);
Coordination::RequestPtr makeCheckRequest(const std::string & path, int version);
}
......@@ -10,6 +10,7 @@
#include <common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <port/unistd.h>
......@@ -109,20 +110,20 @@ public:
/// * The node has children.
int32_t tryRemove(const std::string & path, int32_t version = -1);
bool exists(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool existsWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
bool exists(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
bool existsWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
std::string get(const std::string & path, Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string getWatch(const std::string & path, Stat * stat, WatchCallback watch_callback);
std::string get(const std::string & path, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr);
std::string getWatch(const std::string & path, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
/// Doesn't not throw in the following cases:
/// * The node doesn't exist. Returns false in this case.
bool tryGet(const std::string & path, std::string & res, Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr);
bool tryGet(const std::string & path, std::string & res, Coordination::Stat * stat = nullptr, const EventPtr & watch = nullptr, int * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback, int * code = nullptr);
bool tryGetWatch(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback, int * code = nullptr);
void set(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
int32_t version = -1, Coordination::Stat * stat = nullptr);
/// Creates the node if it doesn't exist. Updates its contents otherwise.
void createOrUpdate(const std::string & path, const std::string & data, int32_t mode);
......@@ -131,34 +132,34 @@ public:
/// * The node doesn't exist.
/// * Versions do not match.
int32_t trySet(const std::string & path, const std::string & data,
int32_t version = -1, Stat * stat = nullptr);
int32_t version = -1, Coordination::Stat * stat = nullptr);
Strings getChildren(const std::string & path,
Stat * stat = nullptr,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr);
Strings getChildrenWatch(const std::string & path,
Stat * stat,
WatchCallback watch_callback);
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback);
/// Doesn't not throw in the following cases:
/// * The node doesn't exist.
int32_t tryGetChildren(const std::string & path, Strings & res,
Stat * stat = nullptr,
Coordination::Stat * stat = nullptr,
const EventPtr & watch = nullptr);
int32_t tryGetChildrenWatch(const std::string & path, Strings & res,
Stat * stat,
WatchCallback watch_callback);
Coordination::Stat * stat,
Coordination::WatchCallback watch_callback);
/// Performs several operations in a transaction.
/// Throws on every error.
Responses multi(const Requests & requests);
Coordination::Responses multi(const Coordination::Requests & requests);
/// Throws only if some operation has returned an "unexpected" error
/// - an error that would cause the corresponding try- method to throw.
int32_t tryMulti(const Requests & requests, Responses & responses);
int32_t tryMulti(const Coordination::Requests & requests, Coordination::Responses & responses);
/// Throws nothing (even session expired errors)
int32_t tryMultiNoThrow(const Requests & requests, Responses & responses);
int32_t tryMultiNoThrow(const Coordination::Requests & requests, Coordination::Responses & responses);
Int64 getClientID();
......@@ -190,24 +191,24 @@ public:
///
/// Future should not be destroyed before the result is gotten.
using FutureCreate = std::future<ZooKeeperImpl::ZooKeeper::CreateResponse>;
using FutureCreate = std::future<Coordination::CreateResponse>;
FutureCreate asyncCreate(const std::string & path, const std::string & data, int32_t mode);
using FutureGet = std::future<ZooKeeperImpl::ZooKeeper::GetResponse>;
using FutureGet = std::future<Coordination::GetResponse>;
FutureGet asyncGet(const std::string & path);
FutureGet asyncTryGet(const std::string & path);
using FutureExists = std::future<ZooKeeperImpl::ZooKeeper::ExistsResponse>;
using FutureExists = std::future<Coordination::ExistsResponse>;
FutureExists asyncExists(const std::string & path);
using FutureGetChildren = std::future<ZooKeeperImpl::ZooKeeper::ListResponse>;
using FutureGetChildren = std::future<Coordination::ListResponse>;
FutureGetChildren asyncGetChildren(const std::string & path);
using FutureSet = std::future<ZooKeeperImpl::ZooKeeper::SetResponse>;
using FutureSet = std::future<Coordination::SetResponse>;
FutureSet asyncSet(const std::string & path, const std::string & data, int32_t version = -1);
using FutureRemove = std::future<ZooKeeperImpl::ZooKeeper::RemoveResponse>;
using FutureRemove = std::future<Coordination::RemoveResponse>;
FutureRemove asyncRemove(const std::string & path, int32_t version = -1);
/// Doesn't throw in the following cases:
......@@ -216,11 +217,11 @@ public:
/// * The node has children
FutureRemove asyncTryRemove(const std::string & path, int32_t version = -1);
using FutureMulti = std::future<ZooKeeperImpl::ZooKeeper::MultiResponse>;
FutureMulti asyncMulti(const Requests & ops);
using FutureMulti = std::future<Coordination::MultiResponse>;
FutureMulti asyncMulti(const Coordination::Requests & ops);
/// Like the previous one but don't throw any exceptions on future.get()
FutureMulti tryAsyncMulti(const Requests & ops);
FutureMulti tryAsyncMulti(const Coordination::Requests & ops);
static std::string error2string(int32_t code);
......@@ -235,13 +236,13 @@ private:
/// The following methods don't throw exceptions but return error codes.
int32_t createImpl(const std::string & path, const std::string & data, int32_t mode, std::string & path_created);
int32_t removeImpl(const std::string & path, int32_t version);
int32_t getImpl(const std::string & path, std::string & res, Stat * stat, WatchCallback watch_callback);
int32_t setImpl(const std::string & path, const std::string & data, int32_t version, Stat * stat);
int32_t getChildrenImpl(const std::string & path, Strings & res, Stat * stat, WatchCallback watch_callback);
int32_t multiImpl(const Requests & requests, Responses & responses);
int32_t existsImpl(const std::string & path, Stat * stat_, WatchCallback watch_callback);
int32_t getImpl(const std::string & path, std::string & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
int32_t setImpl(const std::string & path, const std::string & data, int32_t version, Coordination::Stat * stat);
int32_t getChildrenImpl(const std::string & path, Strings & res, Coordination::Stat * stat, Coordination::WatchCallback watch_callback);
int32_t multiImpl(const Coordination::Requests & requests, Coordination::Responses & responses);
int32_t existsImpl(const std::string & path, Coordination::Stat * stat_, Coordination::WatchCallback watch_callback);
std::unique_ptr<ZooKeeperImpl::ZooKeeper> impl;
std::unique_ptr<Coordination::IKeeper> impl;
std::string hosts;
std::string identity;
......
......@@ -20,7 +20,7 @@ public:
/// вызывать из одного потока - не thread safe
template <typename... Args>
void init(Args&&... args);
void init(Args &&... args);
/// был ли класс инициализирован
bool isInitialized() const { return ptr != nullptr; }
......@@ -76,7 +76,7 @@ private:
};
template <typename... Args>
void ZooKeeperHolder::init(Args&&... args)
void ZooKeeperHolder::init(Args &&... args)
{
ptr = std::make_shared<ZooKeeper>(std::forward<Args>(args)...);
}
......
......@@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
......@@ -78,349 +79,25 @@ namespace CurrentMetrics
}
namespace ZooKeeperImpl
namespace Coordination
{
using namespace DB;
struct ZooKeeperRequest;
class Exception : public DB::Exception
{
private:
/// Delegate constructor, used to minimize repetition; last parameter used for overload resolution.
Exception(const std::string & msg, const int32_t code, int);
public:
explicit Exception(const int32_t code);
Exception(const std::string & msg, const int32_t code);
Exception(const int32_t code, const std::string & path);
Exception(const Exception & exc);
const char * name() const throw() override { return "ZooKeeperImpl::Exception"; }
const char * className() const throw() override { return "ZooKeeperImpl::Exception"; }
Exception * clone() const override { return new Exception(*this); }
const int32_t code;
};
/** Usage scenario:
* - create an object and issue commands;
* - you provide callbacks for your commands; callbacks are invoked in internal thread and must be cheap:
* for example, just signal a condvar / fulfull a promise.
* - you also may provide callbacks for watches; they are also invoked in internal thread and must be cheap.
* - whenever you receive exception with ZSESSIONEXPIRED code or method isExpired returns true,
* the ZooKeeper instance is no longer usable - you may only destroy it and probably create another.
* - whenever session is expired or ZooKeeper instance is destroying, all callbacks are notified with special event.
* - data for callbacks must be alive when ZooKeeper instance is alive.
/** Usage scenario: look at the documentation for IKeeper class.
*/
class ZooKeeper
class ZooKeeper : public IKeeper
{
public:
using Addresses = std::vector<Poco::Net::SocketAddress>;
struct ACL
{
static constexpr int32_t Read = 1;
static constexpr int32_t Write = 2;
static constexpr int32_t Create = 4;
static constexpr int32_t Delete = 8;
static constexpr int32_t Admin = 16;
static constexpr int32_t All = 0x1F;
int32_t permissions;
String scheme;
String id;
void write(WriteBuffer & out) const;
};
using ACLs = std::vector<ACL>;
struct Stat
{
int64_t czxid;
int64_t mzxid;
int64_t ctime;
int64_t mtime;
int32_t version;
int32_t cversion;
int32_t aversion;
int64_t ephemeralOwner;
int32_t dataLength;
int32_t numChildren;
int64_t pzxid;
void read(ReadBuffer & in);
};
using XID = int32_t;
using OpNum = int32_t;
struct Response;
using ResponsePtr = std::shared_ptr<Response>;
using Responses = std::vector<ResponsePtr>;
using ResponseCallback = std::function<void(const Response &)>;
struct Response
{
int32_t error = 0;
virtual ~Response() {}
virtual void readImpl(ReadBuffer &) = 0;
virtual void removeRootPath(const String & /* root_path */) {}
};
struct Request;
using RequestPtr = std::shared_ptr<Request>;
using Requests = std::vector<RequestPtr>;
struct Request
{
XID xid = 0;
bool has_watch = false;
virtual ~Request() {}
virtual RequestPtr clone() const = 0;
virtual OpNum getOpNum() const = 0;
/// Writes length, xid, op_num, then the rest.
void write(WriteBuffer & out) const;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual ResponsePtr makeResponse() const = 0;
virtual void addRootPath(const String & /* root_path */) {}
virtual String getPath() const = 0;
};
struct HeartbeatRequest final : Request
{
RequestPtr clone() const override { return std::make_shared<HeartbeatRequest>(*this); }
OpNum getOpNum() const override { return 11; }
void writeImpl(WriteBuffer &) const override {}
ResponsePtr makeResponse() const override;
String getPath() const override { return {}; }
};
struct HeartbeatResponse final : Response
{
void readImpl(ReadBuffer &) override {}
};
struct WatchResponse final : Response
{
int32_t type = 0;
int32_t state = 0;
String path;
void readImpl(ReadBuffer &) override;
void removeRootPath(const String & root_path) override;
};
using WatchCallback = std::function<void(const WatchResponse &)>;
struct AuthRequest final : Request
{
int32_t type = 0; /// ignored by the server
String scheme;
String data;
RequestPtr clone() const override { return std::make_shared<AuthRequest>(*this); }
OpNum getOpNum() const override { return 100; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
String getPath() const override { return {}; }
};
struct AuthResponse final : Response
{
void readImpl(ReadBuffer &) override {}
};
struct CloseRequest final : Request
{
RequestPtr clone() const override { return std::make_shared<CloseRequest>(*this); }
OpNum getOpNum() const override { return -11; }
void writeImpl(WriteBuffer &) const override {}
ResponsePtr makeResponse() const override;
String getPath() const override { return {}; }
};
struct CloseResponse final : Response
{
void readImpl(ReadBuffer &) override;
};
struct CreateRequest final : Request
{
String path;
String data;
bool is_ephemeral = false;
bool is_sequential = false;
ACLs acls;
RequestPtr clone() const override { return std::make_shared<CreateRequest>(*this); }
OpNum getOpNum() const override { return 1; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct CreateResponse final : Response
{
String path_created;
void readImpl(ReadBuffer &) override;
void removeRootPath(const String & root_path) override;
};
struct RemoveRequest final : Request
{
String path;
int32_t version = -1;
RequestPtr clone() const override { return std::make_shared<RemoveRequest>(*this); }
OpNum getOpNum() const override { return 2; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct RemoveResponse final : Response
{
void readImpl(ReadBuffer &) override {}
};
struct ExistsRequest final : Request
{
String path;
RequestPtr clone() const override { return std::make_shared<ExistsRequest>(*this); }
OpNum getOpNum() const override { return 3; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct ExistsResponse final : Response
{
Stat stat;
void readImpl(ReadBuffer &) override;
};
struct GetRequest final : Request
{
String path;
RequestPtr clone() const override { return std::make_shared<GetRequest>(*this); }
OpNum getOpNum() const override { return 4; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct GetResponse final : Response
{
String data;
Stat stat;
void readImpl(ReadBuffer &) override;
};
struct SetRequest final : Request
{
String path;
String data;
int32_t version = -1;
RequestPtr clone() const override { return std::make_shared<SetRequest>(*this); }
OpNum getOpNum() const override { return 5; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct SetResponse final : Response
{
Stat stat;
void readImpl(ReadBuffer &) override;
};
struct ListRequest final : Request
{
String path;
RequestPtr clone() const override { return std::make_shared<ListRequest>(*this); }
OpNum getOpNum() const override { return 12; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct ListResponse final : Response
{
std::vector<String> names;
Stat stat;
void readImpl(ReadBuffer &) override;
};
struct CheckRequest final : Request
{
String path;
int32_t version = -1;
RequestPtr clone() const override { return std::make_shared<CheckRequest>(*this); }
OpNum getOpNum() const override { return 13; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
};
struct CheckResponse final : Response
{
void readImpl(ReadBuffer &) override {}
};
struct MultiRequest final : Request
{
Requests requests;
RequestPtr clone() const override;
OpNum getOpNum() const override { return 14; }
void writeImpl(WriteBuffer &) const override;
ResponsePtr makeResponse() const override;
void addRootPath(const String & root_path) override;
String getPath() const override { return {}; }
};
struct MultiResponse final : Response
{
Responses responses;
MultiResponse(const Requests & requests);
void readImpl(ReadBuffer &) override;
void removeRootPath(const String & root_path) override;
};
/// This response may be received only as an element of responses in MultiResponse.
struct ErrorResponse final : Response
{
void readImpl(ReadBuffer &) override;
};
/** Connection to addresses is performed in order. If you want, shuffle them manually.
* Operation timeout couldn't be greater than session timeout.
* Operation timeout applies independently for network read, network write, waiting for events and synchronization.
......@@ -438,30 +115,13 @@ public:
/// If expired, you can only destroy the object. All other methods will throw exception.
bool isExpired() const { return expired; }
bool isExpired() const override { return expired; }
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const { return session_id; }
using CreateCallback = std::function<void(const CreateResponse &)>;
using RemoveCallback = std::function<void(const RemoveResponse &)>;
using ExistsCallback = std::function<void(const ExistsResponse &)>;
using GetCallback = std::function<void(const GetResponse &)>;
using SetCallback = std::function<void(const SetResponse &)>;
using ListCallback = std::function<void(const ListResponse &)>;
using CheckCallback = std::function<void(const CheckResponse &)>;
using MultiCallback = std::function<void(const MultiResponse &)>;
/// If the method will throw an exception, callbacks won't be called.
///
/// After the method is executed successfully, you must wait for callbacks
/// (don't destroy callback data before it will be called).
///
/// All callbacks are executed sequentially (the execution of callbacks is serialized).
///
/// If an exception is thrown inside the callback, the session will expire,
/// and all other callbacks will be called with "Session expired" error.
int64_t getSessionID() const override { return session_id; }
/// See the documentation about semantics of these methods in IKeeper class.
void create(
const String & path,
......@@ -469,114 +129,42 @@ public:
bool is_ephemeral,
bool is_sequential,
const ACLs & acls,
CreateCallback callback);
CreateCallback callback) override;
void remove(
const String & path,
int32_t version,
RemoveCallback callback);
RemoveCallback callback) override;
void exists(
const String & path,
ExistsCallback callback,
WatchCallback watch);
WatchCallback watch) override;
void get(
const String & path,
GetCallback callback,
WatchCallback watch);
WatchCallback watch) override;
void set(
const String & path,
const String & data,
int32_t version,
SetCallback callback);
SetCallback callback) override;
void list(
const String & path,
ListCallback callback,
WatchCallback watch);
WatchCallback watch) override;
void check(
const String & path,
int32_t version,
CheckCallback callback);
CheckCallback callback) override;
void multi(
const Requests & requests,
MultiCallback callback);
enum Error
{
ZOK = 0,
/** System and server-side errors.
* This is never thrown by the server, it shouldn't be used other than
* to indicate a range. Specifically error codes greater than this
* value, but lesser than ZAPIERROR, are system errors.
*/
ZSYSTEMERROR = -1,
ZRUNTIMEINCONSISTENCY = -2, /// A runtime inconsistency was found
ZDATAINCONSISTENCY = -3, /// A data inconsistency was found
ZCONNECTIONLOSS = -4, /// Connection to the server has been lost
ZMARSHALLINGERROR = -5, /// Error while marshalling or unmarshalling data
ZUNIMPLEMENTED = -6, /// Operation is unimplemented
ZOPERATIONTIMEOUT = -7, /// Operation timeout
ZBADARGUMENTS = -8, /// Invalid arguments
ZINVALIDSTATE = -9, /// Invliad zhandle state
/** API errors.
* This is never thrown by the server, it shouldn't be used other than
* to indicate a range. Specifically error codes greater than this
* value are API errors.
*/
ZAPIERROR = -100,
ZNONODE = -101, /// Node does not exist
ZNOAUTH = -102, /// Not authenticated
ZBADVERSION = -103, /// Version conflict
ZNOCHILDRENFOREPHEMERALS = -108, /// Ephemeral nodes may not have children
ZNODEEXISTS = -110, /// The node already exists
ZNOTEMPTY = -111, /// The node has children
ZSESSIONEXPIRED = -112, /// The session has been expired by the server
ZINVALIDCALLBACK = -113, /// Invalid callback specified
ZINVALIDACL = -114, /// Invalid ACL specified
ZAUTHFAILED = -115, /// Client authentication failed
ZCLOSING = -116, /// ZooKeeper is closing
ZNOTHING = -117, /// (not error) no server responses to process
ZSESSIONMOVED = -118 /// Session moved to another server, so operation is ignored
};
/// Network errors and similar. You should reinitialize ZooKeeper session in case of these errors
static bool isHardwareError(int32_t code);
/// Valid errors sent from the server about database state (like "no node"). Logical and authentication errors (like "bad arguments") are not here.
static bool isUserError(int32_t code);
static const char * errorMessage(int32_t code);
/// For watches.
enum State
{
EXPIRED_SESSION = -112,
AUTH_FAILED = -113,
CONNECTING = 1,
ASSOCIATING = 2,
CONNECTED = 3,
NOTCONNECTED = 999
};
enum Event
{
CREATED = 1,
DELETED = 2,
CHANGED = 3,
CHILD = 4,
SESSION = -1,
NOTWATCHING = -2
};
MultiCallback callback) override;
private:
String root_path;
......@@ -599,7 +187,7 @@ private:
struct RequestInfo
{
RequestPtr request;
std::shared_ptr<ZooKeeperRequest> request;
ResponseCallback callback;
WatchCallback watch;
clock::time_point time;
......
......@@ -41,18 +41,18 @@ std::optional<std::string> ZooKeeperNodeCache::get(const std::string & path)
if (nonexistent_nodes.count(path))
return std::nullopt;
auto watch_callback = [context=context](const ZooKeeperImpl::ZooKeeper::WatchResponse & response)
auto watch_callback = [context=context](const Coordination::WatchResponse & response)
{
if (!(response.type != ZooKeeperImpl::ZooKeeper::SESSION || response.state == ZooKeeperImpl::ZooKeeper::EXPIRED_SESSION))
if (!(response.type != Coordination::SESSION || response.state == Coordination::EXPIRED_SESSION))
return;
bool changed = false;
{
std::lock_guard<std::mutex> lock(context->mutex);
if (response.type != ZooKeeperImpl::ZooKeeper::SESSION)
if (response.type != Coordination::SESSION)
changed = context->invalidated_paths.emplace(response.path).second;
else if (response.state == ZooKeeperImpl::ZooKeeper::EXPIRED_SESSION)
else if (response.state == Coordination::EXPIRED_SESSION)
{
context->zookeeper = nullptr;
context->invalidated_paths.clear();
......
......@@ -33,7 +33,7 @@ TEST(zkutil, multi_nice_exception_msg)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
zkutil::Requests ops;
Coordination::Requests ops;
ASSERT_NO_THROW(
zookeeper->tryRemoveRecursive("/clickhouse_test/zkutil_multi");
......@@ -70,7 +70,7 @@ TEST(zkutil, multi_nice_exception_msg)
TEST(zkutil, multi_async)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
zkutil::Requests ops;
Coordination::Requests ops;
zookeeper->tryRemoveRecursive("/clickhouse_test/zkutil_multi");
......@@ -88,13 +88,13 @@ TEST(zkutil, multi_async)
ops.clear();
auto res = fut.get();
ASSERT_TRUE(res.error == ZooKeeperImpl::ZooKeeper::ZOK);
ASSERT_TRUE(res.error == Coordination::ZOK);
ASSERT_EQ(res.responses.size(), 2);
}
EXPECT_ANY_THROW
(
std::vector<std::future<ZooKeeperImpl::ZooKeeper::MultiResponse>> futures;
std::vector<std::future<Coordination::MultiResponse>> futures;
for (size_t i = 0; i < 10000; ++i)
{
......@@ -124,7 +124,7 @@ TEST(zkutil, multi_async)
ops.clear();
auto res = fut.get();
ASSERT_TRUE(res.error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS);
ASSERT_TRUE(res.error == Coordination::ZNODEEXISTS);
ASSERT_EQ(res.responses.size(), 2);
}
}
......@@ -176,11 +176,11 @@ TEST(zkutil, multi_create_sequential)
zookeeper->tryRemoveRecursive(base_path);
zookeeper->createAncestors(base_path + "/");
zkutil::Requests ops;
Coordination::Requests ops;
String sequential_node_prefix = base_path + "/queue-";
ops.emplace_back(zkutil::makeCreateRequest(sequential_node_prefix, "", zkutil::CreateMode::EphemeralSequential));
auto results = zookeeper->multi(ops);
const auto & sequential_node_result_op = typeid_cast<const zkutil::CreateResponse &>(*results.at(0));
const auto & sequential_node_result_op = dynamic_cast<const Coordination::CreateResponse &>(*results.at(0));
EXPECT_FALSE(sequential_node_result_op.path_created.empty());
EXPECT_GT(sequential_node_result_op.path_created.length(), sequential_node_prefix.length());
......
......@@ -34,11 +34,11 @@ int main(int argc, char ** argv)
while (true)
{
{
zkutil::Requests ops;
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest("/test/zk_expiration_test", "hello", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest("/test/zk_expiration_test", -1));
zkutil::Responses responses;
Coordination::Responses responses;
int32_t code = zk.tryMultiNoThrow(ops, responses);
std::cout << time(nullptr) - time0 << "s: " << zkutil::ZooKeeper::error2string(code) << std::endl;
......@@ -57,7 +57,7 @@ int main(int argc, char ** argv)
sleep(1);
}
}
catch (zkutil::KeeperException & e)
catch (Coordination::Exception & e)
{
std::cerr << "KeeperException: " << DB::getCurrentExceptionMessage(true) << std::endl;
return 1;
......
......@@ -23,7 +23,7 @@ try
{
while (true)
{
std::vector<std::future<zkutil::GetResponse>> futures;
std::vector<std::future<Coordination::GetResponse>> futures;
for (auto & node : nodes)
futures.push_back(zookeeper.asyncGet("/tmp/" + node));
......
......@@ -20,7 +20,7 @@ try
std::cout << "create path" << std::endl;
zk.create("/test", "old", zkutil::CreateMode::Persistent);
zkutil::Stat stat;
Coordination::Stat stat;
zkutil::EventPtr watch = std::make_shared<Poco::Event>();
std::cout << "get path" << std::endl;
......@@ -38,13 +38,13 @@ try
zk.remove("/test");
zkutil::Requests ops;
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest("/test", "multi1", CreateMode::Persistent));
ops.emplace_back(zkutil::makeSetRequest("/test", "multi2", -1));
ops.emplace_back(zkutil::makeRemoveRequest("/test", -1));
std::cout << "multi" << std::endl;
zkutil::Responses res = zk.multi(ops);
std::cout << "path created: " << typeid_cast<const CreateResponse &>(*res[0]).path_created << std::endl;
Coordination::Responses res = zk.multi(ops);
std::cout << "path created: " << dynamic_cast<const Coordination::CreateResponse &>(*res[0]).path_created << std::endl;
return 0;
}
......
......@@ -7,7 +7,7 @@
#include <boost/algorithm/string.hpp>
using namespace ZooKeeperImpl;
using namespace Coordination;
int main(int argc, char ** argv)
......@@ -38,10 +38,10 @@ try
std::cout << "create\n";
zk.create("/test", "old", false, false, {},
[&](const ZooKeeper::CreateResponse & response)
[&](const CreateResponse & response)
{
if (response.error)
std::cerr << "Error (create) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Error (create) " << response.error << ": " << errorMessage(response.error) << '\n';
else
std::cerr << "Created path: " << response.path_created << '\n';
......@@ -53,19 +53,19 @@ try
std::cout << "get\n";
zk.get("/test",
[&](const ZooKeeper::GetResponse & response)
[&](const GetResponse & response)
{
if (response.error)
std::cerr << "Error (get) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Error (get) " << response.error << ": " << errorMessage(response.error) << '\n';
else
std::cerr << "Value: " << response.data << '\n';
//event.set();
},
[](const ZooKeeper::WatchResponse & response)
[](const WatchResponse & response)
{
if (response.error)
std::cerr << "Watch (get) on /test, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Watch (get) on /test, Error " << response.error << ": " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (get) on /test, path: " << response.path << ", type: " << response.type << '\n';
});
......@@ -75,10 +75,10 @@ try
std::cout << "set\n";
zk.set("/test", "new", -1,
[&](const ZooKeeper::SetResponse & response)
[&](const SetResponse & response)
{
if (response.error)
std::cerr << "Error (set) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Error (set) " << response.error << ": " << errorMessage(response.error) << '\n';
else
std::cerr << "Set\n";
......@@ -90,10 +90,10 @@ try
std::cout << "list\n";
zk.list("/",
[&](const ZooKeeper::ListResponse & response)
[&](const ListResponse & response)
{
if (response.error)
std::cerr << "Error (list) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Error (list) " << response.error << ": " << errorMessage(response.error) << '\n';
else
{
std::cerr << "Children:\n";
......@@ -103,10 +103,10 @@ try
//event.set();
},
[](const ZooKeeper::WatchResponse & response)
[](const WatchResponse & response)
{
if (response.error)
std::cerr << "Watch (list) on /, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Watch (list) on /, Error " << response.error << ": " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (list) on /, path: " << response.path << ", type: " << response.type << '\n';
});
......@@ -116,19 +116,19 @@ try
std::cout << "exists\n";
zk.exists("/test",
[&](const ZooKeeper::ExistsResponse & response)
[&](const ExistsResponse & response)
{
if (response.error)
std::cerr << "Error (exists) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Error (exists) " << response.error << ": " << errorMessage(response.error) << '\n';
else
std::cerr << "Exists\n";
//event.set();
},
[](const ZooKeeper::WatchResponse & response)
[](const WatchResponse & response)
{
if (response.error)
std::cerr << "Watch (exists) on /test, Error " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Watch (exists) on /test, Error " << response.error << ": " << errorMessage(response.error) << '\n';
else
std::cerr << "Watch (exists) on /test, path: " << response.path << ", type: " << response.type << '\n';
});
......@@ -137,10 +137,10 @@ try
std::cout << "remove\n";
zk.remove("/test", -1, [&](const ZooKeeper::RemoveResponse & response)
zk.remove("/test", -1, [&](const RemoveResponse & response)
{
if (response.error)
std::cerr << "Error (remove) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
std::cerr << "Error (remove) " << response.error << ": " << errorMessage(response.error) << '\n';
else
std::cerr << "Removed\n";
......@@ -151,43 +151,43 @@ try
std::cout << "multi\n";
ZooKeeper::Requests ops;
Requests ops;
{
ZooKeeper::CreateRequest create_request;
CreateRequest create_request;
create_request.path = "/test";
create_request.data = "multi1";
ops.emplace_back(std::make_shared<ZooKeeper::CreateRequest>(std::move(create_request)));
ops.emplace_back(std::make_shared<CreateRequest>(std::move(create_request)));
}
{
ZooKeeper::SetRequest set_request;
SetRequest set_request;
set_request.path = "/test";
set_request.data = "multi2";
ops.emplace_back(std::make_shared<ZooKeeper::SetRequest>(std::move(set_request)));
ops.emplace_back(std::make_shared<SetRequest>(std::move(set_request)));
}
{
ZooKeeper::RemoveRequest remove_request;
RemoveRequest remove_request;
remove_request.path = "/test";
ops.emplace_back(std::make_shared<ZooKeeper::RemoveRequest>(std::move(remove_request)));
ops.emplace_back(std::make_shared<RemoveRequest>(std::move(remove_request)));
}
zk.multi(ops, [&](const ZooKeeper::MultiResponse & response)
zk.multi(ops, [&](const MultiResponse & response)
{
if (response.error)
std::cerr << "Error (multi) " << response.error << ": " << errorMessage(response.error) << '\n';
else
{
if (response.error)
std::cerr << "Error (multi) " << response.error << ": " << ZooKeeper::errorMessage(response.error) << '\n';
else
{
for (const auto & elem : response.responses)
if (elem->error)
std::cerr << "Error (elem) " << elem->error << ": " << ZooKeeper::errorMessage(elem->error) << '\n';
for (const auto & elem : response.responses)
if (elem->error)
std::cerr << "Error (elem) " << elem->error << ": " << errorMessage(elem->error) << '\n';
std::cerr << "Created path: " << typeid_cast<const ZooKeeper::CreateResponse &>(*response.responses[0]).path_created << '\n';
}
std::cerr << "Created path: " << dynamic_cast<const CreateResponse &>(*response.responses[0]).path_created << '\n';
}
event.set();
});
event.set();
});
event.wait();
return 0;
......
......@@ -5,12 +5,12 @@
int main()
try
{
ZooKeeperImpl::ZooKeeper zookeeper({Poco::Net::SocketAddress{"localhost:2181"}}, "", "", "", {30, 0}, {0, 50000}, {0, 50000});
Coordination::ZooKeeper zookeeper({Poco::Net::SocketAddress{"localhost:2181"}}, "", "", "", {30, 0}, {0, 50000}, {0, 50000});
zookeeper.create("/test", "hello", false, false, {}, [](const ZooKeeperImpl::ZooKeeper::CreateResponse & response)
zookeeper.create("/test", "hello", false, false, {}, [](const Coordination::CreateResponse & response)
{
if (response.error)
std::cerr << "Error " << response.error << ": " << ZooKeeperImpl::ZooKeeper::errorMessage(response.error) << "\n";
std::cerr << "Error " << response.error << ": " << Coordination::errorMessage(response.error) << "\n";
else
std::cerr << "Path created: " << response.path_created << "\n";
});
......
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <thread>
#include <fstream>
#if defined(__x86_64__)
......@@ -14,25 +13,6 @@
unsigned getNumberOfPhysicalCPUCores()
{
#if defined(__linux__)
/// On Linux we try to look at Cgroups limit if it is available.
std::ifstream cgroup_read_in("/sys/fs/cgroup/cpu/cpu.cfs_quota_us");
if (cgroup_read_in.is_open())
{
std::string allocated_cpus_share_str{ std::istreambuf_iterator<char>(cgroup_read_in), std::istreambuf_iterator<char>() };
int allocated_cpus_share_int = std::stoi(allocated_cpus_share_str);
cgroup_read_in.close();
// If a valid value is present
if (allocated_cpus_share_int > 0)
{
unsigned allocated_cpus = (allocated_cpus_share_int + 999) / 1000;
return allocated_cpus;
}
}
#endif
#if defined(__x86_64__)
cpu_raw_data_t raw_data;
if (0 != cpuid_get_raw_data(&raw_data))
......
#pragma once
#include <string>
/// Maps 0..15 to 0..9A..F or 0..9a..f correspondingly.
......
......@@ -39,13 +39,17 @@ std::string getThreadName()
{
std::string name(16, '\0');
#if defined(__FreeBSD__) || defined(__APPLE__)
if (pthread_get_name_np(pthread_self(), name.data(), name.size());
throw DB::Exception("Cannot get thread name with pthread_get_name_np()", DB::ErrorCodes::PTHREAD_ERROR);
#if defined(__APPLE__)
if (pthread_getname_np(pthread_self(), name.data(), name.size()))
throw DB::Exception("Cannot get thread name with pthread_getname_np()", DB::ErrorCodes::PTHREAD_ERROR);
#elif defined(__FreeBSD__)
// TODO: make test. freebsd will have this function soon https://freshbsd.org/commit/freebsd/r337983
// if (pthread_get_name_np(pthread_self(), name.data(), name.size()))
// throw DB::Exception("Cannot get thread name with pthread_get_name_np()", DB::ErrorCodes::PTHREAD_ERROR);
#else
if (0 != prctl(PR_GET_NAME, name.data(), 0, 0, 0))
#endif
DB::throwFromErrno("Cannot get thread name with prctl(PR_GET_NAME)");
#endif
name.resize(std::strlen(name.data()));
return name;
......
......@@ -190,7 +190,7 @@ bool test_concurrent()
bool res = true;
auto load_func = [](const std::string& result, std::chrono::seconds sleep_for, bool throw_exc)
auto load_func = [](const std::string & result, std::chrono::seconds sleep_for, bool throw_exc)
{
std::this_thread::sleep_for(sleep_for);
if (throw_exc)
......
......@@ -145,9 +145,9 @@ void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard<std::mutex>
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
}
zkutil::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
Coordination::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
{
return [t = shared_from_this()](const ZooKeeperImpl::ZooKeeper::WatchResponse &)
return [t = shared_from_this()](const Coordination::WatchResponse &)
{
t->schedule();
};
......
......@@ -58,8 +58,8 @@ public:
/// Atomically activate task and schedule it for execution.
bool activateAndSchedule();
/// get zkutil::WatchCallback needed for notifications from ZooKeeper watches.
zkutil::WatchCallback getWatchCallback();
/// get Coordination::WatchCallback needed for notifications from ZooKeeper watches.
Coordination::WatchCallback getWatchCallback();
private:
friend class TaskNotification;
......
......@@ -76,16 +76,11 @@ private:
/// Cache required fields
struct Source
{
const IColumn * column;
size_t pos;
size_t size;
const IColumn * column = nullptr;
size_t pos = 0;
size_t size = 0;
Block block;
Source(Block && block_, const String & name) : block(std::move(block_))
{
update(name);
}
void update(const String & name)
{
column = block.getByName(name).column.get();
......@@ -94,7 +89,6 @@ private:
}
};
void init();
void fetchNewBlock(Source & source, size_t source_num);
String column_name;
......
......@@ -669,6 +669,9 @@ void DataTypeWithDictionary::deserializeBinaryBulkWithMultipleStreams(
}
};
if (!settings.continuous_reading)
state_with_dictionary->num_pending_rows = 0;
bool first_dictionary = true;
while (limit)
{
......
#pragma once
#include <Dictionaries/Embedded/GeodataProviders/Types.h>
#include <string>
struct RegionEntry
{
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册