提交 d7d6c4b9 编写于 作者: A Alexey Milovidov

Merge branch 'master' of github.com:yandex/ClickHouse

......@@ -45,10 +45,10 @@ message (STATUS "CMAKE_BUILD_TYPE: " ${CMAKE_BUILD_TYPE} )
set (CMAKE_CONFIGURATION_TYPES "RelWithDebInfo;Debug;Release;MinSizeRel;ASan;UBSan" CACHE STRING "" FORCE)
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64.*|AARCH64.*)")
set (AARCH64 1)
set (ARCH_AARCH64 1)
endif ()
if (AARCH64 OR CMAKE_SYSTEM_PROCESSOR MATCHES "arm")
set (ARM 1)
if (ARCH_AARCH64 OR CMAKE_SYSTEM_PROCESSOR MATCHES "arm")
set (ARCH_ARM 1)
endif ()
set (COMMON_WARNING_FLAGS "-Wall") # -Werror is also added inside directories with our own code.
......@@ -175,6 +175,7 @@ if (NOT OPENSSL_FOUND)
message (FATAL_ERROR "Need openssl for build. debian tip: sudo apt install libssl-dev")
endif ()
include (cmake/lib_name.cmake)
include (cmake/find_icu4c.cmake)
include (cmake/find_boost.cmake)
# openssl, zlib before poco
......@@ -183,19 +184,24 @@ include (cmake/find_zstd.cmake)
include (cmake/find_poco.cmake)
include (cmake/find_lz4.cmake)
include (cmake/find_sparsehash.cmake)
include (cmake/find_libtool.cmake)
include (cmake/find_rt.cmake)
include (cmake/find_readline_edit.cmake)
include (cmake/find_zookeeper.cmake)
include (cmake/find_double-conversion.cmake)
include (cmake/find_re2.cmake)
include (cmake/find_contrib_lib.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)
find_contrib_lib(metrohash)
find_contrib_lib(btrie)
find_contrib_lib(double-conversion)
# Need to process before "contrib" dir:
include (libs/libcommon/cmake/find_gperftools.cmake)
include (libs/libcommon/cmake/find_jemalloc.cmake)
include (libs/libcommon/cmake/find_cctz.cmake)
include (libs/libmysqlxx/cmake/find_mysqlclient.cmake)
include (libs/libdaemon/cmake/find_unwind.cmake)
include (cmake/lib_name.cmake)
set (FULL_C_FLAGS "${CMAKE_C_FLAGS} ${CMAKE_C_FLAGS_${CMAKE_BUILD_TYPE}}")
......
# - Try to find btrie headers and libraries.
#
# Usage of this module as follows:
#
#     find_package(btrie)
#
# Variables used by this module (set them before calling find_package
# to change the default behaviour):
#
#  BTRIE_ROOT_DIR    Root of a btrie installation, used as a search
#                    hint when the module has problems finding the
#                    proper installation path.
#
# Variables defined by this module:
#
#  BTRIE_FOUND        System has btrie libs/headers
#  BTRIE_LIBRARIES    The btrie library/libraries
#  BTRIE_INCLUDE_DIR  The location of btrie headers

# Locate an installation root that contains include/btrie.h.
find_path (BTRIE_ROOT_DIR NAMES include/btrie.h)

# The library itself: search under the detected root first, then in any
# user-supplied extra library paths.
find_library (BTRIE_LIBRARIES
    NAMES btrie
    PATHS "${BTRIE_ROOT_DIR}/lib" ${BTRIE_LIBRARIES_PATHS})

# The public header directory, searched analogously.
find_path (BTRIE_INCLUDE_DIR
    NAMES btrie.h
    PATHS "${BTRIE_ROOT_DIR}/include" ${BTRIE_INCLUDE_PATHS})

include (FindPackageHandleStandardArgs)
# Defines BTRIE_FOUND and prints the standard found/not-found message.
find_package_handle_standard_args (btrie DEFAULT_MSG
    BTRIE_LIBRARIES BTRIE_INCLUDE_DIR)

mark_as_advanced (BTRIE_ROOT_DIR BTRIE_LIBRARIES BTRIE_INCLUDE_DIR)
# - Try to find cityhash headers and libraries.
#
# Usage of this module as follows:
#
#     find_package(cityhash)
#
# Variables used by this module (set them before calling find_package
# to change the default behaviour):
#
#  CITYHASH_ROOT_DIR    Root of a cityhash installation, used as a
#                       search hint when the module has problems finding
#                       the proper installation path.
#
# Variables defined by this module:
#
#  CITYHASH_FOUND        System has cityhash libs/headers
#  CITYHASH_LIBRARIES    The cityhash library/libraries
#  CITYHASH_INCLUDE_DIR  The location of cityhash headers

# Locate an installation root that contains include/city.h.
find_path (CITYHASH_ROOT_DIR NAMES include/city.h)

# The library itself: search under the detected root first, then in any
# user-supplied extra library paths.
find_library (CITYHASH_LIBRARIES
    NAMES cityhash
    PATHS "${CITYHASH_ROOT_DIR}/lib" ${CITYHASH_LIBRARIES_PATHS})

# The public header directory, searched analogously.
find_path (CITYHASH_INCLUDE_DIR
    NAMES city.h
    PATHS "${CITYHASH_ROOT_DIR}/include" ${CITYHASH_INCLUDE_PATHS})

include (FindPackageHandleStandardArgs)
# Defines CITYHASH_FOUND and prints the standard found/not-found message.
find_package_handle_standard_args (cityhash DEFAULT_MSG
    CITYHASH_LIBRARIES CITYHASH_INCLUDE_DIR)

mark_as_advanced (CITYHASH_ROOT_DIR CITYHASH_LIBRARIES CITYHASH_INCLUDE_DIR)
# - Try to find double-conversion headers and libraries.
#
# Usage of this module as follows:
#
#     find_package(double-conversion)
#
# Variables used by this module (set them before calling find_package
# to change the default behaviour):
#
#  DOUBLE_CONVERSION_ROOT_DIR    Root of a double-conversion installation,
#                                used as a search hint when the module has
#                                problems finding the proper installation
#                                path.
#
# Variables defined by this module:
#
#  DOUBLE_CONVERSION_FOUND        System has double-conversion libs/headers
#  DOUBLE_CONVERSION_LIBRARIES    The double-conversion library/libraries
#  DOUBLE_CONVERSION_INCLUDE_DIR  The location of double-conversion headers

# Locate an installation root that contains include/double-conversion.h.
find_path (DOUBLE_CONVERSION_ROOT_DIR NAMES include/double-conversion.h)

# The library itself.
# Fix: the extra-paths hint used to be ${BTRIE_CITYHASH_PATHS}, a copy-paste
# leftover from the btrie/cityhash modules; use this module's own
# DOUBLE_CONVERSION_LIBRARIES_PATHS, consistent with the sibling Find modules.
find_library (DOUBLE_CONVERSION_LIBRARIES
    NAMES double-conversion
    PATHS "${DOUBLE_CONVERSION_ROOT_DIR}/lib" ${DOUBLE_CONVERSION_LIBRARIES_PATHS})

# The public header directory.
find_path (DOUBLE_CONVERSION_INCLUDE_DIR
    NAMES double-conversion.h
    PATHS "${DOUBLE_CONVERSION_ROOT_DIR}/include" ${DOUBLE_CONVERSION_INCLUDE_PATHS})

include (FindPackageHandleStandardArgs)
# The package name is deliberately passed with '_' instead of '-' so that
# FPHSA defines DOUBLE_CONVERSION_FOUND — the variable checked by
# find_contrib_lib(), which also normalises '-' to '_'.
find_package_handle_standard_args (double_conversion DEFAULT_MSG
    DOUBLE_CONVERSION_LIBRARIES
    DOUBLE_CONVERSION_INCLUDE_DIR)

mark_as_advanced (
    DOUBLE_CONVERSION_ROOT_DIR
    DOUBLE_CONVERSION_LIBRARIES
    DOUBLE_CONVERSION_INCLUDE_DIR)
# - Try to find farmhash headers and libraries.
#
# Usage of this module as follows:
#
#     find_package(farmhash)
#
# Variables used by this module (set them before calling find_package
# to change the default behaviour):
#
#  FARMHASH_ROOT_DIR    Root of a farmhash installation, used as a
#                       search hint when the module has problems finding
#                       the proper installation path.
#
# Variables defined by this module:
#
#  FARMHASH_FOUND        System has farmhash libs/headers
#  FARMHASH_LIBRARIES    The farmhash library/libraries
#  FARMHASH_INCLUDE_DIR  The location of farmhash headers

# Locate an installation root that contains include/farmhash.h.
find_path (FARMHASH_ROOT_DIR NAMES include/farmhash.h)

# The library itself: search under the detected root first, then in any
# user-supplied extra library paths.
find_library (FARMHASH_LIBRARIES
    NAMES farmhash
    PATHS "${FARMHASH_ROOT_DIR}/lib" ${FARMHASH_LIBRARIES_PATHS})

# The public header directory, searched analogously.
find_path (FARMHASH_INCLUDE_DIR
    NAMES farmhash.h
    PATHS "${FARMHASH_ROOT_DIR}/include" ${FARMHASH_INCLUDE_PATHS})

include (FindPackageHandleStandardArgs)
# Defines FARMHASH_FOUND and prints the standard found/not-found message.
find_package_handle_standard_args (farmhash DEFAULT_MSG
    FARMHASH_LIBRARIES FARMHASH_INCLUDE_DIR)

mark_as_advanced (FARMHASH_ROOT_DIR FARMHASH_LIBRARIES FARMHASH_INCLUDE_DIR)
# - Try to find metrohash headers and libraries.
#
# Usage of this module as follows:
#
#     find_package(metrohash)
#
# Variables used by this module (set them before calling find_package
# to change the default behaviour):
#
#  METROHASH_ROOT_DIR    Root of a metrohash installation, used as a
#                        search hint when the module has problems finding
#                        the proper installation path.
#
# Variables defined by this module:
#
#  METROHASH_FOUND        System has metrohash libs/headers
#  METROHASH_LIBRARIES    The metrohash library/libraries
#  METROHASH_INCLUDE_DIR  The location of metrohash headers

# Locate an installation root that contains include/metrohash.h.
find_path (METROHASH_ROOT_DIR NAMES include/metrohash.h)

# The library itself: search under the detected root first, then in any
# user-supplied extra library paths.
find_library (METROHASH_LIBRARIES
    NAMES metrohash
    PATHS "${METROHASH_ROOT_DIR}/lib" ${METROHASH_LIBRARIES_PATHS})

# The public header directory, searched analogously.
find_path (METROHASH_INCLUDE_DIR
    NAMES metrohash.h
    PATHS "${METROHASH_ROOT_DIR}/include" ${METROHASH_INCLUDE_PATHS})

include (FindPackageHandleStandardArgs)
# Defines METROHASH_FOUND and prints the standard found/not-found message.
find_package_handle_standard_args (metrohash DEFAULT_MSG
    METROHASH_LIBRARIES METROHASH_INCLUDE_DIR)

mark_as_advanced (METROHASH_ROOT_DIR METROHASH_LIBRARIES METROHASH_INCLUDE_DIR)
# find_contrib_lib(<lib-name>)
#
# Chooses between a system-provided copy of <lib-name> and the bundled
# ("contrib") copy.  Unless USE_INTERNAL_<NAME>_LIBRARY was pre-set, the
# system copy is tried first via find_package(); when it is not found the
# macro falls back to the internal target.
#
# On exit the following variables are set, where <NAME> is the upper-cased
# library name with '-' replaced by '_' (macros have no scope of their own,
# so these land directly in the caller's scope — that is the intent here):
#   <NAME>_LIBRARIES    library/target to link against
#   <NAME>_INCLUDE_DIR  header directory; for the internal fallback it is
#                       taken from <NAME>_CONTRIB_INCLUDE_DIR, which the
#                       caller must have defined beforehand
macro(find_contrib_lib LIB_NAME)
# Lower-case form names the bundled target; upper-case form prefixes variables.
string(TOLOWER ${LIB_NAME} LIB_NAME_LC)
string(TOUPPER ${LIB_NAME} LIB_NAME_UC)
# '-' is awkward in variable names (and FPHSA FOUND vars), normalise to '_'.
string(REPLACE "-" "_" LIB_NAME_UC ${LIB_NAME_UC})
# Default follows the global bundled/unbundled policy (NOT_UNBUNDLED).
option (USE_INTERNAL_${LIB_NAME_UC}_LIBRARY "Use bundled library ${LIB_NAME} instead of system" ${NOT_UNBUNDLED})
if (NOT USE_INTERNAL_${LIB_NAME_UC}_LIBRARY)
find_package ("${LIB_NAME}")
endif ()
# Fall back to the bundled copy when the system one was not found (or was
# never looked for because the internal library was requested).
if (NOT ${LIB_NAME_UC}_FOUND)
set (USE_INTERNAL_${LIB_NAME_UC}_LIBRARY 1)
set (${LIB_NAME_UC}_LIBRARIES ${LIB_NAME_LC})
set (${LIB_NAME_UC}_INCLUDE_DIR ${${LIB_NAME_UC}_CONTRIB_INCLUDE_DIR})
endif ()
message (STATUS "Using ${LIB_NAME}: ${${LIB_NAME_UC}_INCLUDE_DIR} : ${${LIB_NAME_UC}_LIBRARIES}")
endmacro()
# Selects the double-conversion library: a system installation by default
# (subject to the global NOT_UNBUNDLED policy), otherwise the copy bundled
# under contrib/libdouble-conversion.
# Defines DOUBLE_CONVERSION_LIBRARY and DOUBLE_CONVERSION_INCLUDE_DIR and
# adds the chosen include directory to the directory-wide include path.
option (USE_INTERNAL_DOUBLE_CONVERSION_LIBRARY "Set to FALSE to use system double-conversion library instead of bundled" ${NOT_UNBUNDLED})
if (NOT USE_INTERNAL_DOUBLE_CONVERSION_LIBRARY)
find_library (DOUBLE_CONVERSION_LIBRARY double-conversion)
find_path (DOUBLE_CONVERSION_INCLUDE_DIR NAMES double-conversion/double-conversion.h PATHS ${DOUBLE_CONVERSION_INCLUDE_PATHS})
endif ()
if (DOUBLE_CONVERSION_LIBRARY AND DOUBLE_CONVERSION_INCLUDE_DIR)
include_directories (${DOUBLE_CONVERSION_INCLUDE_DIR})
else ()
# System copy not found (or bundled copy requested): fall back to contrib.
set (USE_INTERNAL_DOUBLE_CONVERSION_LIBRARY 1)
set (DOUBLE_CONVERSION_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libdouble-conversion")
# BEFORE so the bundled headers shadow any system-wide installation.
include_directories (BEFORE ${DOUBLE_CONVERSION_INCLUDE_DIR})
set (DOUBLE_CONVERSION_LIBRARY double-conversion)
endif ()
message (STATUS "Using double-conversion: ${DOUBLE_CONVERSION_INCLUDE_DIR} : ${DOUBLE_CONVERSION_LIBRARY}")
......@@ -9,8 +9,6 @@ if (LZ4_LIBRARY AND LZ4_INCLUDE_DIR)
include_directories (${LZ4_INCLUDE_DIR})
else ()
set (USE_INTERNAL_LZ4_LIBRARY 1)
set (LZ4_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/liblz4/include/lz4")
include_directories (BEFORE ${LZ4_INCLUDE_DIR})
set (LZ4_LIBRARY lz4)
endif ()
......
......@@ -10,6 +10,7 @@ else ()
set (USE_INTERNAL_POCO_LIBRARY 1)
include (${ClickHouse_SOURCE_DIR}/cmake/find_ltdl.cmake)
include (${ClickHouse_SOURCE_DIR}/contrib/libpoco/cmake/FindODBC.cmake)
list (APPEND Poco_INCLUDE_DIRS
......@@ -29,6 +30,7 @@ else ()
if (ODBC_FOUND)
set (Poco_DataODBC_FOUND 1)
set (Poco_DataODBC_LIBRARY PocoDataODBC)
list (APPEND Poco_DataODBC_LIBRARY ${LTDL_LIB})
list (APPEND Poco_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/libpoco/Data/ODBC/include/")
endif ()
......
......@@ -10,10 +10,6 @@ if (RE2_LIBRARY AND RE2_INCLUDE_DIR)
set (RE2_ST_LIBRARY ${RE2_LIBRARY})
else ()
set (USE_INTERNAL_RE2_LIBRARY 1)
set (RE2_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libre2")
set (RE2_ST_INCLUDE_DIR "${ClickHouse_BINARY_DIR}/contrib/libre2")
include_directories (BEFORE ${RE2_INCLUDE_DIR})
include_directories (BEFORE ${RE2_ST_INCLUDE_DIR})
set (RE2_LIBRARY re2)
set (RE2_ST_LIBRARY re2_st)
set (USE_RE2_ST 1)
......
......@@ -9,7 +9,6 @@ if (SPARCEHASH_INCLUDE_DIR)
else ()
set (USE_INTERNAL_SPARCEHASH_LIBRARY 1)
set (SPARCEHASH_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libsparsehash")
include_directories (BEFORE ${SPARCEHASH_INCLUDE_DIR})
endif ()
message (STATUS "Using sparsehash: ${SPARCEHASH_INCLUDE_DIR}")
......@@ -2,15 +2,11 @@ option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library inste
if (NOT USE_INTERNAL_ZLIB_LIBRARY)
find_package (ZLIB)
if (ZLIB_FOUND)
include_directories (${ZLIB_INCLUDE_DIRS})
endif ()
endif ()
if (NOT ZLIB_FOUND)
set (USE_INTERNAL_ZLIB_LIBRARY 1)
set (ZLIB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libzlib-ng")
include_directories (BEFORE ${ZLIB_INCLUDE_DIR})
if (USE_STATIC_LIBRARIES)
set (ZLIB_LIBRARIES zlibstatic)
else ()
......
......@@ -9,8 +9,6 @@ if (ZOOKEEPER_LIBRARY AND ZOOKEEPER_INCLUDE_DIR)
include_directories (${ZOOKEEPER_INCLUDE_DIR})
else ()
set (USE_INTERNAL_ZOOKEEPER_LIBRARY 1)
set (ZOOKEEPER_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libzookeeper/include")
include_directories (BEFORE ${ZOOKEEPER_INCLUDE_DIR})
set (ZOOKEEPER_LIBRARY zookeeper_mt)
endif ()
......
......@@ -9,8 +9,6 @@ if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
include_directories (${ZSTD_INCLUDE_DIR})
else ()
set (USE_INTERNAL_ZSTD_LIBRARY 1)
set (ZSTD_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libzstd/include/zstd")
include_directories (BEFORE ${ZSTD_INCLUDE_DIR})
set (ZSTD_LIBRARY zstd)
endif ()
......
set(CITYHASH_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcityhash/include)
set(CPUID_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcpuid/include)
set(DIVIDE_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libdivide)
set(BTRIE_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libbtrie/include)
set(CITYHASH_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcityhash/include)
set(MYSQLXX_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/libs/libmysqlxx/include)
set(POCOEXT_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/libs/libpocoext/include)
set(CITYHASH_CONTRIB_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcityhash/include)
set(COMMON_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/libs/libcommon/include ${ClickHouse_BINARY_DIR}/libs/libcommon/include)
set(DBMS_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/dbms/src ${ClickHouse_BINARY_DIR}/dbms/src)
set(DOUBLE_CONVERSION_CONTRIB_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libdouble-conversion)
get_property (dirs TARGET dbms PROPERTY INCLUDE_DIRECTORIES)
# TODO? Maybe recursive collect on all deps
get_property (dirs1 TARGET dbms PROPERTY INCLUDE_DIRECTORIES)
list(APPEND dirs ${dirs1})
get_property (dirs1 TARGET common PROPERTY INCLUDE_DIRECTORIES)
list(APPEND dirs ${dirs1})
list(REMOVE_DUPLICATES dirs)
file (WRITE ${CMAKE_CURRENT_BINARY_DIR}/include_directories.txt "")
foreach (dir ${dirs})
......
......@@ -57,7 +57,7 @@ check_cxx_source_compiles("
}
" HAVE_POPCNT)
if (HAVE_POPCNT AND NOT AARCH64)
if (HAVE_POPCNT AND NOT ARCH_AARCH64)
set (COMPILER_FLAGS "${COMPILER_FLAGS} ${TEST_FLAG}")
endif ()
......
......@@ -28,10 +28,21 @@ if (USE_INTERNAL_ZOOKEEPER_LIBRARY)
add_subdirectory (libzookeeper)
endif ()
add_subdirectory (libcityhash)
add_subdirectory (libfarmhash)
add_subdirectory (libmetrohash)
add_subdirectory (libbtrie)
if (USE_INTERNAL_CITYHASH_LIBRARY)
add_subdirectory (libcityhash)
endif ()
if (USE_INTERNAL_FARMHASH_LIBRARY)
add_subdirectory (libfarmhash)
endif ()
if (USE_INTERNAL_METROHASH_LIBRARY)
add_subdirectory (libmetrohash)
endif ()
if (USE_INTERNAL_BTRIE_LIBRARY)
add_subdirectory (libbtrie)
endif ()
if (USE_INTERNAL_UNWIND_LIBRARY)
add_subdirectory (libunwind)
......@@ -49,6 +60,6 @@ if (ENABLE_LIBTCMALLOC AND USE_INTERNAL_GPERFTOOLS_LIBRARY)
add_subdirectory (libtcmalloc)
endif ()
if (NOT ARM)
if (NOT ARCH_ARM)
add_subdirectory (libcpuid)
endif ()
......@@ -16,3 +16,5 @@ include/cpuid/rdtsc.h
include/cpuid/recog_amd.h
include/cpuid/recog_intel.h
)
target_include_directories (cpuid PUBLIC include)
......@@ -18,3 +18,5 @@ double-conversion/strtod.cc
double-conversion/strtod.h
double-conversion/utils.h
)
target_include_directories (double-conversion PUBLIC .)
......@@ -6,4 +6,4 @@ add_library (lz4
include/lz4/lz4hc.h
include/lz4/lz4opt.h)
target_include_directories(lz4 PUBLIC include)
target_include_directories(lz4 PUBLIC include/lz4)
......@@ -41,6 +41,9 @@ add_library (re2_st ${re2_sources})
target_compile_definitions (re2 PRIVATE NDEBUG)
target_compile_definitions (re2_st PRIVATE NDEBUG NO_THREADS re2=re2_st)
target_include_directories (re2 PUBLIC .)
target_include_directories (re2_st PRIVATE . PUBLIC ${CMAKE_CURRENT_BINARY_DIR})
file (MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/re2_st)
foreach (FILENAME filtered_re2.h re2.h set.h stringpiece.h variadic_function.h)
add_custom_command (OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/re2_st/${FILENAME}"
......
......@@ -92,3 +92,5 @@ IF (ZSTD_LEGACY_SUPPORT)
ENDIF (ZSTD_LEGACY_SUPPORT)
ADD_LIBRARY(zstd ${Sources} ${Headers})
target_include_directories (zstd PUBLIC include/zstd)
......@@ -28,6 +28,7 @@ add_subdirectory (src)
add_library(string_utils
src/Common/StringUtils.h
src/Common/StringUtils.cpp)
target_include_directories (string_utils PRIVATE ${DBMS_INCLUDE_DIR})
set(dbms_headers)
set(dbms_sources)
......@@ -150,7 +151,7 @@ if (NOT CMAKE_BUILD_TYPE STREQUAL "Debug")
PROPERTIES COMPILE_FLAGS -g0)
endif ()
if (NOT ARM)
if (NOT ARCH_ARM)
set (LINK_LIBRARIES_ONLY_ON_X86_64 cpuid)
endif()
......@@ -163,12 +164,13 @@ endif()
target_link_libraries (dbms
common
${MYSQLXX_LIBRARY}
cityhash farmhash metrohash
${FARMHASH_LIBRARIES}
${METROHASH_LIBRARIES}
${LZ4_LIBRARY}
${ZSTD_LIBRARY}
${ZOOKEEPER_LIBRARY}
string_utils
${DOUBLE_CONVERSION_LIBRARY}
${DOUBLE_CONVERSION_LIBRARIES}
${ZLIB_LIBRARIES}
${LINK_LIBRARIES_ONLY_ON_X86_64}
${RE2_LIBRARY}
......@@ -176,7 +178,7 @@ target_link_libraries (dbms
${OPENSSL_CRYPTO_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Poco_Data_LIBRARY}
btrie
${BTRIE_LIBRARIES}
)
if (Poco_DataODBC_FOUND)
......@@ -200,19 +202,18 @@ endif ()
target_link_libraries (dbms
${PLATFORM_LIBS}
${CMAKE_DL_LIBS}
${LTDL_LIB}
${CMAKE_THREAD_LIBS_INIT}
)
target_include_directories (dbms BEFORE PRIVATE ${CPUID_INCLUDE_DIR})
target_include_directories (dbms BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${BTRIE_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${CITYHASH_INCLUDE_DIR})
target_include_directories (dbms PUBLIC ${MYSQLXX_INCLUDE_DIR})
target_include_directories (dbms PRIVATE ${POCOEXT_INCLUDE_DIR})
target_include_directories (dbms PRIVATE ${COMMON_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR})
# only for copy_headers.sh:
target_include_directories (dbms PRIVATE ${COMMON_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${DOUBLE_CONVERSION_INCLUDE_DIR})
if (ENABLE_TESTS)
add_subdirectory (tests)
# attach all dbms gtest sources
......
......@@ -28,5 +28,4 @@ list(REMOVE_ITEM clickhouse_aggregate_functions_headers
add_library(clickhouse_aggregate_functions ${clickhouse_aggregate_functions_sources})
target_link_libraries(clickhouse_aggregate_functions dbms)
target_include_directories (clickhouse_aggregate_functions BEFORE PUBLIC ${CITYHASH_INCLUDE_DIR})
target_include_directories (clickhouse_aggregate_functions PRIVATE ${COMMON_INCLUDE_DIR})
......@@ -44,18 +44,18 @@
  */
/// The maximum degree of buffer size before the values are discarded
#define UNIQUES_HASH_MAX_SIZE_DEGREE 17
#define UNIQUES_HASH_MAX_SIZE_DEGREE 17
/// The maximum number of elements before the values are discarded
#define UNIQUES_HASH_MAX_SIZE (1 << (UNIQUES_HASH_MAX_SIZE_DEGREE - 1))
#define UNIQUES_HASH_MAX_SIZE (1ULL << (UNIQUES_HASH_MAX_SIZE_DEGREE - 1))
/** The number of least significant bits used for thinning. The remaining high-order bits are used to determine the position in the hash table.
  * (high-order bits are taken because the younger bits will be constant after dropping some of the values)
  */
#define UNIQUES_HASH_BITS_FOR_SKIP (32 - UNIQUES_HASH_MAX_SIZE_DEGREE)
#define UNIQUES_HASH_BITS_FOR_SKIP (32 - UNIQUES_HASH_MAX_SIZE_DEGREE)
/// Initial buffer size degree
#define UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE 4
#define UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE 4
/** This hash function is not the most optimal, but UniquesHashSet states counted with it,
......@@ -71,15 +71,15 @@ struct UniquesHashSetDefaultHash
template <typename Hash = UniquesHashSetDefaultHash>
class UniquesHashSet : private HashTableAllocatorWithStackMemory<(1 << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>
class UniquesHashSet : private HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>
{
private:
using Value_t = UInt64;
using HashValue_t = UInt32;
using Allocator = HashTableAllocatorWithStackMemory<(1 << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>;
using Allocator = HashTableAllocatorWithStackMemory<(1ULL << UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE) * sizeof(UInt32)>;
UInt32 m_size; /// Number of elements
UInt8 size_degree; /// The size of the table as a power of 2
UInt32 m_size; /// Number of elements
UInt8 size_degree; /// The size of the table as a power of 2
UInt8 skip_degree; /// Skip elements not divisible by 2 ^ skip_degree
bool has_zero; /// The hash table contains an element with a hash value of 0.
......@@ -92,7 +92,7 @@ private:
void alloc(UInt8 new_size_degree)
{
buf = reinterpret_cast<HashValue_t *>(Allocator::alloc((1 << new_size_degree) * sizeof(buf[0])));
buf = reinterpret_cast<HashValue_t *>(Allocator::alloc((1ULL << new_size_degree) * sizeof(buf[0])));
size_degree = new_size_degree;
}
......@@ -105,10 +105,10 @@ private:
}
}
inline size_t buf_size() const { return 1 << size_degree; }
inline size_t max_fill() const { return 1 << (size_degree - 1); }
inline size_t mask() const { return buf_size() - 1; }
inline size_t place(HashValue_t x) const { return (x >> UNIQUES_HASH_BITS_FOR_SKIP) & mask(); }
inline size_t buf_size() const { return 1ULL << size_degree; }
inline size_t max_fill() const { return 1ULL << (size_degree - 1); }
inline size_t mask() const { return buf_size() - 1; }
inline size_t place(HashValue_t x) const { return (x >> UNIQUES_HASH_BITS_FOR_SKIP) & mask(); }
/// The value is divided by 2 ^ skip_degree
inline bool good(HashValue_t hash) const
......@@ -157,7 +157,7 @@ private:
new_size_degree = size_degree + 1;
/// Expand the space.
buf = reinterpret_cast<HashValue_t *>(Allocator::realloc(buf, old_size * sizeof(buf[0]), (1 << new_size_degree) * sizeof(buf[0])));
buf = reinterpret_cast<HashValue_t *>(Allocator::realloc(buf, old_size * sizeof(buf[0]), (1ULL << new_size_degree) * sizeof(buf[0])));
size_degree = new_size_degree;
/** Now some items may need to be moved to a new location.
......@@ -327,12 +327,12 @@ public:
if (0 == skip_degree)
return m_size;
size_t res = m_size * (1 << skip_degree);
size_t res = m_size * (1ULL << skip_degree);
/** Pseudo-random remainder - in order to be not visible,
* that the number is divided by the power of two.
*/
res += (intHashCRC32(m_size) & ((1 << skip_degree) - 1));
res += (intHashCRC32(m_size) & ((1ULL << skip_degree) - 1));
/** Correction of a systematic error due to collisions during hashing in UInt32.
* `fixed_res(res)` formula
......@@ -435,7 +435,7 @@ public:
if (rhs_size > UNIQUES_HASH_MAX_SIZE)
throw Poco::Exception("Cannot read UniquesHashSet: too large size_degree.");
if ((1U << size_degree) < rhs_size)
if ((1ULL << size_degree) < rhs_size)
{
UInt8 new_size_degree = std::max(UNIQUES_HASH_SET_INITIAL_SIZE_DEGREE, static_cast<int>(log2(rhs_size - 1)) + 2);
resize(new_size_degree);
......
......@@ -39,7 +39,7 @@ namespace ErrorCodes
*
* PS. This is also required, because tcmalloc can not allocate a chunk of memory greater than 16 GB.
*/
static constexpr size_t MMAP_THRESHOLD = 64 * (1 << 20);
static constexpr size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
static constexpr size_t MMAP_MIN_ALIGNMENT = 4096;
static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
......@@ -56,7 +56,7 @@ void * Allocator<clear_memory_>::alloc(size_t size, size_t alignment)
if (alignment > MMAP_MIN_ALIGNMENT)
throw DB::Exception("Too large alignment: more than page size.", DB::ErrorCodes::BAD_ARGUMENTS);
buf = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
buf = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mmap.", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
......
......@@ -70,7 +70,7 @@ public:
}
/// no block of corresponding size, allocate a new one
return pool.alloc(1 << (list_idx + 1));
return pool.alloc(1ULL << (list_idx + 1));
}
void free(char * ptr, const size_t size)
......
......@@ -155,9 +155,9 @@ struct HashTableGrower
UInt8 size_degree = initial_size_degree;
/// The size of the hash table in the cells.
size_t bufSize() const { return 1 << size_degree; }
size_t bufSize() const { return 1ULL << size_degree; }
size_t maxFill() const { return 1 << (size_degree - 1); }
size_t maxFill() const { return 1ULL << (size_degree - 1); }
size_t mask() const { return bufSize() - 1; }
/// From the hash value, get the cell number in the hash table.
......@@ -200,7 +200,7 @@ struct HashTableGrower
template <size_t key_bits>
struct HashTableFixedGrower
{
size_t bufSize() const { return 1 << key_bits; }
size_t bufSize() const { return 1ULL << key_bits; }
size_t place(size_t x) const { return x; }
/// You could write __builtin_unreachable(), but the compiler does not optimize everything, and it turns out less efficiently.
size_t next(size_t pos) const { return pos + 1; }
......
......@@ -4,7 +4,7 @@
/** Two-level hash table.
* Represents 256 (or 1 << BITS_FOR_BUCKET) small hash tables (buckets of the first level).
* Represents 256 (or 1ULL << BITS_FOR_BUCKET) small hash tables (buckets of the first level).
* To determine which one to use, one of the bytes of the hash function is taken.
*
* Usually works a little slower than a simple hash table.
......@@ -47,7 +47,7 @@ protected:
public:
using Impl = ImplTable;
static constexpr size_t NUM_BUCKETS = 1 << BITS_FOR_BUCKET;
static constexpr size_t NUM_BUCKETS = 1ULL << BITS_FOR_BUCKET;
static constexpr size_t MAX_BUCKET = NUM_BUCKETS - 1;
size_t hash(const Key & x) const { return Hash::operator()(x); }
......
......@@ -11,7 +11,7 @@ namespace zkutil
{
using ACLPtr = const ACL_vector *;
using Stat = Stat;
using Stat = ::Stat;
struct Op
{
......@@ -19,6 +19,8 @@ public:
Op() : data(new zoo_op_t) {}
virtual ~Op() {}
virtual std::unique_ptr<Op> clone() const = 0;
virtual std::string describe() = 0;
std::unique_ptr<zoo_op_t> data;
......@@ -31,21 +33,32 @@ public:
struct Op::Remove : public Op
{
Remove(const std::string & path_, int32_t version) :
path(path_)
Remove(const std::string & path_, int32_t version_) :
path(path_), version(version_)
{
zoo_delete_op_init(data.get(), path.c_str(), version);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Remove(path, version));
}
std::string describe() override { return "command: remove, path: " + path; }
private:
std::string path;
int32_t version;
};
struct Op::Create : public Op
{
Create(const std::string & path_, const std::string & value_, ACLPtr acl, int32_t flags);
Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_);
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Create(path, value, acl, flags));
}
std::string getPathCreated()
{
......@@ -62,17 +75,24 @@ struct Op::Create : public Op
private:
std::string path;
std::string value;
ACLPtr acl;
int32_t flags;
std::vector<char> created_path;
};
struct Op::SetData : public Op
{
SetData(const std::string & path_, const std::string & value_, int32_t version) :
path(path_), value(value_)
SetData(const std::string & path_, const std::string & value_, int32_t version_) :
path(path_), value(value_), version(version_)
{
zoo_set_op_init(data.get(), path.c_str(), value.c_str(), value.size(), version, &stat);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new SetData(path, value, version));
}
std::string describe() override
{
return
......@@ -85,21 +105,28 @@ struct Op::SetData : public Op
private:
std::string path;
std::string value;
int32_t version;
Stat stat;
};
struct Op::Check : public Op
{
Check(const std::string & path_, int32_t version) :
path(path_)
Check(const std::string & path_, int32_t version_) :
path(path_), version(version_)
{
zoo_check_op_init(data.get(), path.c_str(), version);
}
std::unique_ptr<Op> clone() const override
{
return std::unique_ptr<zkutil::Op>(new Check(path, version));
}
std::string describe() override { return "command: check, path: " + path; }
private:
std::string path;
int32_t version;
};
struct OpResult : public zoo_op_result_t
......
......@@ -555,7 +555,7 @@ int32_t ZooKeeper::multiImpl(const Ops & ops_, OpResultsPtr * out_results_)
for (const auto & op : ops_)
ops.push_back(*(op->data));
int32_t code = zoo_multi(impl, ops.size(), ops.data(), out_results->data());
int32_t code = zoo_multi(impl, static_cast<int>(ops.size()), ops.data(), out_results->data());
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
......@@ -612,15 +612,13 @@ int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_resul
return code;
}
static const int BATCH_SIZE = 100;
void ZooKeeper::removeChildrenRecursive(const std::string & path)
{
Strings children = getChildren(path);
while (!children.empty())
{
zkutil::Ops ops;
for (size_t i = 0; i < BATCH_SIZE && !children.empty(); ++i)
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
removeChildrenRecursive(path + "/" + children.back());
ops.emplace_back(std::make_unique<Op::Remove>(path + "/" + children.back(), -1));
......@@ -639,7 +637,7 @@ void ZooKeeper::tryRemoveChildrenRecursive(const std::string & path)
{
zkutil::Ops ops;
Strings batch;
for (size_t i = 0; i < BATCH_SIZE && !children.empty(); ++i)
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
batch.push_back(path + "/" + children.back());
children.pop_back();
......@@ -712,8 +710,8 @@ ZooKeeperPtr ZooKeeper::startNewSession() const
return std::make_shared<ZooKeeper>(hosts, session_timeout_ms);
}
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl, int32_t flags)
: path(path_), value(value_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
Op::Create::Create(const std::string & path_, const std::string & value_, ACLPtr acl_, int32_t flags_)
: path(path_), value(value_), acl(acl_), flags(flags_), created_path(path.size() + ZooKeeper::SEQUENTIAL_SUFFIX_SIZE)
{
zoo_create_op_init(data.get(), path.c_str(), value.c_str(), value.size(), acl, flags, created_path.data(), created_path.size());
}
......@@ -904,4 +902,72 @@ ZooKeeper::RemoveFuture ZooKeeper::asyncRemove(const std::string & path)
return future;
}
ZooKeeper::MultiFuture ZooKeeper::asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception)
{
size_t count = ops_.size();
OpResultsPtr results(new OpResults(count));
/// We need to hold all references to ops data until the end of multi callback
struct OpsHolder
{
std::shared_ptr<zkutil::Ops> ops_ptr = std::make_shared<zkutil::Ops>();
std::shared_ptr<std::vector<zoo_op_t>> ops_raw_ptr = std::make_shared<std::vector<zoo_op_t>>();
} holder;
for (const auto & op : ops_)
{
holder.ops_ptr->emplace_back(op->clone());
holder.ops_raw_ptr->push_back(*holder.ops_ptr->back()->data);
}
MultiFuture future{ [throw_exception, results, holder] (int rc) {
OpResultsAndCode res;
res.code = rc;
res.results = results;
res.ops_ptr = holder.ops_ptr;
if (throw_exception && rc != ZOK)
throw zkutil::KeeperException(rc);
return res;
}};
if (ops_.empty())
{
(**future.task)(ZOK);
return future;
}
/// Workaround of the libzookeeper bug.
/// TODO: check if the bug is fixed in the latest version of libzookeeper.
if (expired())
throw KeeperException(ZINVALIDSTATE);
auto & ops = *holder.ops_raw_ptr;
int32_t code = zoo_amulti(impl, static_cast<int>(ops.size()), ops.data(), results->data(),
[] (int rc, const void * data)
{
MultiFuture::TaskPtr owned_task =
std::move(const_cast<MultiFuture::TaskPtr &>(*static_cast<const MultiFuture::TaskPtr *>(data)));
(*owned_task)(rc);
}, future.task.get());
ProfileEvents::increment(ProfileEvents::ZooKeeperMulti);
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
if (code != ZOK)
throw KeeperException(code);
return future;
}
ZooKeeper::MultiFuture ZooKeeper::tryAsyncMulti(const zkutil::Ops & ops)
{
return asyncMultiImpl(ops, false);
}
ZooKeeper::MultiFuture ZooKeeper::asyncMulti(const zkutil::Ops & ops)
{
return asyncMultiImpl(ops, true);
}
}
......@@ -31,6 +31,9 @@ const UInt32 DEFAULT_SESSION_TIMEOUT = 30000;
const UInt32 MEDIUM_SESSION_TIMEOUT = 120000;
const UInt32 BIG_SESSION_TIMEOUT = 600000;
/// Preferred size of multi() command (in number of ops)
constexpr size_t MULTI_BATCH_SIZE = 100;
struct WatchContext;
......@@ -46,7 +49,7 @@ struct WatchContext;
/// Modifying methods do not retry, because it leads to problems of the double-delete type.
///
/// Methods with names not starting at try- raise KeeperException on any error.
class ZooKeeper
class ZooKeeper
{
public:
using Ptr = std::shared_ptr<ZooKeeper>;
......@@ -241,7 +244,7 @@ public:
/// The caller is responsible for ensuring that the context lives until the callback
/// is finished and we can't simply pass ownership of the context into function object.
/// Instead, we save the context in a Future object and return it to the caller.
/// The cantext will live until the Future lives.
/// The context will live until the Future lives.
/// Context data is wrapped in an unique_ptr so that its address (which is passed to
/// libzookeeper) remains unchanged after the Future is returned from the function.
///
......@@ -320,6 +323,19 @@ public:
RemoveFuture asyncRemove(const std::string & path);
    /// Everything the caller needs to inspect the outcome of an asynchronous multi() call.
    struct OpResultsAndCode
    {
        /// Per-operation results of the transaction.
        OpResultsPtr results;
        /// Shared ownership of the submitted operations: they remain valid even if
        /// the caller clears its own Ops container before consuming the future.
        std::shared_ptr<Ops> ops_ptr;
        /// ZooKeeper return code for the whole transaction (ZOK on success).
        int code;
    };
using MultiFuture = Future<OpResultsAndCode, int>;
MultiFuture asyncMulti(const Ops & ops);
/// Like the previous one but don't throw any exceptions on future.get()
MultiFuture tryAsyncMulti(const Ops & ops);
static std::string error2string(int32_t code);
/// Max size of node contents in bytes.
......@@ -378,6 +394,8 @@ private:
int32_t multiImpl(const Ops & ops, OpResultsPtr * out_results = nullptr);
int32_t existsImpl(const std::string & path, Stat * stat_, WatchCallback watch_callback);
MultiFuture asyncMultiImpl(const zkutil::Ops & ops_, bool throw_exception);
std::string hosts;
int32_t session_timeout_ms;
......
......@@ -17,4 +17,4 @@ add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
target_link_libraries (zk_many_watches_reconnect dbms)
add_executable (zkutil_test_multi_exception zkutil_test_multi_exception.cpp)
target_link_libraries (zkutil_test_multi_exception dbms)
target_link_libraries (zkutil_test_multi_exception dbms gtest_main)
#include <iostream>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/Exception.h>
#include <iostream>
#include <chrono>
#include <gtest/gtest.h>
using namespace DB;
int main()
TEST(zkutil, multi_nice_exception_msg)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
try
{
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
ASSERT_NO_THROW(
zookeeper->tryRemoveRecursive("/clickhouse_test_zkutil_multi");
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
);
try
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/c", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test_zkutil_multi/c", -1));
......@@ -27,6 +31,7 @@ int main()
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
FAIL();
}
catch (...)
{
......@@ -34,16 +39,73 @@ int main()
String msg = getCurrentExceptionMessage(false);
if (msg.find("/clickhouse_test_zkutil_multi/a") == std::string::npos || msg.find("#2") == std::string::npos)
bool msg_has_reqired_patterns = msg.find("/clickhouse_test_zkutil_multi/a") != std::string::npos && msg.find("#2") != std::string::npos;
EXPECT_TRUE(msg_has_reqired_patterns) << msg;
}
}
TEST(zkutil, multi_async)
{
auto zookeeper = std::make_unique<zkutil::ZooKeeper>("localhost:2181");
auto acl = zookeeper->getDefaultACL();
zkutil::Ops ops;
zookeeper->tryRemoveRecursive("/clickhouse_test_zkutil_multi");
{
ops.clear();
auto fut = zookeeper->asyncMulti(ops);
}
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "", acl, zkutil::CreateMode::Persistent));
auto fut = zookeeper->tryAsyncMulti(ops);
ops.clear();
auto res = fut.get();
ASSERT_TRUE(res.code == ZOK);
ASSERT_EQ(res.results->size(), 2);
ASSERT_EQ(res.ops_ptr->size(), 2);
}
EXPECT_ANY_THROW
(
std::vector<zkutil::ZooKeeper::MultiFuture> futures;
for (size_t i = 0; i < 10000; ++i)
{
std::cerr << "Wrong: " << msg;
return -1;
ops.clear();
ops.emplace_back(new zkutil::Op::Remove("/clickhouse_test_zkutil_multi", -1));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Check("/clickhouse_test_zkutil_multi", -1));
ops.emplace_back(new zkutil::Op::SetData("/clickhouse_test_zkutil_multi", "xxx", 42));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
futures.emplace_back(zookeeper->asyncMulti(ops));
}
std::cout << "Ok: " << msg;
return 0;
}
futures[0].get();
);
std::cerr << "Unexpected";
return -1;
/// Check there are no segfaults for remaining 999 futures
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
{
ops.clear();
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi", "_", acl, zkutil::CreateMode::Persistent));
ops.emplace_back(new zkutil::Op::Create("/clickhouse_test_zkutil_multi/a", "_", acl, zkutil::CreateMode::Persistent));
auto fut = zookeeper->tryAsyncMulti(ops);
ops.clear();
auto res = fut.get();
ASSERT_TRUE(res.code == ZNODEEXISTS);
ASSERT_EQ(res.results->size(), 2);
ASSERT_EQ(res.ops_ptr->size(), 2);
}
}
......@@ -5,7 +5,7 @@
namespace DB
{
std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
std::vector<std::string> getMultipleKeysFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
{
std::vector<std::string> values;
Poco::Util::AbstractConfiguration::Keys config_keys;
......@@ -20,7 +20,7 @@ std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfigura
}
std::vector<std::string> getMultipleValuesFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
std::vector<std::string> getMultipleValuesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name)
{
std::vector<std::string> values;
for (const auto & key : DB::getMultipleKeysFromConfig(config, root, name))
......
......@@ -12,7 +12,7 @@ namespace Util
namespace DB
{
/// get all internal key names for given key
std::vector<std::string> getMultipleKeysFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
std::vector<std::string> getMultipleKeysFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
/// Get all values for given key
std::vector<std::string> getMultipleValuesFromConfig(Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
std::vector<std::string> getMultipleValuesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & root, const std::string & name);
}
......@@ -35,6 +35,15 @@ std::ostream & operator<<(std::ostream & stream, const DB::IColumn & what);
#include <Client/Connection.h>
std::ostream & operator<<(std::ostream & stream, const DB::Connection::Packet & what);
#include <Common/PODArray.h>
/// Debug pretty-printer for PODArray: prints a "size/capacity" header followed
/// by the elements themselves (delegated to dumpContainer).
/// Note: the stray ';' that used to follow the closing brace was removed — a
/// function definition at namespace scope is not terminated by a semicolon.
template <typename T, size_t INITIAL_SIZE, typename TAllocator, size_t pad_right_>
std::ostream & operator<<(std::ostream & stream, const DB::PODArray<T, INITIAL_SIZE, TAllocator, pad_right_> & what)
{
    stream << "PODArray(size = " << what.size() << ", capacity = " << what.capacity() << ")";
    dumpContainer(stream, what);
    return stream;
}
/// some operator<< should be declared before operator<<(... std::shared_ptr<>)
#include <common/iostream_debug_helpers.h>
......@@ -204,7 +204,7 @@ void report(const char * name, size_t n, double elapsed, UInt64 tsc_diff, size_t
std::cerr << name << std::endl
<< "Done in " << elapsed
<< " (" << n / elapsed << " elem/sec."
<< ", " << n * sizeof(UInt64) / elapsed / (1 << 30) << " GiB/sec."
<< ", " << n * sizeof(UInt64) / elapsed / (1ULL << 30) << " GiB/sec."
<< ", " << (tsc_diff * 1.0 / n) << " tick/elem)"
<< "; res = " << res
<< std::endl << std::endl;
......
......@@ -3,6 +3,7 @@ target_link_libraries (exception dbms)
add_executable (string_pool string_pool.cpp)
target_link_libraries (string_pool dbms)
target_include_directories (string_pool BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
add_executable (field field.cpp)
target_link_libraries (field dbms)
......
......@@ -137,6 +137,11 @@ void CacheDictionary::isInImpl(
{
out[out_idx] = 1;
}
/// Loop detected
else if (children[new_children_idx] == parents[parents_idx])
{
out[out_idx] = 1;
}
/// Found intermediate parent, add this value to search at next loop iteration
else
{
......
......@@ -66,7 +66,7 @@ add_library(clickhouse_functions ${clickhouse_functions_sources})
target_link_libraries(clickhouse_functions dbms)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${CITYHASH_INCLUDE_DIR})
target_include_directories (clickhouse_functions BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR})
if (USE_VECTORCLASS)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${VECTORCLASS_INCLUDE_DIR})
......
......@@ -1401,7 +1401,7 @@ bool FunctionArrayUniq::executeNumber(const ColumnArray * array, const IColumn *
const typename ColumnVector<T>::Container_t & values = nested->getData();
using Set = ClearableHashSet<T, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
const PaddedPODArray<UInt8> * null_map_data = nullptr;
if (null_map)
......@@ -1447,7 +1447,7 @@ bool FunctionArrayUniq::executeString(const ColumnArray * array, const IColumn *
const ColumnArray::Offsets_t & offsets = array->getOffsets();
using Set = ClearableHashSet<StringRef, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
const PaddedPODArray<UInt8> * null_map_data = nullptr;
if (null_map)
......@@ -1514,7 +1514,7 @@ bool FunctionArrayUniq::execute128bit(
return false;
using Set = ClearableHashSet<UInt128, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
/// Suppose that, for a given row, each of the N columns has an array whose length is M.
/// Denote arr_i each of these arrays (1 <= i <= N). Then the following is performed:
......@@ -1575,7 +1575,7 @@ void FunctionArrayUniq::executeHashed(
size_t count = columns.size();
using Set = ClearableHashSet<UInt128, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
Set set;
size_t prev_off = 0;
......@@ -1727,7 +1727,7 @@ bool FunctionArrayEnumerateUniq::executeNumber(const ColumnArray * array, const
const typename ColumnVector<T>::Container_t & values = nested->getData();
using ValuesToIndices = ClearableHashMap<T, UInt32, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
const PaddedPODArray<UInt8> * null_map_data = nullptr;
if (null_map)
......@@ -1772,7 +1772,7 @@ bool FunctionArrayEnumerateUniq::executeString(const ColumnArray * array, const
size_t prev_off = 0;
using ValuesToIndices = ClearableHashMap<StringRef, UInt32, StringRefHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(StringRef)>>;
const PaddedPODArray<UInt8> * null_map_data = nullptr;
if (null_map)
......@@ -1840,7 +1840,7 @@ bool FunctionArrayEnumerateUniq::execute128bit(
return false;
using ValuesToIndices = ClearableHashMap<UInt128, UInt32, UInt128HashCRC32, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
ValuesToIndices indices;
size_t prev_off = 0;
......@@ -1886,7 +1886,7 @@ void FunctionArrayEnumerateUniq::executeHashed(
size_t count = columns.size();
using ValuesToIndices = ClearableHashMap<UInt128, UInt32, UInt128TrivialHash, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1 << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(UInt128)>>;
ValuesToIndices indices;
size_t prev_off = 0;
......
......@@ -1431,9 +1431,16 @@ private:
if (0 == id)
continue;
auto & hierarchy = hierarchies[i];
/// Checking for loop
if (std::find(std::begin(hierarchy), std::end(hierarchy), id) != std::end(hierarchy))
continue;
all_zeroes = false;
/// place id at it's corresponding place
hierarchies[i].push_back(id);
hierarchy.push_back(id);
++total_count;
}
......
......@@ -4,7 +4,7 @@ add_executable (number_traits number_traits.cpp)
target_link_libraries (number_traits dbms)
add_executable (functions_arithmetic functions_arithmetic.cpp)
target_link_libraries (functions_arithmetic dbms)
target_link_libraries (functions_arithmetic dbms clickhouse_functions)
add_executable (logical_functions_performance logical_functions_performance.cpp)
target_link_libraries (logical_functions_performance dbms)
......@@ -331,7 +331,7 @@ int main(int argc, char ** argv)
{
try
{
size_t block_size = 1 << 20;
size_t block_size = 1ULL << 20;
if (argc > 1)
{
block_size = atoi(argv[1]);
......
......@@ -170,13 +170,10 @@ void readStringInto(Vector & s, ReadBuffer & buf)
{
while (!buf.eof())
{
size_t bytes = 0;
for (; buf.position() + bytes != buf.buffer().end(); ++bytes)
if (buf.position()[bytes] == '\t' || buf.position()[bytes] == '\n')
break;
const char * next_pos = find_first_symbols<'\t', '\n'>(buf.position(), buf.buffer().end());
appendToStringOrVector(s, buf.position(), buf.position() + bytes);
buf.position() += bytes;
appendToStringOrVector(s, buf.position(), next_pos);
buf.position() += next_pos - buf.position(); /// Code looks complicated, because "buf.position() = next_pos" doens't work due to const-ness.
if (buf.hasPendingData())
return;
......
......@@ -494,7 +494,7 @@ inline void readFloatText(T & x, ReadBuffer & buf)
readFloatTextImpl<T, void>(x, buf);
}
/// rough; all until '\n' or '\t'
/// simple: all until '\n' or '\t'
void readString(String & s, ReadBuffer & buf);
void readEscapedString(String & s, ReadBuffer & buf);
......
......@@ -28,6 +28,10 @@ set (INTERNAL_COMPILER_CUSTOM_ROOT ON CACHE BOOL "")
list(GET Poco_INCLUDE_DIRS 0 Poco_Foundation_INCLUDE_DIR)
list(GET Poco_INCLUDE_DIRS 1 Poco_Util_INCLUDE_DIR)
if (NOT DOUBLE_CONVERSION_INCLUDE_DIR)
get_target_property(DOUBLE_CONVERSION_INCLUDE_DIR ${DOUBLE_CONVERSION_LIBRARIES} INTERFACE_INCLUDE_DIRECTORIES)
endif ()
string (REPLACE ${ClickHouse_SOURCE_DIR} ${INTERNAL_COMPILER_HEADERS} INTERNAL_DOUBLE_CONVERSION_INCLUDE_DIR ${DOUBLE_CONVERSION_INCLUDE_DIR})
string (REPLACE ${ClickHouse_SOURCE_DIR} ${INTERNAL_COMPILER_HEADERS} INTERNAL_Boost_INCLUDE_DIRS ${Boost_INCLUDE_DIRS})
string (REPLACE ${ClickHouse_SOURCE_DIR} ${INTERNAL_COMPILER_HEADERS} INTERNAL_Poco_Foundation_INCLUDE_DIR ${Poco_Foundation_INCLUDE_DIR})
......
......@@ -4,6 +4,7 @@
#include <Dictionaries/IDictionarySource.h>
#include <Common/StringUtils.h>
#include <Common/MemoryTracker.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <ext/scope_guard.h>
#include <Poco/Util/Application.h>
#include <Poco/Glob.h>
......@@ -64,17 +65,17 @@ ExternalDictionaries::~ExternalDictionaries()
reloading_thread.join();
}
namespace
{
std::set<std::string> getDictionariesConfigPaths(const Poco::Util::AbstractConfiguration & config)
std::set<std::string> getDictionariesConfigPaths(const Poco::Util::AbstractConfiguration & config)
{
std::set<std::string> files;
auto patterns = getMultipleValuesFromConfig(config, "", "dictionaries_config");
for (auto & pattern : patterns)
{
auto pattern = config.getString("dictionaries_config", "");
if (pattern.empty())
return {};
continue;
std::set<std::string> files;
if (pattern[0] != '/')
{
const auto app_config_path = config.getString("config-file", "config.xml");
......@@ -82,13 +83,14 @@ namespace
const auto absolute_path = config_dir + pattern;
Poco::Glob::glob(absolute_path, files, 0);
if (!files.empty())
return files;
continue;
}
Poco::Glob::glob(pattern, files, 0);
return files;
}
return files;
}
}
void ExternalDictionaries::reloadImpl(const bool throw_on_error)
......
......@@ -11,15 +11,18 @@ add_executable (aggregate aggregate.cpp)
target_link_libraries (aggregate dbms)
add_executable (hash_map hash_map.cpp)
target_include_directories (hash_map BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (hash_map dbms)
add_executable (hash_map2 hash_map2.cpp)
target_include_directories (hash_map2 BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (hash_map2 dbms)
add_executable (hash_map3 hash_map3.cpp)
target_link_libraries (hash_map3 dbms)
add_executable (hash_map_string hash_map_string.cpp)
target_include_directories (hash_map_string BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (hash_map_string dbms)
add_executable (hash_map_string_2 hash_map_string_2.cpp)
......@@ -31,9 +34,11 @@ target_include_directories (hash_map_string_3 BEFORE PRIVATE ${ClickHouse_SOURCE
target_include_directories (hash_map_string_3 BEFORE PRIVATE ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src)
add_executable (hash_map_string_small hash_map_string_small.cpp)
target_include_directories (hash_map_string_small BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (hash_map_string_small dbms)
add_executable (two_level_hash_map two_level_hash_map.cpp)
target_include_directories (two_level_hash_map BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (two_level_hash_map dbms)
add_executable (compiler_test compiler_test.cpp)
......
......@@ -46,15 +46,15 @@ struct Grower : public HashTableGrower<>
static const size_t initial_size_degree = 16;
Grower() { size_degree = initial_size_degree; }
// size_t max_fill = (1 << initial_size_degree) * 0.9;
// size_t max_fill = (1ULL << initial_size_degree) * 0.9;
/// The size of the hash table in the cells.
size_t bufSize() const { return 1 << size_degree; }
size_t bufSize() const { return 1ULL << size_degree; }
size_t maxFill() const { return 1 << (size_degree - 1); }
// size_t maxFill() const { return max_fill; }
size_t maxFill() const { return 1ULL << (size_degree - 1); }
// size_t maxFill() const { return max_fill; }
size_t mask() const { return bufSize() - 1; }
size_t mask() const { return bufSize() - 1; }
/// From the hash value, get the cell number in the hash table.
size_t place(size_t x) const { return x & mask(); }
......@@ -69,7 +69,7 @@ struct Grower : public HashTableGrower<>
void increaseSize()
{
size_degree += size_degree >= 23 ? 1 : 2;
// max_fill = (1 << size_degree) * 0.9;
// max_fill = (1ULL << size_degree) * 0.9;
}
/// Set the buffer size by the number of elements in the hash table. Used when deserializing a hash table.
......
......@@ -249,13 +249,13 @@ struct Grower : public HashTableGrower<>
static const size_t initial_size_degree = 16;
Grower() { size_degree = initial_size_degree; }
size_t max_fill = (1 << initial_size_degree) * 0.9;
size_t max_fill = (1ULL << initial_size_degree) * 0.9;
/// The size of the hash table in the cells.
size_t bufSize() const { return 1 << size_degree; }
size_t bufSize() const { return 1ULL << size_degree; }
size_t maxFill() const { return max_fill /*1 << (size_degree - 1)*/; }
size_t mask() const { return bufSize() - 1; }
size_t maxFill() const { return max_fill /*1 << (size_degree - 1)*/; }
size_t mask() const { return bufSize() - 1; }
/// From the hash value, get the cell number in the hash table.
size_t place(size_t x) const { return x & mask(); }
......@@ -270,7 +270,7 @@ struct Grower : public HashTableGrower<>
void increaseSize()
{
size_degree += size_degree >= 23 ? 1 : 2;
max_fill = (1 << size_degree) * 0.9;
max_fill = (1ULL << size_degree) * 0.9;
}
/// Set the buffer size by the number of elements in the hash table. Used when deserializing a hash table.
......
......@@ -3,14 +3,19 @@
# each of them is built and linked as a separate library, defined below.
add_library(clickhouse-server
Server.cpp
ConfigReloader.cpp
HTTPHandler.cpp
TCPHandler.cpp
InterserverIOHTTPHandler.cpp
MetricsTransmitter.cpp
ConfigReloader.cpp
NotFoundHandler.cpp
PingRequestHandler.cpp
ReplicasStatusHandler.cpp
RootRequestHandler.cpp
Server.cpp
StatusFile.cpp
ReplicasStatusHandler.cpp)
TCPHandler.cpp
)
target_link_libraries(clickhouse-server daemon clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions)
target_include_directories (clickhouse-server PUBLIC ${ClickHouse_SOURCE_DIR}/libs/libdaemon/include)
......
#include <chrono>
#include <iomanip>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/File.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/NetException.h>
#include <ext/scope_guard.h>
......@@ -16,6 +19,7 @@
#include <IO/ConcatReadBuffer.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
......@@ -184,7 +188,7 @@ void HTTPHandler::pushDelayedResults(Output & used_output)
}
HTTPHandler::HTTPHandler(Server & server_)
HTTPHandler::HTTPHandler(IServer & server_)
: server(server_)
, log(&Logger::get("HTTPHandler"))
{
......@@ -224,8 +228,8 @@ void HTTPHandler::processQuery(
std::string quota_key = request.get("X-ClickHouse-Quota", params.get("quota_key", ""));
std::string query_id = params.get("query_id", "");
Context context = *server.global_context;
context.setGlobalContext(*server.global_context);
Context context = server.context();
context.setGlobalContext(server.context());
context.setUser(user, password, request.clientAddress(), quota_key);
context.setCurrentQueryId(query_id);
......
#pragma once
#include "IServer.h"
#include <Poco/Net/HTTPRequestHandler.h>
#include <Common/CurrentMetrics.h>
#include "Server.h"
#include <Common/HTMLForm.h>
namespace CurrentMetrics
......@@ -19,7 +23,7 @@ class CascadeWriteBuffer;
class HTTPHandler : public Poco::Net::HTTPRequestHandler
{
public:
explicit HTTPHandler(Server & server_);
explicit HTTPHandler(IServer & server_);
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
......@@ -47,12 +51,11 @@ private:
}
};
Server & server;
IServer & server;
Logger * log;
CurrentMetrics::Increment metric_increment{CurrentMetrics::HTTPConnection};
Logger * log;
/// Also initializes 'used_output'.
void processQuery(
Poco::Net::HTTPServerRequest & request,
......@@ -60,8 +63,11 @@ private:
Poco::Net::HTTPServerResponse & response,
Output & used_output);
void trySendExceptionToClient(const std::string & s, int exception_code,
Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response,
void trySendExceptionToClient(
const std::string & s,
int exception_code,
Poco::Net::HTTPServerRequest & request,
Poco::Net::HTTPServerResponse & response,
Output & used_output);
void pushDelayedResults(Output & used_output);
......
#pragma once
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include "IServer.h"
#include "HTTPHandler.h"
#include "InterserverIOHTTPHandler.h"
#include "NotFoundHandler.h"
#include "PingRequestHandler.h"
#include "ReplicasStatusHandler.h"
#include "RootRequestHandler.h"
namespace DB
{
/// Factory that creates one handler object per incoming HTTP request,
/// dispatching on method and URI:
///   GET/HEAD "/"                 -> RootRequestHandler
///   GET/HEAD "/ping"             -> PingRequestHandler
///   GET/HEAD "/replicas_status*" -> ReplicasStatusHandler
///   POST, or any URI with a '?'  -> HandlerType (the main query handler)
///   any other GET/HEAD/POST      -> NotFoundHandler
///   everything else              -> nullptr
template <typename HandlerType>
class HTTPRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
private:
    IServer & server;
    Logger * log;
    std::string name;   /// Logger name; also appears in the per-request trace line.

public:
    HTTPRequestHandlerFactory(IServer & server_, const std::string & name_) : server(server_), log(&Logger::get(name_)), name(name_)
    {
    }

    Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override
    {
        LOG_TRACE(log,
            "HTTP Request for " << name << ". "
            << "Method: "
            << request.getMethod()
            << ", Address: "
            << request.clientAddress().toString()
            << ", User-Agent: "
            << (request.has("User-Agent") ? request.get("User-Agent") : "none"));

        const auto & uri = request.getURI();
        const auto & http_method = request.getMethod();
        const bool is_get_or_head = http_method == Poco::Net::HTTPRequest::HTTP_GET || http_method == Poco::Net::HTTPRequest::HTTP_HEAD;
        const bool is_post = http_method == Poco::Net::HTTPRequest::HTTP_POST;

        /// Fixed service endpoints are served only for the idempotent methods.
        if (is_get_or_head)
        {
            if (uri == "/")
                return new RootRequestHandler(server);
            if (uri == "/ping")
                return new PingRequestHandler;
            if (startsWith(uri, "/replicas_status"))
                return new ReplicasStatusHandler(server.context());
        }

        /// Requests that carry a query (in the URI or as a POST body) go to the main handler.
        if (is_post || uri.find('?') != std::string::npos)
            return new HandlerType(server);

        if (is_get_or_head || is_post)
            return new NotFoundHandler;

        return nullptr;
    }
};
using HTTPHandlerFactory = HTTPRequestHandlerFactory<HTTPHandler>;
using InterserverIOHTTPHandlerFactory = HTTPRequestHandlerFactory<InterserverIOHTTPHandler>;
}
#pragma once
#include <Poco/Logger.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Interpreters/Context.h>
namespace DB
{
/// Minimal interface through which request handlers access server-wide state
/// (configuration, logger, global context, shutdown flag) without depending on
/// the concrete server application class.
class IServer
{
public:
    /// Returns the application's configuration.
    virtual Poco::Util::LayeredConfiguration & config() const = 0;

    /// Returns the application's logger.
    virtual Poco::Logger & logger() const = 0;

    /// Returns global application's context.
    virtual Context & context() const = 0;

    /// Returns true if shutdown signaled.
    virtual bool isCancelled() const = 0;

    /// Virtual destructor so implementations are destroyed correctly through
    /// an IServer pointer; '= default' is the idiomatic C++11 spelling.
    virtual ~IServer() = default;
};
}
#include "InterserverIOHTTPHandler.h"
#include <Interpreters/InterserverIOHandler.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <common/logger_useful.h>
#include <Common/HTMLForm.h>
#include <IO/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <Interpreters/InterserverIOHandler.h>
#include "InterserverIOHTTPHandler.h"
namespace DB
{
......@@ -32,7 +38,7 @@ void InterserverIOHTTPHandler::processQuery(Poco::Net::HTTPServerRequest & reque
WriteBufferFromHTTPServerResponse out(response);
auto endpoint = server.global_context->getInterserverIOHandler().getEndpoint(endpoint_name);
auto endpoint = server.context().getInterserverIOHandler().getEndpoint(endpoint_name);
if (compress)
{
......
#pragma once
#include "Server.h"
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#include <Common/CurrentMetrics.h>
#include "IServer.h"
namespace CurrentMetrics
{
......@@ -15,20 +19,21 @@ namespace DB
class InterserverIOHTTPHandler : public Poco::Net::HTTPRequestHandler
{
public:
InterserverIOHTTPHandler(Server & server_)
InterserverIOHTTPHandler(IServer & server_)
: server(server_)
, log(&Logger::get("InterserverIOHTTPHandler"))
, log(&Poco::Logger::get("InterserverIOHTTPHandler"))
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
private:
Server & server;
IServer & server;
Poco::Logger * log;
CurrentMetrics::Increment metric_increment{CurrentMetrics::InterserverConnection};
Logger * log;
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
void processQuery(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response);
};
}
#include "NotFoundHandler.h"
#include <IO/HTTPCommon.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
namespace DB
{
/// Replies with HTTP 404 and a plain-text hint describing the endpoints the
/// server does support, so curl/browser users get actionable instructions.
void NotFoundHandler::handleRequest(
    Poco::Net::HTTPServerRequest & request,
    Poco::Net::HTTPServerResponse & response)
{
    try
    {
        response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND);

        auto & out = response.send();
        out << "There is no handle " << request.getURI() << "\n\n";
        out << "Use / or /ping for health checks.\n";
        out << "Or /replicas_status for more sophisticated health checks.\n\n";
        out << "Send queries from your program with POST method or GET /?query=...\n\n";
        out << "Use clickhouse-client:\n\n";
        out << "For interactive data analysis:\n";
        out << " clickhouse-client\n\n";
        out << "For batch query processing:\n";
        out << " clickhouse-client --query='SELECT 1' > result\n";
        out << " clickhouse-client < query > result\n";
    }
    catch (...)
    {
        /// Sending the response can fail (e.g. the client already disconnected);
        /// such failures are only logged.
        tryLogCurrentException("NotFoundHandler");
    }
}
}
#pragma once
#include <Poco/Net/HTTPRequestHandler.h>
namespace DB
{
/// Responds with HTTP 404 and a verbose plain-text description of the
/// endpoints the server actually supports.
class NotFoundHandler : public Poco::Net::HTTPRequestHandler
{
public:
    /// Never propagates exceptions: failures while sending are logged and swallowed.
    void handleRequest(
        Poco::Net::HTTPServerRequest & request,
        Poco::Net::HTTPServerResponse & response) override;
};
}
#include "PingRequestHandler.h"
#include <IO/HTTPCommon.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
namespace DB
{
/// Availability check endpoint: always replies with the fixed body "Ok.\n".
void PingRequestHandler::handleRequest(
    Poco::Net::HTTPServerRequest & request,
    Poco::Net::HTTPServerResponse & response)
{
    try
    {
        setResponseDefaultHeaders(response);

        /// Fixed payload; length computed at compile time (sizeof includes the NUL).
        static const char response_body[] = "Ok.\n";
        response.sendBuffer(response_body, sizeof(response_body) - 1);
    }
    catch (...)
    {
        /// Failures while sending (e.g. client gone) are only logged.
        tryLogCurrentException("PingRequestHandler");
    }
}
}
#pragma once
#include <Poco/Net/HTTPRequestHandler.h>
namespace DB
{
/// Response with "Ok.\n". Used for availability checks.
class PingRequestHandler : public Poco::Net::HTTPRequestHandler
{
public:
    /// Never propagates exceptions: failures while sending are logged and swallowed.
    void handleRequest(
        Poco::Net::HTTPServerRequest & request,
        Poco::Net::HTTPServerResponse & response) override;
};
}
#include "RootRequestHandler.h"
#include <IO/HTTPCommon.h>
#include <Common/Exception.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
namespace DB
{
/// Serves the site root ("/"): replies with the string configured under
/// 'http_server_default_response' ("Ok.\n" when the key is absent).
void RootRequestHandler::handleRequest(
    Poco::Net::HTTPServerRequest & request,
    Poco::Net::HTTPServerResponse & response)
{
    try
    {
        setResponseDefaultHeaders(response);
        response.setContentType("text/html; charset=UTF-8");

        auto & config = server.config();
        const auto response_text = config.getString("http_server_default_response", "Ok.\n");
        response.sendBuffer(response_text.data(), response_text.size());
    }
    catch (...)
    {
        /// Failures while sending (e.g. client gone) are only logged.
        tryLogCurrentException("RootRequestHandler");
    }
}
}
#pragma once
#include <Poco/Net/HTTPRequestHandler.h>
#include "IServer.h"
namespace DB
{
/// Response with custom string. Can be used for browser.
/// The body comes from the 'http_server_default_response' config key
/// ("Ok.\n" by default).
class RootRequestHandler : public Poco::Net::HTTPRequestHandler
{
private:
    /// Needed to read the response text from the server configuration.
    IServer & server;

public:
    RootRequestHandler(IServer & server_) : server(server_)
    {
    }

    /// Never propagates exceptions: failures while sending are logged and swallowed.
    void handleRequest(
        Poco::Net::HTTPServerRequest & request,
        Poco::Net::HTTPServerResponse & response) override;
};
}
......@@ -2,51 +2,54 @@
#include <memory>
#include <sys/resource.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Net/DNS.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/NetException.h>
#include <Poco/Util/XMLConfiguration.h>
#include <ext/scope_guard.h>
#include <common/ApplicationServerExt.h>
#include <common/ErrorHandlers.h>
#include <ext/scope_guard.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <common/getMemoryAmount.h>
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentMetrics.h>
#include <Common/Macros.h>
#include <Common/StringUtils.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <Common/config.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <common/getMemoryAmount.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/CurrentMetrics.h>
#include <Common/ClickHouseRevision.h>
#include <IO/HTTPCommon.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/DDLWorker.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/DDLWorker.h>
#include <Storages/MergeTree/ReshardingWorker.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include "ConfigReloader.h"
#include "HTTPHandler.h"
#include "InterserverIOHTTPHandler.h"
#include "HTTPHandlerFactory.h"
#include "MetricsTransmitter.h"
#include "ReplicasStatusHandler.h"
#include "StatusFile.h"
#include "TCPHandler.h"
#include "TCPHandlerFactory.h"
#include <Common/config.h>
#if Poco_NetSSL_FOUND
#include <Poco/Net/Context.h>
#include <Poco/Net/SecureServerSocket.h>
#endif
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
namespace CurrentMetrics
{
......@@ -56,156 +59,13 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
namespace ErrorCodes
{
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
}
/// Response with "Ok.\n". Used for availability checks.
class PingRequestHandler : public Poco::Net::HTTPRequestHandler
{
public:
    /// Always replies with the fixed body "Ok.\n"; used by monitoring for liveness probes.
    void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override
    {
        try
        {
            setResponseDefaultHeaders(response);

            static const char reply[] = "Ok.\n";
            response.sendBuffer(reply, sizeof(reply) - 1);
        }
        catch (...)
        {
            /// Swallow everything: a health check must never kill the handler thread.
            tryLogCurrentException("PingRequestHandler");
        }
    }
};
/// Response with custom string. Can be used for browser.
class RootRequestHandler : public Poco::Net::HTTPRequestHandler
{
public:
    /// Serves the configurable default page for "/" (key "http_server_default_response").
    void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override
    {
        try
        {
            setResponseDefaultHeaders(response);
            response.setContentType("text/html; charset=UTF-8");

            auto & config = Poco::Util::Application::instance().config();
            const std::string body = config.getString("http_server_default_response", "Ok.\n");
            response.sendBuffer(body.data(), body.size());
        }
        catch (...)
        {
            /// Logged, never rethrown: keep the HTTP worker alive.
            tryLogCurrentException("RootRequestHandler");
        }
    }
};
/// Response with 404 and verbose description.
/// Answers 404 with a verbose hint telling the user which endpoints exist
/// and how to send queries. The exact text below is user-visible; keep it stable.
class NotFoundHandler : public Poco::Net::HTTPRequestHandler
{
public:
    void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override
    {
        try
        {
            response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND);
            response.send() << "There is no handle " << request.getURI() << "\n\n"
                << "Use / or /ping for health checks.\n"
                << "Or /replicas_status for more sophisticated health checks.\n\n"
                << "Send queries from your program with POST method or GET /?query=...\n\n"
                << "Use clickhouse-client:\n\n"
                << "For interactive data analysis:\n"
                << " clickhouse-client\n\n"
                << "For batch query processing:\n"
                << " clickhouse-client --query='SELECT 1' > result\n"
                << " clickhouse-client < query > result\n";
        }
        catch (...)
        {
            /// E.g. the client already closed the connection; only log it.
            tryLogCurrentException("NotFoundHandler");
        }
    }
};
/// Dispatches every incoming HTTP request to a concrete handler.
/// Routing (order matters):
///   1. GET/HEAD "/"                 -> RootRequestHandler
///   2. GET/HEAD "/ping"             -> PingRequestHandler
///   3. GET/HEAD "/replicas_status*" -> ReplicasStatusHandler
///   4. any request with a '?' in the URI, or any POST -> HandlerType (the query handler)
///   5. remaining GET/HEAD/POST      -> NotFoundHandler (verbose 404)
///   6. anything else               -> nullptr (left to Poco's default handling —
///      NOTE(review): presumably the connection is just closed; confirm against Poco docs)
template <typename HandlerType>
class HTTPRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
private:
    Server & server;
    Logger * log;
    std::string name;   /// Factory name, used both as logger name and in trace messages.

public:
    HTTPRequestHandlerFactory(Server & server_, const std::string & name_) : server(server_), log(&Logger::get(name_)), name(name_)
    {
    }

    Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override
    {
        /// Trace every request before routing, including the client's User-Agent if present.
        LOG_TRACE(log,
            "HTTP Request for " << name << ". "
            << "Method: "
            << request.getMethod()
            << ", Address: "
            << request.clientAddress().toString()
            << ", User-Agent: "
            << (request.has("User-Agent") ? request.get("User-Agent") : "none"));

        const auto & uri = request.getURI();

        /// Fixed read-only endpoints are served for GET/HEAD only.
        if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD)
        {
            if (uri == "/")
                return new RootRequestHandler;
            if (uri == "/ping")
                return new PingRequestHandler;
            else if (startsWith(uri, "/replicas_status"))
                return new ReplicasStatusHandler(*server.global_context);
        }

        /// A query string ("?...") or a POST body means an actual query — hand it to HandlerType.
        if (uri.find('?') != std::string::npos || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
        {
            return new HandlerType(server);
        }

        /// Known methods with an unknown path get the helpful 404 page.
        if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET || request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD
            || request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
        {
            return new NotFoundHandler;
        }

        return nullptr;
    }
};
/// Creates a TCPHandler for every accepted connection of the native protocol server.
class TCPConnectionFactory : public Poco::Net::TCPServerConnectionFactory
{
private:
    Server & server;
    Logger * log;

public:
    TCPConnectionFactory(Server & server_) : server(server_), log(&Logger::get("TCPConnectionFactory"))
    {
    }

    Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override
    {
        /// One trace line per accepted connection; the handler itself logs the rest.
        LOG_TRACE(log, "TCP Request. " << "Address: " << socket.peerAddress().toString());
        return new TCPHandler(server, socket);
    }
};
static std::string getCanonicalPath(std::string && path)
{
Poco::trimInPlace(path);
......@@ -372,7 +232,7 @@ int Server::main(const std::vector<std::string> & args)
[&](ConfigurationPtr config) { global_context->setUsersConfig(config); },
/* already_loaded = */ false);
/// Limit on total number of coucurrently executed queries.
/// Limit on total number of concurrently executed queries.
global_context->getProcessList().setMaxSize(config().getInt("max_concurrent_queries", 0));
/// Setup protection to avoid accidental DROP for big tables (that are greater than 50 GB by default)
......@@ -499,7 +359,10 @@ int Server::main(const std::vector<std::string> & args)
http_socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
new HTTPRequestHandlerFactory<HTTPHandler>(*this, "HTTPHandler-factory"), server_pool, http_socket, http_params));
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
server_pool,
http_socket,
http_params));
LOG_INFO(log, "Listening http://" + http_socket_address.toString());
}
......@@ -507,7 +370,7 @@ int Server::main(const std::vector<std::string> & args)
/// HTTPS
if (config().has("https_port"))
{
#if Poco_NetSSL_FOUND
#if Poco_NetSSL_FOUND
std::call_once(ssl_init_once, SSLInit);
Poco::Net::SocketAddress http_socket_address = make_socket_address(listen_host, config().getInt("https_port"));
Poco::Net::SecureServerSocket http_socket(http_socket_address);
......@@ -515,13 +378,16 @@ int Server::main(const std::vector<std::string> & args)
http_socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
new HTTPRequestHandlerFactory<HTTPHandler>(*this, "HTTPHandler-factory"), server_pool, http_socket, http_params));
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
server_pool,
http_socket,
http_params));
LOG_INFO(log, "Listening https://" + http_socket_address.toString());
#else
#else
throw Exception{"https protocol disabled because poco library built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
#endif
}
/// TCP
......@@ -531,8 +397,11 @@ int Server::main(const std::vector<std::string> & args)
Poco::Net::ServerSocket tcp_socket(tcp_address);
tcp_socket.setReceiveTimeout(settings.receive_timeout);
tcp_socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(
new Poco::Net::TCPServer(new TCPConnectionFactory(*this), server_pool, tcp_socket, new Poco::Net::TCPServerParams));
servers.emplace_back(new Poco::Net::TCPServer(
new TCPHandlerFactory(*this),
server_pool,
tcp_socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening tcp: " + tcp_address.toString());
}
......@@ -549,7 +418,7 @@ int Server::main(const std::vector<std::string> & args)
interserver_io_http_socket.setReceiveTimeout(settings.receive_timeout);
interserver_io_http_socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(new Poco::Net::HTTPServer(
new HTTPRequestHandlerFactory<InterserverIOHTTPHandler>(*this, "InterserverIOHTTPHandler-factory"),
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
interserver_io_http_socket,
http_params));
......@@ -568,7 +437,6 @@ int Server::main(const std::vector<std::string> & args)
else
throw;
}
}
if (servers.empty())
......
#pragma once
#include <Poco/URI.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include <Poco/Net/HTTPRequestHandler.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPServerParams.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/Net/TCPServer.h>
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <Poco/Net/TCPServerConnection.h>
#include "IServer.h"
#include <common/logger_useful.h>
#include <daemon/BaseDaemon.h>
#include <Common/HTMLForm.h>
#include <Interpreters/Context.h>
/** Server provides three interfaces:
* 1. HTTP - simple interface for any applications.
......@@ -37,11 +19,28 @@
namespace DB
{
class Server : public BaseDaemon
class Server : public BaseDaemon, public IServer
{
public:
/// Global settings of server.
std::unique_ptr<Context> global_context;
Poco::Util::LayeredConfiguration & config() const override
{
return BaseDaemon::config();
}
Poco::Logger & logger() const override
{
return BaseDaemon::logger();
}
Context & context() const override
{
return *global_context;
}
bool isCancelled() const override
{
return BaseDaemon::isCancelled();
}
protected:
void initialize(Application & self) override
......@@ -58,8 +57,10 @@ protected:
int main(const std::vector<std::string> & args) override;
private:
std::string getDefaultCorePath() const override;
private:
std::unique_ptr<Context> global_context;
};
}
......@@ -49,10 +49,10 @@ namespace ErrorCodes
void TCPHandler::runImpl()
{
connection_context = *server.global_context;
connection_context = server.context();
connection_context.setSessionContext(connection_context);
Settings global_settings = server.global_context->getSettings();
Settings global_settings = connection_context.getSettings();
socket().setReceiveTimeout(global_settings.receive_timeout);
socket().setSendTimeout(global_settings.send_timeout);
......@@ -117,11 +117,11 @@ void TCPHandler::runImpl()
while (1)
{
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (!static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000) && !BaseDaemon::instance().isCancelled())
while (!static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000) && !server.isCancelled())
;
/// If we need to shut down, or client disconnects.
if (BaseDaemon::instance().isCancelled() || in->eof())
if (server.isCancelled() || in->eof())
break;
Stopwatch watch;
......@@ -257,7 +257,7 @@ void TCPHandler::readData(const Settings & global_settings)
break;
/// Do we need to shut down?
if (BaseDaemon::instance().isCancelled())
if (server.isCancelled())
return;
/** Have we waited for data for too long?
......@@ -778,5 +778,4 @@ void TCPHandler::run()
}
}
}
#pragma once
#include <Poco/Net/TCPServerConnection.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Core/Progress.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/BlockIO.h>
#include <Common/Stopwatch.h>
#include <Common/CurrentMetrics.h>
#include <Core/Progress.h>
#include "Server.h"
#include "IServer.h"
namespace CurrentMetrics
{
......@@ -71,17 +73,19 @@ struct QueryState
class TCPHandler : public Poco::Net::TCPServerConnection
{
public:
TCPHandler(Server & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_), server(server_),
log(&Logger::get("TCPHandler")), client_revision(0),
connection_context(*server.global_context), query_context(connection_context)
TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, log(&Logger::get("TCPHandler"))
, connection_context(server.context())
, query_context(server.context())
{
}
void run();
private:
Server & server;
IServer & server;
Logger * log;
String client_name;
......@@ -143,5 +147,4 @@ private:
void updateProgress(const Progress & value);
};
}
#pragma once
#include <Poco/Net/TCPServerConnectionFactory.h>
#include "IServer.h"
#include "TCPHandler.h"
namespace DB
{
/// Creates a TCPHandler for every accepted native-protocol connection.
/// Depends only on the IServer interface, so it can serve any IServer implementation.
class TCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory
{
private:
    IServer & server;
    Logger * log;

public:
    TCPHandlerFactory(IServer & server_)
        : server(server_)
        , log(&Logger::get("TCPHandlerFactory"))
    {
    }

    Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override
    {
        /// One trace line per accepted connection; TCPHandler logs the query traffic itself.
        LOG_TRACE(log,
            "TCP Request. "
            << "Address: "
            << socket.peerAddress().toString());

        return new TCPHandler(server, socket);
    }
};
}
......@@ -24,6 +24,7 @@ namespace ErrorCodes
extern const int ABORTED;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int TOO_MUCH_SIMULTANEOUS_QUERIES;
extern const int CANNOT_WRITE_TO_OSTREAM;
}
namespace DataPartsExchange
......@@ -143,7 +144,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & body
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::ABORTED)
if (e.code() != ErrorCodes::ABORTED && e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM)
typeid_cast<StorageReplicatedMergeTree &>(*owned_storage).enqueuePartForCheck(part_name);
throw;
}
......
......@@ -234,7 +234,7 @@ void MergeTreeDataPartChecksums::add(MergeTreeDataPartChecksums && rhs_checksums
rhs_checksums.files.clear();
}
/// Control sum computed from the set of control sums of .bin files.
/// Checksum computed from the set of control sums of .bin files.
void MergeTreeDataPartChecksums::summaryDataChecksum(SipHash & hash) const
{
/// We use fact that iteration is in deterministic (lexicographical) order.
......
......@@ -44,6 +44,7 @@ struct MergeTreeDataPartChecksums
{
using Checksum = MergeTreeDataPartChecksum;
/// The order is important.
using FileChecksums = std::map<String, Checksum>;
FileChecksums files;
......
......@@ -16,7 +16,8 @@ namespace ErrorCodes
ReplicatedMergeTreeCleanupThread::ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_)
: storage(storage_),
log(&Logger::get(storage.database_name + "." + storage.table_name + " (StorageReplicatedMergeTree, CleanupThread)")),
thread([this] { run(); }) {}
thread([this] { run(); }),
cached_block_stats(std::make_unique<NodesStatCache>()) {}
void ReplicatedMergeTreeCleanupThread::run()
......@@ -96,7 +97,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
{
ops.emplace_back(std::make_unique<zkutil::Op::Remove>(storage.zookeeper_path + "/log/" + entries[i], -1));
if (ops.size() > 400 || i + 1 == entries.size())
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
{
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
ops.emplace_back(std::make_unique<zkutil::Op::Check>(storage.zookeeper_path + "/replicas", stat.version));
......@@ -109,10 +110,119 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs()
}
namespace
{

/// Subset of zkutil::Stat kept in the long-lived per-block cache.
/// Only the two fields the cleanup logic actually reads are stored,
/// keeping cache entries small.
struct RequiredStat
{
    int64_t ctime = 0;          /// Node creation time as reported by ZooKeeper.
    int32_t numChildren = 0;    /// Child count at the moment the stat was fetched.

    RequiredStat() = default;
    RequiredStat(const RequiredStat &) = default;
    /// Fixed: removed the stray ';' after the body, which was an extraneous empty declaration.
    explicit RequiredStat(const zkutil::Stat & s) : ctime(s.ctime), numChildren(s.numChildren) {}
    /// Used to build a synthetic "threshold" stat carrying only a timestamp.
    explicit RequiredStat(Int64 ctime_) : ctime(ctime_) {}
};

}
/// Just a node name with its ZooKeeper's stat
/// A ZooKeeper node name paired with its cached stat fields.
struct ReplicatedMergeTreeCleanupThread::NodeWithStat
{
    String node;
    RequiredStat stat;

    NodeWithStat() = default;
    NodeWithStat(const String & node_, const RequiredStat & stat_) : node(node_), stat(stat_) {}

    /// Strict weak ordering: newest first; ties on ctime are broken by node name (descending).
    static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs)
    {
        return std::forward_as_tuple(lhs.stat.ctime, lhs.node) > std::forward_as_tuple(rhs.stat.ctime, rhs.node);
    }
};
/// Use a simple map node_name -> zkutil::Stat (only the required fields) as the cache.
/// It is deliberately not declared in the header, to hide implementation-dependent
/// helper structs like RequiredStat from it.
class ReplicatedMergeTreeCleanupThread::NodesStatCache : public std::map<String, RequiredStat> {};
/// Removes outdated block nodes (insert-deduplication markers) from ZooKeeper.
/// Blocks are sorted newest-first; a block is removed if it lies beyond the newest
/// `replicated_deduplication_window` blocks OR is older than
/// `replicated_deduplication_window_seconds` relative to the newest block's ctime
/// (whichever threshold cuts deeper wins, since we take the min of the two iterators).
/// Leaf nodes are removed in batched async multi-ops; nodes that still have children
/// (obsolete layout) are removed recursively and synchronously.
void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
{
    auto zookeeper = storage.getZooKeeper();

    std::vector<NodeWithStat> timed_blocks;
    getBlocksSortedByTime(zookeeper, timed_blocks);
    if (timed_blocks.empty())
        return;

    /// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
    Int64 current_time = timed_blocks.front().stat.ctime;
    /// Fixed: std::max<Int64> instead of std::max(0L, ...) — 0L only matched Int64 on
    /// platforms where `long` is 64-bit; the explicit template argument is portable.
    /// The factor 1000 converts the setting (seconds) to ZooKeeper ctime units (ms).
    Int64 time_threshold = std::max<Int64>(0, current_time - static_cast<Int64>(1000 * storage.data.settings.replicated_deduplication_window_seconds));

    /// Virtual node, all nodes that are "greater" than this one will be deleted
    NodeWithStat block_threshold("", RequiredStat(time_threshold));

    size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window);
    auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
    auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
    auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);

    /// TODO After about half a year, we could remain only multi op, because there will be no obsolete children nodes.
    std::vector<zkutil::ZooKeeper::MultiFuture> multi_futures;
    zkutil::Ops ops;
    for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
    {
        String path = storage.zookeeper_path + "/blocks/" + it->node;

        if (it->stat.numChildren == 0)
        {
            ops.emplace_back(new zkutil::Op::Remove(path, -1));

            /// Flush a full batch asynchronously; all batches are awaited below.
            if (ops.size() >= zkutil::MULTI_BATCH_SIZE)
            {
                multi_futures.emplace_back(zookeeper->tryAsyncMulti(ops));
                ops.clear();
            }
        }
        else
            zookeeper->removeRecursive(path);
    }

    /// Flush the final, possibly partial batch.
    if (!ops.empty())
    {
        multi_futures.emplace_back(zookeeper->tryAsyncMulti(ops));
        ops.clear();
    }

    auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
    size_t num_nodes_not_deleted = 0;
    int last_error_code = ZOK;

    /// Await every batch; count ops of failed batches and remember the last error.
    for (auto & future : multi_futures)
    {
        auto res = future.get();
        if (res.code != ZOK)
        {
            num_nodes_not_deleted += res.results->size();
            last_error_code = res.code;
        }
    }

    if (num_nodes_not_deleted)
    {
        LOG_ERROR(log, "There was a problem with deleting " << num_nodes_not_deleted << " (of " << num_nodes_to_delete << ")"
            << " old blocks from ZooKeeper, error: " << zkutil::ZooKeeper::error2string(last_error_code));
    }
    else
        LOG_TRACE(log, "Cleared " << num_nodes_to_delete << " old blocks from ZooKeeper");
}
void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeperPtr & zookeeper, std::vector<NodeWithStat> & timed_blocks)
{
timed_blocks.clear();
Strings blocks;
zkutil::Stat stat;
if (ZOK != zookeeper->tryGetChildren(storage.zookeeper_path + "/blocks", blocks, &stat))
......@@ -121,66 +231,54 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
/// Clear already deleted blocks from the cache, cached_block_ctime should be subset of blocks
{
NameSet blocks_set(blocks.begin(), blocks.end());
for (auto it = cached_block_ctime.begin(); it != cached_block_ctime.end();)
for (auto it = cached_block_stats->begin(); it != cached_block_stats->end();)
{
if (!blocks_set.count(it->first))
it = cached_block_ctime.erase(it);
it = cached_block_stats->erase(it);
else
++it;
}
}
auto not_cached_blocks = stat.numChildren - cached_block_ctime.size();
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
auto not_cached_blocks = stat.numChildren - cached_block_stats->size();
LOG_TRACE(log, "Checking " << stat.numChildren << " blocks (" << not_cached_blocks << " are not cached)"
<< " to clear old ones from ZooKeeper. This might take several minutes.");
/// Time -> block hash from ZooKeeper (from node name)
using TimedBlock = std::pair<Int64, String>;
using TimedBlocksComparator = std::greater<TimedBlock>;
std::vector<TimedBlock> timed_blocks;
std::vector<std::pair<String, zkutil::ZooKeeper::ExistsFuture>> exists_futures;
for (const String & block : blocks)
{
auto it = cached_block_ctime.find(block);
if (it == cached_block_ctime.end())
auto it = cached_block_stats->find(block);
if (it == cached_block_stats->end())
{
/// New block. Fetch its stat and put it into the cache
zkutil::Stat block_stat;
zookeeper->exists(storage.zookeeper_path + "/blocks/" + block, &block_stat);
cached_block_ctime.emplace(block, block_stat.ctime);
timed_blocks.emplace_back(block_stat.ctime, block);
/// New block. Fetch its stat stat asynchronously
exists_futures.emplace_back(block, zookeeper->asyncExists(storage.zookeeper_path + "/blocks/" + block));
}
else
{
/// Cached block
timed_blocks.emplace_back(it->second, block);
timed_blocks.emplace_back(block, it->second);
}
}
if (timed_blocks.empty())
return;
std::sort(timed_blocks.begin(), timed_blocks.end(), TimedBlocksComparator());
/// Put fetched stats into the cache
for (auto & elem : exists_futures)
{
zkutil::ZooKeeper::StatAndExists status = elem.second.get();
if (!status.exists)
throw zkutil::KeeperException("A block node was suddenly deleted", ZNONODE);
/// Use ZooKeeper's first node (last according to time) timestamp as "current" time.
Int64 current_time = timed_blocks.front().first;
Int64 time_threshold = std::max(0L, current_time - static_cast<Int64>(storage.data.settings.replicated_deduplication_window_seconds));
TimedBlock block_threshold(time_threshold, "");
cached_block_stats->emplace(elem.first, status.stat);
timed_blocks.emplace_back(elem.first, RequiredStat(status.stat));
}
size_t current_deduplication_window = std::min(timed_blocks.size(), storage.data.settings.replicated_deduplication_window);
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
auto first_outdated_block_time_threshold = std::upper_bound(timed_blocks.begin(), timed_blocks.end(), block_threshold, TimedBlocksComparator());
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
}
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{
/// TODO After about half a year, we could replace this to multi op, because there will be no obsolete children nodes.
zookeeper->removeRecursive(storage.zookeeper_path + "/blocks/" + it->second);
cached_block_ctime.erase(it->second);
}
LOG_TRACE(log, "Cleared " << timed_blocks.end() - first_outdated_block << " old blocks from ZooKeeper");
ReplicatedMergeTreeCleanupThread::~ReplicatedMergeTreeCleanupThread()
{
    /// Block until the background cleanup thread exits.
    /// NOTE(review): nothing here signals run() to stop — presumably a shutdown flag
    /// is raised elsewhere before destruction; confirm against run()'s loop condition.
    if (thread.joinable())
        thread.join();
}
}
#pragma once
#include <Core/Types.h>
#include <Common/ZooKeeper/Types.h>
#include <common/logger_useful.h>
#include <thread>
#include <map>
......@@ -19,11 +20,7 @@ class ReplicatedMergeTreeCleanupThread
public:
ReplicatedMergeTreeCleanupThread(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeCleanupThread()
{
if (thread.joinable())
thread.join();
}
~ReplicatedMergeTreeCleanupThread();
private:
StorageReplicatedMergeTree & storage;
......@@ -39,7 +36,12 @@ private:
/// Remove old block hashes from ZooKeeper. This makes a leading replica.
void clearOldBlocks();
std::map<String, Int64> cached_block_ctime;
class NodesStatCache;
struct NodeWithStat;
std::unique_ptr<NodesStatCache> cached_block_stats;
/// Returns list of blocks (with their stat) sorted by ctime in descending order
void getBlocksSortedByTime(std::shared_ptr<zkutil::ZooKeeper> & zookeeper, std::vector<NodeWithStat> & timed_blocks);
/// TODO Removing old quorum/failed_parts
/// TODO Removing old nonincrement_block_numbers
......
......@@ -290,7 +290,6 @@ StoragePtr StorageFactory::get(
}
else if (name == "Dictionary")
{
return StorageDictionary::create(
table_name, context, query, columns,
materialized_columns, alias_columns, column_defaults);
......
......@@ -833,13 +833,11 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
}
/// Remove from ZK information about the parts covered by the newly added ones.
for (const String & name : expected_parts)
{
LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);
for (const String & name : expected_parts)
LOG_ERROR(log, "Removing unexpectedly merged local part from ZooKeeper: " << name);
zkutil::Ops ops;
removePossiblyIncompletePartNodeFromZooKeeper(name, ops, zookeeper);
zookeeper->multi(ops);
removePartsFromZooKeeper(zookeeper, Strings(expected_parts.begin(), expected_parts.end()));
}
/// Add to the queue job to pick up the missing parts from other replicas and remove from ZK the information that we have them.
......@@ -855,7 +853,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
/// We assume that this occurs before the queue is loaded (queue.initialize).
zkutil::Ops ops;
removePossiblyIncompletePartNodeFromZooKeeper(name, ops, zookeeper);
removePartFromZooKeeper(name, ops);
ops.emplace_back(std::make_unique<zkutil::Op::Create>(
replica_path + "/queue/queue-", log_entry.toString(), zookeeper->getDefaultACL(), zkutil::CreateMode::PersistentSequential));
zookeeper->multi(ops);
......@@ -1879,25 +1877,6 @@ void StorageReplicatedMergeTree::removePartFromZooKeeper(const String & part_nam
}
/// Workaround for a known ZooKeeper problem, see CLICKHOUSE-3040 and ZOOKEEPER-2362.
/// Multi operation was non-atomic on a special wrongly-patched version of ZooKeeper
/// (occasionally used in AdFox) in case of exceeded quota, so a part node may have
/// been left with only some of its children. Appends to `ops` removal operations
/// for exactly the children that currently exist, then the part node itself.
void StorageReplicatedMergeTree::removePossiblyIncompletePartNodeFromZooKeeper(const String & part_name, zkutil::Ops & ops, const zkutil::ZooKeeperPtr & zookeeper)
{
    String part_path = replica_path + "/parts/" + part_name;

    Names children_ = zookeeper->getChildren(part_path);
    NameSet children(children_.begin(), children_.end());

    /// A complete part node has exactly two children: "checksums" and "columns".
    if (children.size() != 2)
        LOG_WARNING(log, "Will remove incomplete part node " << part_path << " from ZooKeeper");

    /// Only enqueue removals for children that exist, so the multi-op does not fail
    /// on nodes that a partially-applied multi already deleted.
    if (children.count("checksums"))
        ops.emplace_back(std::make_unique<zkutil::Op::Remove>(part_path + "/checksums", -1));
    if (children.count("columns"))
        ops.emplace_back(std::make_unique<zkutil::Op::Remove>(part_path + "/columns", -1));
    ops.emplace_back(std::make_unique<zkutil::Op::Remove>(part_path, -1));
}
void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_name)
{
auto zookeeper = getZooKeeper();
......@@ -3812,26 +3791,27 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
try
{
LOG_DEBUG(log, "Removing " << parts.size() << " old parts from file system");
Strings part_names;
while (!parts.empty())
{
MergeTreeData::DataPartPtr & part = parts.back();
LOG_DEBUG(log, "Removing " << part->name);
try
{
zkutil::Ops ops;
removePossiblyIncompletePartNodeFromZooKeeper(part->name, ops, zookeeper);
zookeeper->multi(ops);
}
catch (const zkutil::KeeperException & e)
{
LOG_WARNING(log, "Couldn't remove " << part->name << " from ZooKeeper: " << zkutil::ZooKeeper::error2string(e.code));
}
part->remove();
part_names.emplace_back(part->name);
parts.pop_back();
}
LOG_DEBUG(log, "Removed " << part_names.size() << " old parts from file system. Removing them from ZooKeeper.");
try
{
removePartsFromZooKeeper(zookeeper, part_names);
}
catch (const zkutil::KeeperException & e)
{
LOG_ERROR(log, "There is a problem with deleting parts from ZooKeeper: " << getCurrentExceptionMessage(false));
}
}
catch (...)
{
......@@ -3844,4 +3824,33 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK(Logger * log_)
}
/// Quickly removes a big set of part nodes from ZooKeeper by packing the removal
/// operations into batched asynchronous multi-requests (at most MULTI_BATCH_SIZE
/// ops per request). All batches are issued first and awaited afterwards, so the
/// removals proceed in parallel. Not transactional as a whole: if several batches
/// fail, only the last error code is rethrown and already-removed parts stay removed.
/// An empty `part_names` is a no-op.
void StorageReplicatedMergeTree::removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names)
{
    zkutil::Ops ops;
    std::vector<zkutil::ZooKeeper::MultiFuture> futures;

    for (auto it = part_names.cbegin(); it != part_names.cend(); ++it)
    {
        /// Appends this part's removal op(s) to `ops` (see removePartFromZooKeeper).
        removePartFromZooKeeper(*it, ops);

        /// Flush a full batch; always flush after the last part so no ops are left behind.
        /// Fixed: qualified std::next — the unqualified call compiled only via ADL.
        if (ops.size() >= zkutil::MULTI_BATCH_SIZE || std::next(it) == part_names.cend())
        {
            futures.emplace_back(zookeeper->tryAsyncMulti(ops));
            ops.clear();
        }
    }

    int last_error_code = ZOK;
    for (auto & future : futures)
    {
        auto res = future.get();
        if (res.code != ZOK)
            last_error_code = res.code;
    }

    if (last_error_code != ZOK)
        throw zkutil::KeeperException(last_error_code);
}
}
......@@ -374,9 +374,8 @@ private:
/// Adds actions to `ops` that remove a part from ZooKeeper.
void removePartFromZooKeeper(const String & part_name, zkutil::Ops & ops);
/// Like removePartFromZooKeeper, but handles absence of some nodes and remove other nodes anyway, see CLICKHOUSE-3040
/// Use it only in non-critical places for cleaning.
void removePossiblyIncompletePartNodeFromZooKeeper(const String & part_name, zkutil::Ops & ops, const zkutil::ZooKeeperPtr & zookeeper);
/// Quickly removes big set of parts from ZooKeeper (using async multi queries)
void removePartsFromZooKeeper(zkutil::ZooKeeperPtr & zookeeper, const Strings & part_names);
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name);
......
......@@ -7,6 +7,7 @@ DST=${1:-.};
PATH="/usr/local/bin:/usr/local/sbin:/usr/bin:$PATH"
LD=$(command -v gold || command -v ld.gold || command -v ld)
# Should be run with the correct path to clang
if [ -z "$CLANG" ]; then
CLANG=$(which clang)
fi
......@@ -21,8 +22,8 @@ if [ ! -x "$LD" ]; then
exit 1
fi
cp "$CLANG" $DST
cp "$LD" ${DST}/ld
cp "$CLANG" "${DST}/clang"
cp "$LD" "${DST}/ld"
STDCPP=$(ldd $CLANG | grep -oE '/[^ ]+libstdc++[^ ]+')
......
......@@ -21,7 +21,8 @@ DEB_HOST_MULTIARCH ?= $(shell dpkg-architecture -qDEB_HOST_MULTIARCH)
DEB_CC ?= gcc-6
DEB_CXX ?= g++-6
DEB_CLANG ?= $(shell which clang)
DEB_CLANG ?= $(shell which clang-6.0 || which clang-5.0 || which clang-4.0 || which clang || which clang-3.9 || which clang-3.8)
# CMAKE_FLAGS_ADD += -DINTERNAL_COMPILER_EXECUTABLE=$(basename $(DEB_CLANG)) # TODO: this is actual only if you will also change clang name in copy_clang_binaries.sh
DEB_BUILD_GNU_TYPE := $(shell dpkg-architecture -qDEB_BUILD_GNU_TYPE)
DEB_HOST_GNU_TYPE := $(shell dpkg-architecture -qDEB_HOST_GNU_TYPE)
......
......@@ -280,7 +280,7 @@ Examples: ``uniqArrayIf(arr, cond)``, ``quantilesTimingArrayIf(level1, level2)(
State combinator
----------------
If this combinator is used, the aggregate function returns a non-completed/non-finished value (for example, in the case of the ``uniq`` function, the number of unique values), and the intermediate aggregation state (for example, in the case of the ``uniq`` function, a hash table for calculating the number of unique values), which has type of ``AggregateFunction(...)`` and can be used for further processing or can be saved to a table for subsequent pre-aggregation - see the sections "AggregatingMergeTree" and "functions for working with intermediate aggregation states".
If this combinator is used, the aggregate function returns intermediate aggregation state (for example, in the case of the ``uniqCombined`` function, a HyperLogLog structure for calculating the number of unique values), which has type of ``AggregateFunction(...)`` and can be used for further processing or can be saved to a table for subsequent pre-aggregation - see the sections "AggregatingMergeTree" and "functions for working with intermediate aggregation states".
Merge combinator
----------------
......
......@@ -43,7 +43,7 @@ By default, files are searched for in this order:
.. code-block:: text
./clickhouse-client.xml
~/./clickhouse-client/config.xml
~/.clickhouse-client/config.xml
/etc/clickhouse-client/config.xml
Settings are only taken from the first file found.
......
......@@ -41,7 +41,7 @@
``--config-file`` - имя конфигурационного файла, в котором есть дополнительные настройки или изменены умолчания для настроек, указанных выше.
По умолчанию, ищутся файлы в следующем порядке:
./clickhouse-client.xml
~/./clickhouse-client/config.xml
~/.clickhouse-client/config.xml
/etc/clickhouse-client/config.xml
Настройки берутся только из первого найденного файла.
......
......@@ -84,6 +84,7 @@ target_include_directories (common PUBLIC ${COMMON_INCLUDE_DIR})
target_link_libraries (
common
pocoext
${CITYHASH_LIBRARIES}
${CCTZ_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
......
#pragma once
#include <iostream>
// Quick-and-dirty debug-printing macros: DUMP(x) writes
// "<file>:<line> x = <value>" to stderr. DUMP2..DUMP5 print several
// variables on one line, comma-separated. Intended for temporary
// debugging only, not for production logging.
// TODO: https://stackoverflow.com/questions/16464032/how-to-enhance-this-variable-dumping-debug-macro-to-be-variadic
// Stringize a variable together with its value: produces `"VAR = " << VAR`.
#define DUMPS(VAR) #VAR " = " << VAR
// Common prefix: source location of the DUMP call site.
#define DUMPHEAD std::cerr << __FILE__ << ":" << __LINE__ << " "
#define DUMP(V1) DUMPHEAD << DUMPS(V1) << "\n";
#define DUMP2(V1, V2) DUMPHEAD << DUMPS(V1) << ", " << DUMPS(V2) << "\n";
#define DUMP3(V1, V2, V3) DUMPHEAD << DUMPS(V1) << ", " << DUMPS(V2) << ", " << DUMPS(V3) << "\n";
#define DUMP4(V1, V2, V3, V4) DUMPHEAD << DUMPS(V1) << ", " << DUMPS(V2) << ", " << DUMPS(V3)<< ", " << DUMPS(V4) << "\n";
#define DUMP5(V1, V2, V3, V4, V5) DUMPHEAD << DUMPS(V1) << ", " << DUMPS(V2) << ", " << DUMPS(V3)<< ", " << DUMPS(V4) << ", " << DUMPS(V5) << "\n";
#include <utility>
......@@ -115,14 +124,14 @@ std::ostream & operator<<(std::ostream & stream, const std::ratio<Num, Denom> &
}
#include <chrono>
template <class clock, class duration>
template <typename clock, typename duration>
std::ostream & operator<<(std::ostream & stream, const std::chrono::duration<clock, duration> & what)
{
stream << "chrono::duration<clock=" << clock() << ", duration=" << duration() << ">{" << what.count() << "}";
return stream;
}
template <class clock, class duration>
template <typename clock, typename duration>
std::ostream & operator<<(std::ostream & stream, const std::chrono::time_point<clock, duration> & what)
{
stream << "chrono::time_point{" << what.time_since_epoch() << "}";
......@@ -132,7 +141,7 @@ std::ostream & operator<<(std::ostream & stream, const std::chrono::time_point<c
#include <memory>
template <class T>
template <typename T>
std::ostream & operator<<(std::ostream & stream, const std::shared_ptr<T> & what)
{
stream << "shared_ptr(use_count = " << what.use_count() << ") {";
......@@ -144,10 +153,22 @@ std::ostream & operator<<(std::ostream & stream, const std::shared_ptr<T> & what
return stream;
}
/// Debug-print a std::unique_ptr<T> as "unique_ptr {<value>}",
/// or "unique_ptr {nullptr}" when the pointer is empty.
/// Requires an operator<< for T itself.
template <typename T>
std::ostream & operator<<(std::ostream & stream, const std::unique_ptr<T> & what)
{
    stream << "unique_ptr {";
    if (!what)
        stream << "nullptr";
    else
        stream << *what;
    return stream << "}";
}
#include <experimental/optional>
template <class T>
template <typename T>
std::ostream & operator<<(std::ostream & stream, const std::experimental::optional<T> & what)
{
stream << "optional{";
......
......@@ -79,7 +79,7 @@ public:
void kill();
/// Получен ли сигнал на завершение?
bool isCancelled()
bool isCancelled() const
{
return is_cancelled;
}
......
......@@ -7,5 +7,4 @@ add_library (pocoext
target_include_directories (pocoext PUBLIC include PRIVATE ${COMMON_INCLUDE_DIR})
target_link_libraries(pocoext ${Poco_Util_LIBRARY} ${Poco_Net_LIBRARY} ${Poco_XML_LIBRARY} ${Poco_Foundation_LIBRARY})
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册