diff --git a/dbms/src/Functions/CMakeLists.txt b/dbms/src/Functions/CMakeLists.txt index d86e43cb43529e38cc3443ac1c069d89c972103a..cf0bf00b07518dd732fa19b73419a5670b8e9abe 100644 --- a/dbms/src/Functions/CMakeLists.txt +++ b/dbms/src/Functions/CMakeLists.txt @@ -78,7 +78,7 @@ list(REMOVE_ITEM clickhouse_functions_sources IFunction.cpp FunctionFactory.cpp list(REMOVE_ITEM clickhouse_functions_headers IFunction.h FunctionFactory.h FunctionHelpers.h) add_library(clickhouse_functions ${clickhouse_functions_sources}) -target_link_libraries(clickhouse_functions dbms) +target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing) target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash) target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src) target_include_directories (clickhouse_functions BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR}) diff --git a/dbms/src/Functions/FunctionsConsistentHashing.cpp b/dbms/src/Functions/FunctionsConsistentHashing.cpp new file mode 100644 index 0000000000000000000000000000000000000000..7f93257774b9884923cefaf71ce8ae9d0b744c1c --- /dev/null +++ b/dbms/src/Functions/FunctionsConsistentHashing.cpp @@ -0,0 +1,15 @@ +#include "FunctionsConsistentHashing.h" +#include + + +namespace DB +{ + +void registerFunctionsConsistentHashing(FunctionFactory & factory) +{ + factory.registerFunction(); + factory.registerFunction(); + factory.registerFunction(); +} + +} diff --git a/dbms/src/Functions/FunctionsConsistentHashing.h b/dbms/src/Functions/FunctionsConsistentHashing.h new file mode 100644 index 0000000000000000000000000000000000000000..bd08dce9d316d4090b311aac35ba9c31b5865712 --- /dev/null +++ b/dbms/src/Functions/FunctionsConsistentHashing.h @@ -0,0 +1,202 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const 
int LOGICAL_ERROR; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int ILLEGAL_COLUMN; + extern const int BAD_ARGUMENTS; +} + + +/// An O(1) time and space consistent hash algorithm by Konstantin Oblakov +struct YandexConsistentHashImpl +{ + static constexpr auto name = "yandexConsistentHash"; + + using HashType = UInt64; + /// Actually it supports UInt64, but it is effective only if n < 65536 + using ResultType = UInt32; + using BucketsCountType = ResultType; + + static inline ResultType apply(UInt64 hash, BucketsCountType n) + { + return ConsistentHashing(hash, n); + } +}; + + +/// Code from https://arxiv.org/pdf/1406.2294.pdf +static inline int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets) { + int64_t b = -1, j = 0; + while (j < num_buckets) { + b = j; + key = key * 2862933555777941757ULL + 1; + j = static_cast((b + 1) * (double(1LL << 31) / double((key >> 33) + 1))); + } + return static_cast(b); +} + +struct JumpConsistentHashImpl +{ + static constexpr auto name = "jumpConsistentHash"; + + using HashType = UInt64; + using ResultType = Int32; + using BucketsCountType = ResultType; + + static inline ResultType apply(UInt64 hash, BucketsCountType n) + { + return JumpConsistentHash(hash, n); + } +}; + + +struct SumburConsistentHashImpl +{ + static constexpr auto name = "sumburConsistentHash"; + + using HashType = UInt32; + using ResultType = UInt16; + using BucketsCountType = ResultType; + + static inline ResultType apply(HashType hash, BucketsCountType n) + { + return static_cast(sumburConsistentHash(hash, n)); + } +}; + + +template +class FunctionConsistentHashImpl : public IFunction +{ +public: + + static constexpr auto name = Impl::name; + + static FunctionPtr create(const Context &) { return std::make_shared>(); }; + + String getName() const override { return name; } + + size_t getNumberOfArguments() const override { return 2; } + + DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override + { + if 
(!arguments[0]->isInteger()) + throw Exception("Illegal type " + arguments[0]->getName() + " of the first argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + if (arguments[0]->getSizeOfValueInMemory() > sizeof(HashType)) + throw Exception("Function " + getName() + " accepts " + std::to_string(sizeof(HashType) * 8) + "-bit integers at most" + + ", got " + arguments[0]->getName(), ErrorCodes::BAD_ARGUMENTS); + + if (!arguments[1]->isInteger()) + throw Exception("Illegal type " + arguments[1]->getName() + " of the second argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + return std::make_shared>(); + } + + bool useDefaultImplementationForConstants() const override { return true; } + ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; } + + void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override + { + if (block.getByPosition(arguments[1]).column->isColumnConst()) + executeConstBuckets(block, arguments, result); + else + throw Exception("The second argument of function " + getName() + " (number of buckets) must be constant", ErrorCodes::BAD_ARGUMENTS); + } + +private: + + using HashType = typename Impl::HashType; + using ResultType = typename Impl::ResultType; + using BucketsType = typename Impl::BucketsCountType; + static constexpr auto max_buckets = static_cast(std::numeric_limits::max()); + + template + inline BucketsType checkBucketsRange(T buckets) + { + if (unlikely(buckets <= 0)) + throw Exception("The second argument of function " + getName() + " (number of buckets) must be positive number", + ErrorCodes::BAD_ARGUMENTS); + + if (unlikely(static_cast(buckets) > max_buckets)) + throw Exception("The value of the second argument of function " + getName() + " (number of buckets) is not fit to " + + DataTypeNumber().getName(), ErrorCodes::BAD_ARGUMENTS); + + return static_cast(buckets); + } + + void executeConstBuckets(Block & block, const 
ColumnNumbers & arguments, size_t result) + { + Field buckets_field = (*block.getByPosition(arguments[1]).column)[0]; + BucketsType num_buckets; + + if (buckets_field.getType() == Field::Types::Int64) + num_buckets = checkBucketsRange(buckets_field.get()); + else if (buckets_field.getType() == Field::Types::UInt64) + num_buckets = checkBucketsRange(buckets_field.get()); + else + throw Exception("Illegal type " + String(buckets_field.getTypeName()) + " of the second argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + const auto & hash_col = block.getByPosition(arguments[0]).column; + const IDataType * hash_type = block.getByPosition(arguments[0]).type.get(); + auto res_col = ColumnVector::create(); + + if (checkDataType(hash_type)) executeType(hash_col, num_buckets, res_col.get()); + else if (checkDataType(hash_type)) executeType(hash_col, num_buckets, res_col.get()); + else if (checkDataType(hash_type)) executeType(hash_col, num_buckets, res_col.get()); + else if (checkDataType(hash_type)) executeType(hash_col, num_buckets, res_col.get()); + else if (checkDataType(hash_type)) executeType(hash_col, num_buckets, res_col.get()); + else if (checkDataType(hash_type)) executeType(hash_col, num_buckets, res_col.get()); + else if (checkDataType(hash_type)) executeType(hash_col, num_buckets, res_col.get()); + else if (checkDataType(hash_type)) executeType(hash_col, num_buckets, res_col.get()); + else + throw Exception("Illegal type " + hash_type->getName() + " of the first argument of function " + getName(), + ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + block.getByPosition(result).column = std::move(res_col); + } + + template + void executeType(const ColumnPtr & col_hash_ptr, BucketsType num_buckets, ColumnVector * col_result) + { + auto col_hash = checkAndGetColumn>(col_hash_ptr.get()); + if (!col_hash) + throw Exception("Illegal type of the first argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); + + auto & vec_result = 
col_result->getData(); + const auto & vec_hash = col_hash->getData(); + + size_t size = vec_hash.size(); + vec_result.resize(size); + for (size_t i = 0; i < size; ++i) + vec_result[i] = Impl::apply(static_cast(vec_hash[i]), num_buckets); + } +}; + + +using FunctionYandexConsistentHash = FunctionConsistentHashImpl; +using FunctionJumpConsistentHash = FunctionConsistentHashImpl; +using FunctionSumburConsistentHash = FunctionConsistentHashImpl; + + +} diff --git a/dbms/src/Functions/registerFunctions.cpp b/dbms/src/Functions/registerFunctions.cpp index 365b4a730bf85d68c0e5e12640629f4ab1b49a35..0dcc66bfd776cd45cfd2d7a176dc99cdd969260a 100644 --- a/dbms/src/Functions/registerFunctions.cpp +++ b/dbms/src/Functions/registerFunctions.cpp @@ -24,6 +24,7 @@ void registerFunctionsExternalDictionaries(FunctionFactory &); void registerFunctionsExternalModels(FunctionFactory &); void registerFunctionsFormatting(FunctionFactory &); void registerFunctionsHashing(FunctionFactory &); +void registerFunctionsConsistentHashing(FunctionFactory &); void registerFunctionsHigherOrder(FunctionFactory &); void registerFunctionsLogical(FunctionFactory &); void registerFunctionsMiscellaneous(FunctionFactory &); @@ -60,6 +61,7 @@ void registerFunctions() registerFunctionsExternalModels(factory); registerFunctionsFormatting(factory); registerFunctionsHashing(factory); + registerFunctionsConsistentHashing(factory); registerFunctionsHigherOrder(factory); registerFunctionsLogical(factory); registerFunctionsMiscellaneous(factory); diff --git a/dbms/src/Server/ClusterCopier.h b/dbms/src/Server/ClusterCopier.h index 1a76a0c8c1158fda80a46736d9844f9950548926..b24b08be0e701df1c43164b87598b7abb7320267 100644 --- a/dbms/src/Server/ClusterCopier.h +++ b/dbms/src/Server/ClusterCopier.h @@ -1,132 +1,12 @@ #pragma once #include -/* = clickhouse-cluster-copier util = +/* clickhouse cluster copier util * Copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed 
fault-tolerant manner. * - * Configuration of copying tasks is set in special ZooKeeper node (called the description node). - * A ZooKeeper path to the description node is specified via --task-path parameter. - * So, node /task/path/description should contain special XML content describing copying tasks. + * See overview in the docs: docs/en/utils/clickhouse-copier.md * - * Simultaneously many clickhouse-cluster-copier processes located on any servers could execute the same task. - * ZooKeeper node /task/path/ is used by the processes to coordinate their work. - * You must not add additional child nodes to /task/path/. - * - * Currently you are responsible for launching cluster-copier processes. - * You can launch as many processes as you want, whenever and wherever you want. - * Each process try to select nearest available shard of source cluster and copy some part of data (partition) from it to the whole - * destination cluster with resharding. - * Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce the network usage. - * - * Since the workers coordinate their work via ZooKeeper, in addition to --task-path you have to specify ZooKeeper - * configuration via --config-file parameter. Example of zookeeper.xml: - - - - - 127.0.0.1 - 2181 - - - - - * When you run clickhouse-cluster-copier --config-file --task-path - * the process connects to ZooKeeper, reads tasks config from /task/path/description and executes them. - * - * - * = Format of task config = - - - - - - - false - - 127.0.0.1 - 9000 - - - ... - - - - ... 
- - - - - 2 - - - - 1 - - - - - 0 - - - - - 3 - - 1 - - - - - - - <-- Source cluster name (from section) and tables in it that should be copied --> - source_cluster - test - hits - - <-- Destination cluster name and tables in which the data should be inserted --> - destination_cluster - test - hits2 - - - ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/hits2/{shard}/hits2', '{replica}', EventDate, (CounterID, EventDate), 8192) - - - intHash32(UserID) - - - CounterID != 0 - - - - 201712 - 201801 - ... - - - - - ... - - ... - - - - - * = Implementation details = + * Implementation details: * * cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through * Distributed table (to preform data resharding). So, worker job is a partition of a source shard. @@ -144,7 +24,7 @@ * /server_fqdn#PID_timestamp - cluster-copier worker ID * ... * /tables - directory with table tasks - * /table_hits - directory of table_hits task + * /cluster.db.table - directory of table_hits task * /partition1 - directory for partition1 * /shards - directory for source cluster shards * /1 - worker job for the first shard of partition1 of table test.hits diff --git a/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml b/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml index 82cd16a6b6c08806813855c4e7c9e41f4b6503e8..fe2d4a715967125d0b252adff361cfe028403a95 100644 --- a/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml +++ b/dbms/tests/integration/test_cluster_copier/task_month_to_week_description.xml @@ -26,10 +26,14 @@ --> - ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/b', '{replica}') PARTITION BY toMonday(date) ORDER BY d + ENGINE= + ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/b', '{replica}') + PARTITION BY toMonday(date) + ORDER BY d + - d + 1 + jumpConsistentHash(intHash64(d), 2) diff --git 
a/dbms/tests/integration/test_cluster_copier/test.py b/dbms/tests/integration/test_cluster_copier/test.py index 8ef4e27b913dfed96fcde11fc4fc43f04d13c57e..54b1ff87c500171b801909e8ff9d55686f081cf2 100644 --- a/dbms/tests/integration/test_cluster_copier/test.py +++ b/dbms/tests/integration/test_cluster_copier/test.py @@ -129,8 +129,8 @@ class Task2: assert TSV(self.cluster.instances['s0_0_0'].query("SELECT count() FROM cluster(cluster0, default, a)")) == TSV("85\n") assert TSV(self.cluster.instances['s1_0_0'].query("SELECT count(), uniqExact(date) FROM cluster(cluster1, default, b)")) == TSV("85\t85\n") - assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM b")) == TSV("1\n") - assert TSV(self.cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM b")) == TSV("0\n") + assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT jumpConsistentHash(intHash64(d), 2) FROM b")) == TSV("0\n") + assert TSV(self.cluster.instances['s1_1_0'].query("SELECT DISTINCT jumpConsistentHash(intHash64(d), 2) FROM b")) == TSV("1\n") assert TSV(self.cluster.instances['s1_0_0'].query("SELECT uniqExact(partition) IN (12, 13) FROM system.parts WHERE active AND database='default' AND table='b'")) == TSV("1\n") assert TSV(self.cluster.instances['s1_1_0'].query("SELECT uniqExact(partition) IN (12, 13) FROM system.parts WHERE active AND database='default' AND table='b'")) == TSV("1\n") @@ -184,20 +184,21 @@ def execute_task(task, cmd_options): zk.delete(zk_task_path, recursive=True) +# Tests + def test_copy1_simple(started_cluster): execute_task(Task1(started_cluster), []) - def test_copy1_with_recovering(started_cluster): execute_task(Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)]) - def test_copy_month_to_week_partition(started_cluster): execute_task(Task2(started_cluster), []) -def test_copy_month_to_week_partition(started_cluster): +def test_copy_month_to_week_partition_with_recovering(started_cluster): 
execute_task(Task2(started_cluster), ['--copy-fault-probability', str(0.1)]) + if __name__ == '__main__': with contextmanager(started_cluster)() as cluster: for name, instance in cluster.instances.items(): diff --git a/dbms/tests/performance/consistent_hashes/consistent_hashes.xml b/dbms/tests/performance/consistent_hashes/consistent_hashes.xml new file mode 100644 index 0000000000000000000000000000000000000000..33930add3b741892409e88fc9c0acdd6f92ef56a --- /dev/null +++ b/dbms/tests/performance/consistent_hashes/consistent_hashes.xml @@ -0,0 +1,37 @@ + + consistent_hashes + once + + + + 1000 + 5000 + + + + + + + + + + + hash_func + + yandexConsistentHash + jumpConsistentHash + + + + buckets + + 2 + 500 + 65535 + + + + + SELECT {hash_func}(number, {buckets}) FROM system.numbers LIMIT 1000000000 + SELECT sumburConsistentHash(toUInt32(number), {buckets}) FROM system.numbers LIMIT 10000 + diff --git a/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.reference b/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.reference new file mode 100644 index 0000000000000000000000000000000000000000..64458288805658505fb890ba8575fc6bce541409 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.reference @@ -0,0 +1,6 @@ +0 43 520 0 361 237 +0 1 1 3 111 173 +358 +341 +111 +111 diff --git a/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.sql b/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.sql new file mode 100644 index 0000000000000000000000000000000000000000..20d1892f192b1f54f804ee5fba0ea779cf84a7bf --- /dev/null +++ b/dbms/tests/queries/0_stateless/00580_consistent_hashing_functions.sql @@ -0,0 +1,4 @@ +SELECT jumpConsistentHash(1, 1), jumpConsistentHash(42, 57), jumpConsistentHash(256, 1024), jumpConsistentHash(3735883980, 1), jumpConsistentHash(3735883980, 666), jumpConsistentHash(16045690984833335023, 255); +SELECT yandexConsistentHash(16045690984833335023, 1), 
yandexConsistentHash(16045690984833335023, 2), yandexConsistentHash(16045690984833335023, 3), yandexConsistentHash(16045690984833335023, 4), yandexConsistentHash(16045690984833335023, 173), yandexConsistentHash(16045690984833335023, 255); +SELECT jumpConsistentHash(intHash64(number), 787) FROM system.numbers LIMIT 1000000, 2; +SELECT yandexConsistentHash(16045690984833335023+number-number, 120) FROM system.numbers LIMIT 1000000, 2; diff --git a/docs/en/utils/clickhouse-copier.md b/docs/en/utils/clickhouse-copier.md new file mode 100644 index 0000000000000000000000000000000000000000..25d22f192226da6c8140b6fc439dd7ab9b269f78 --- /dev/null +++ b/docs/en/utils/clickhouse-copier.md @@ -0,0 +1,156 @@ +# clickhouse-copier util + +The utility copies table data from one cluster to new tables of another (possibly the same) cluster in a distributed and fault-tolerant manner. + +Configuration of copying tasks is set in a special ZooKeeper node (called the `/description` node). +A ZooKeeper path to the description node is specified via `--task-path ` parameter. +So, node `/task/path/description` should contain special XML content describing copying tasks. + +Many `clickhouse-copier` processes, located on any servers, can execute the same task simultaneously. +ZooKeeper node `/task/path/` is used by the processes to coordinate their work. +You must not add additional child nodes to `/task/path/`. + +Currently you are responsible for manually launching all `cluster-copier` processes. +You can launch as many processes as you want, whenever and wherever you want. +Each process tries to select the nearest available shard of the source cluster and copies some part of the data (partition) from it to the whole +destination cluster (with resharding). +Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce the network usage.
+ +Since the workers coordinate their work via ZooKeeper, in addition to `--task-path ` you have to specify ZooKeeper +cluster configuration via `--config-file ` parameter. Example of `zookeeper.xml`: + +```xml + + + + 127.0.0.1 + 2181 + + + +``` + +When you run `clickhouse-copier --config-file --task-path ` the process connects to the ZooKeeper cluster, reads the task config from `/task/path/description` and executes the tasks. + +## Format of task config + +Here is an example of `/task/path/description` content: + +```xml + + + + + + false + + 127.0.0.1 + 9000 + + + ... + + + + ... + + + + + 2 + + + + 1 + + + + + 0 + + + + + 3 + + 1 + + + + + + + + source_cluster + test + hits + + + destination_cluster + test + hits2 + + + ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}') + PARTITION BY toMonday(date) + ORDER BY (CounterID, EventDate) + + + + jumpConsistentHash(intHash64(UserID), 2) + + + CounterID != 0 + + + + '2018-02-26' + '2018-03-05' + ... + + + + + + ... + + ... + + +``` + +cluster-copier processes watch for `/task/path/description` node updates. +So, if you modify the config settings or `max_workers` params, they will be updated. + +## Example + +```bash +clickhouse-copier copier --daemon --config /path/to/copier/zookeeper.xml --task-path /clickhouse-copier/cluster1_tables_hits --base-dir /path/to/copier_logs +``` + +`--base-dir /path/to/copier_logs` specifies where auxiliary and log files of the copier process will be saved. +In this case it will create `/path/to/copier_logs/clickhouse-copier_YYYYMMHHSS_/` dir with log and status-files. +If it is not specified, it will use the current dir (`/clickhouse-copier_YYYYMMHHSS_/` if it is run as a `--daemon`).
diff --git a/docs/en/utils/clickhouse-local.md b/docs/en/utils/clickhouse-local.md new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/docs/en/utils/index.md b/docs/en/utils/index.md new file mode 100644 index 0000000000000000000000000000000000000000..7a8c5ee51386e57e725dca54f758ba26b80810c2 --- /dev/null +++ b/docs/en/utils/index.md @@ -0,0 +1,6 @@ +# ClickHouse utilities + +There are several ClickHouse utilities that are separate executable files: + +* `clickhouse-local` allows executing SQL queries on local data, similar to `awk` +* `clickhouse-copier` copies (and reshards) immutable data from one cluster to another in a fault-tolerant manner. diff --git a/docs/mkdocs_en.yml b/docs/mkdocs_en.yml index 5966c952a030f239fd629228d732616452ea2c85..c108052fdd271f4444687416d2c8926e558f9d26 100644 --- a/docs/mkdocs_en.yml +++ b/docs/mkdocs_en.yml @@ -233,6 +233,11 @@ pages: - 'Settings': 'operations/settings/settings.md' - 'Settings profiles': 'operations/settings/settings_profiles.md' +- 'Utilites': + - 'Utilites': 'utils/index.md' + - 'clickhouse-copier': 'utils/clickhouse-copier.md' + #- 'clickhouse-local' : 'utils/clickhouse-local.md' + - 'ClickHouse Development': # - 'ClickHouse Development': 'development/index.md' - 'Overview of ClickHouse architecture': 'development/architecture.md' diff --git a/docs/mkdocs_ru.yml b/docs/mkdocs_ru.yml index ad26a510ad98df7744577d0a9afac4279aa7a028..05d7e9d8eb80f9d2ba004d7cdedf12e00839f24d 100644 --- a/docs/mkdocs_ru.yml +++ b/docs/mkdocs_ru.yml @@ -238,6 +238,11 @@ pages: - 'Настройки': 'operations/settings/settings.md' - 'Профили настроек': 'operations/settings/settings_profiles.md' +- 'Утилиты': + - 'Утилиты': 'utils/index.md' + - 'clickhouse-copier': 'utils/clickhouse-copier.md' + #- 'clickhouse-local' : 'utils/clickhouse-local.md' + - 'ClickHouse Development': # - 'ClickHouse Development': 'development/index.md' - 'Overview of ClickHouse architecture':
'development/architecture.md' diff --git a/docs/ru/utils/clickhouse-copier.md b/docs/ru/utils/clickhouse-copier.md new file mode 100644 index 0000000000000000000000000000000000000000..25d22f192226da6c8140b6fc439dd7ab9b269f78 --- /dev/null +++ b/docs/ru/utils/clickhouse-copier.md @@ -0,0 +1,156 @@ +# clickhouse-copier util + +The utility copies table data from one cluster to new tables of another (possibly the same) cluster in a distributed and fault-tolerant manner. + +Configuration of copying tasks is set in a special ZooKeeper node (called the `/description` node). +A ZooKeeper path to the description node is specified via `--task-path ` parameter. +So, node `/task/path/description` should contain special XML content describing copying tasks. + +Many `clickhouse-copier` processes, located on any servers, can execute the same task simultaneously. +ZooKeeper node `/task/path/` is used by the processes to coordinate their work. +You must not add additional child nodes to `/task/path/`. + +Currently you are responsible for manually launching all `cluster-copier` processes. +You can launch as many processes as you want, whenever and wherever you want. +Each process tries to select the nearest available shard of the source cluster and copies some part of the data (partition) from it to the whole +destination cluster (with resharding). +Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce the network usage. + +Since the workers coordinate their work via ZooKeeper, in addition to `--task-path ` you have to specify ZooKeeper +cluster configuration via `--config-file ` parameter. Example of `zookeeper.xml`: + +```xml + + + + 127.0.0.1 + 2181 + + + +``` + +When you run `clickhouse-copier --config-file --task-path ` the process connects to the ZooKeeper cluster, reads the task config from `/task/path/description` and executes the tasks.
+ +## Format of task config + +Here is an example of `/task/path/description` content: + +```xml + + + + + + false + + 127.0.0.1 + 9000 + + + ... + + + + ... + + + + + 2 + + + + 1 + + + + + 0 + + + + + 3 + + 1 + + + + + + + + source_cluster + test + hits + + + destination_cluster + test + hits2 + + + ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}') + PARTITION BY toMonday(date) + ORDER BY (CounterID, EventDate) + + + + jumpConsistentHash(intHash64(UserID), 2) + + + CounterID != 0 + + + + '2018-02-26' + '2018-03-05' + ... + + + + + + ... + + ... + + +``` + +cluster-copier processes watch for `/task/path/description` node updates. +So, if you modify the config settings or `max_workers` params, they will be updated. + +## Example + +```bash +clickhouse-copier copier --daemon --config /path/to/copier/zookeeper.xml --task-path /clickhouse-copier/cluster1_tables_hits --base-dir /path/to/copier_logs +``` + +`--base-dir /path/to/copier_logs` specifies where auxiliary and log files of the copier process will be saved. +In this case it will create `/path/to/copier_logs/clickhouse-copier_YYYYMMHHSS_/` dir with log and status-files. +If it is not specified, it will use the current dir (`/clickhouse-copier_YYYYMMHHSS_/` if it is run as a `--daemon`).
diff --git a/docs/ru/utils/clickhouse-local.md b/docs/ru/utils/clickhouse-local.md new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/docs/ru/utils/index.md b/docs/ru/utils/index.md new file mode 100644 index 0000000000000000000000000000000000000000..760fc0100c35f0c99a344ff6d8652821a3c51466 --- /dev/null +++ b/docs/ru/utils/index.md @@ -0,0 +1,6 @@ +# Утилиты ClickHouse + +Существует несколько утилит ClickHouse, которые представляют из себя отдельные исполняемые файлы: + +* `clickhouse-local` позволяет выполнять SQL-запросы над данными подобно тому, как это делает `awk` +* `clickhouse-copier` копирует (и перешардирует) неизменяемые данные с одного кластера на другой отказоустойчивым способом. diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 970d2be15b4e7357e6c6e5263fd83e89a2bbf290..dbd960e16f1061c824153936f66af088e639c0a6 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -18,3 +18,5 @@ endif () if (USE_MYSQL) add_subdirectory (libmysqlxx) endif () + +add_subdirectory (libconsistent-hashing) diff --git a/libs/libconsistent-hashing/CMakeLists.txt b/libs/libconsistent-hashing/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..ad520abace0b34c87d5e6c29f0533a6516e3c48c --- /dev/null +++ b/libs/libconsistent-hashing/CMakeLists.txt @@ -0,0 +1,5 @@ +cmake_minimum_required(VERSION 2.8) +project(libconsistent-hashing CXX) + +add_library(libconsistent-hashing yandex/consistent_hashing.cpp yandex/popcount.cpp mailru/sumbur.cpp) +target_include_directories(libconsistent-hashing PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) \ No newline at end of file diff --git a/libs/libconsistent-hashing/mailru/sumbur.cpp b/libs/libconsistent-hashing/mailru/sumbur.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3b905f0adc7c47d91452b579a2345c45f98639cc --- /dev/null +++ b/libs/libconsistent-hashing/mailru/sumbur.cpp @@ -0,0 +1,113 @@ +//Copyright (c) 
2011-2012 Mail.RU +//Copyright (c) 2011-2012 Maksim Kalinchenko +//Copyright (c) 2012 Sokolov Yura aka funny-falcon +// +//MIT License +// +//Permission is hereby granted, free of charge, to any person obtaining +//a copy of this software and associated documentation files (the +//"Software"), to deal in the Software without restriction, including +//without limitation the rights to use, copy, modify, merge, publish, +//distribute, sublicense, and/or sell copies of the Software, and to +//permit persons to whom the Software is furnished to do so, subject to +//the following conditions: +// +//The above copyright notice and this permission notice shall be +//included in all copies or substantial portions of the Software. +// +//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +//MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +//LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +//OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +//WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +#include + + +#define L 0xFFFFFFFF + +static unsigned int L27_38[] = {L / 27, L / 28, L / 29, L / 30, L / 31, L / 32, + L / 33, L / 34, L / 35, L / 36, L / 37, L / 38, + L / 39, L / 40, L / 41, L / 42, L / 43, L / 44, + L / 45, L / 46, L / 47, L / 48, L / 49, L / 50, + L / 51, L / 52, L / 53, L / 54, L / 55, L / 56, + L / 57, L / 58, L / 59, L / 60, L / 61, L / 62 + }; +static unsigned int LL27_38[] = {L/(26*27), L/(27*28), L/(28*29), L/(29*30), L/(30*31), L/(31*32), + L/(32*33), L/(33*34), L/(34*35), L/(35*36), L/(36*37), L/(37*38), + L/(38*39), L/(39*40), L/(40*41), L/(41*42), L/(42*43), L/(43*44), + L/(44*45), L/(45*46), L/(46*47), L/(47*48), L/(48*49), L/(49*50), + L/(50*51), L/(51*52), L/(52*53), L/(53*54), L/(54*55), L/(55*56), + L/(56*57), L/(57*58), L/(58*59), L/(59*60), L/(60*61), L/(61*62) + }; + +unsigned int sumburConsistentHash(unsigned int hashed_int, unsigned int capacity) +{ + unsigned int h = hashed_int; + unsigned int capa = capacity; + unsigned int part, n, i, c; + + if (capa == 0) + throw std::runtime_error("Sumbur is not applicable to empty cluster"); + + part = L / capa; + + if (L - h < part) return 0; + + n = 1; + + do { + if (h >= L / 2) h -= L / 2; + else { + n = 2; + if (L / 2 - h < part) return 1; + } + if (capa == 2) return 1; + +#define curslice(i) (L / (i * (i - 1))) +#define unroll(i) \ + if (curslice(i) <= h) h -= curslice(i); \ + else { \ + h += curslice(i) * (i - n - 1); \ + n = i; \ + if (L / i - h < part) return n-1; \ + } \ + if (capa == i) return (n-1) + + unroll(3); unroll(4); unroll(5); + unroll(6); unroll(7); unroll(8); + unroll(9); unroll(10); unroll(11); + unroll(12); unroll(13); unroll(14); + unroll(15); unroll(16); unroll(17); + unroll(18); unroll(19); unroll(20); + unroll(21); unroll(22); unroll(23); + unroll(24); unroll(25); unroll(26); + + for (i = 27; i <= capa && i <= 62; i++) { + c = LL27_38[i-27]; + if (c <= h) { + h -= c; + } + else { + h += c * (i - n - 1); + n = i; + if (L27_38[i-27] - h < part) return n-1; 
+ } + } + + for(i = 63; i <= capa; i++) { + c = L / (i * (i - 1)); + if (c <= h) { + h -= c; + } + else { + h += c * (i - n - 1); + n = i; + if (L / i - h < part) return n - 1; + } + } + } while(0); + return n - 1; +} diff --git a/libs/libconsistent-hashing/mailru/sumbur.h b/libs/libconsistent-hashing/mailru/sumbur.h new file mode 100644 index 0000000000000000000000000000000000000000..1632665a0733c9571509e69bdc48f34267bc350d --- /dev/null +++ b/libs/libconsistent-hashing/mailru/sumbur.h @@ -0,0 +1,28 @@ +//Copyright (c) 2011-2012 Mail.RU +//Copyright (c) 2011-2012 Maksim Kalinchenko +//Copyright (c) 2012 Sokolov Yura aka funny-falcon +// +//MIT License +// +//Permission is hereby granted, free of charge, to any person obtaining +//a copy of this software and associated documentation files (the +//"Software"), to deal in the Software without restriction, including +//without limitation the rights to use, copy, modify, merge, publish, +//distribute, sublicense, and/or sell copies of the Software, and to +//permit persons to whom the Software is furnished to do so, subject to +//the following conditions: +// +//The above copyright notice and this permission notice shall be +//included in all copies or substantial portions of the Software. +// +//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +//MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +//LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +//OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +//WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
+ +/// Source code: https://github.com/mailru/sumbur-ruby/blob/master/ext/sumbur/sumbur.c + +unsigned int sumburConsistentHash(unsigned int hashed_int, unsigned int capacity); diff --git a/libs/libconsistent-hashing/yandex/bitops.h b/libs/libconsistent-hashing/yandex/bitops.h new file mode 100644 index 0000000000000000000000000000000000000000..697063ee77e86ec1d43aa057102855cb603254bd --- /dev/null +++ b/libs/libconsistent-hashing/yandex/bitops.h @@ -0,0 +1,52 @@ +#pragma once +#include +#include +#include + + +inline uint16_t LO_16(uint32_t x) { return static_cast(x & 0x0000FFFF); } +inline uint16_t HI_16(uint32_t x) { return static_cast(x >> 16); } + +inline uint32_t LO_32(uint64_t x) { return static_cast(x & 0x00000000FFFFFFFF); } +inline uint32_t HI_32(uint64_t x) { return static_cast(x >> 32); } + + +/// Clang also defines __GNUC__ +#if defined(__GNUC__) + inline unsigned GetValueBitCountImpl(unsigned int value) noexcept { + // NOTE: __builtin_clz* have undefined result for zero. + return std::numeric_limits::digits - __builtin_clz(value); + } + + inline unsigned GetValueBitCountImpl(unsigned long value) noexcept { + return std::numeric_limits::digits - __builtin_clzl(value); + } + + inline unsigned GetValueBitCountImpl(unsigned long long value) noexcept { + return std::numeric_limits::digits - __builtin_clzll(value); + } +#else + /// Stupid realization for non GCC-like compilers. Can use BSR from x86 instructions set. + template + inline unsigned GetValueBitCountImpl(T value) noexcept { + unsigned result = 1; // result == 0 - impossible value, since value cannot be zero + value >>= 1; + while (value) { + value >>= 1; + ++result; + } + + return result; + } +#endif + + +/** + * Returns the number of leading 0-bits in `value`, starting at the most significant bit position. 
+ * NOTE: value cannot be zero + */ +template +static inline unsigned GetValueBitCount(T value) noexcept { + using TCvt = std::make_unsigned_t>; + return GetValueBitCountImpl(static_cast(value)); +} diff --git a/libs/libconsistent-hashing/yandex/consistent_hashing.cpp b/libs/libconsistent-hashing/yandex/consistent_hashing.cpp new file mode 100644 index 0000000000000000000000000000000000000000..347456eede32cd9d7280434197a936a4ee4fb26c --- /dev/null +++ b/libs/libconsistent-hashing/yandex/consistent_hashing.cpp @@ -0,0 +1,125 @@ +#include "consistent_hashing.h" + +#include "bitops.h" + +#include "popcount.h" + +#include + +/* + * (all numbers are written in big-endian manner: the least significant digit on the right) + * (only bit representations are used - no hex or octal, leading zeroes are ommited) + * + * Consistent hashing scheme: + * + * (sizeof(TValue) * 8, y] (y, 0] + * a = * ablock + * b = * cblock + * + * (sizeof(TValue) * 8, k] (k, 0] + * c = * cblock + * + * d = * + * + * k - is determined by 2^(k-1) < n <= 2^k inequality + * z - is number of ones in cblock + * y - number of digits after first one in cblock + * + * The cblock determines logic of using a- and b- blocks: + * + * bits of cblock | result of a function + * 0 : 0 + * 1 : 1 (optimization, the next case includes this one) + * 1?..? : 1ablock (z is even) or 1bblock (z is odd) if possible (=n), than smooth moving from n=2^(k-1) to n=2^k is applied. + * Using "*" bits of a-,b-,c-,d- blocks uint64_t value is combined, modulo of which determines + * if the value should be greather than 2^(k-1) or ConsistentHashing(x, 2^(k-1)) should be used. + * The last case is optimized according to previous checks. + */ + +namespace { + +template +TValue PowerOf2(size_t k) { + return (TValue)0x1 << k; +} + +template +TValue SelectAOrBBlock(TValue a, TValue b, TValue cBlock) { + size_t z = PopCount(cBlock); + bool useABlock = z % 2 == 0; + return useABlock ? 
a : b; +} + +// Gets the exact result for n = k2 = 2 ^ k +template +size_t ConsistentHashingForPowersOf2(TValue a, TValue b, TValue c, TValue k2) { + TValue cBlock = c & (k2 - 1); // (k, 0] bits of c + // Zero and one cases + if (cBlock < 2) { + // First two cases of result function table: 0 if cblock is 0, 1 if cblock is 1. + return cBlock; + } + size_t y = GetValueBitCount(cBlock) - 1; // cblock = 0..01?..? (y = number of digits after 1), y > 0 + TValue y2 = PowerOf2(y); // y2 = 2^y + TValue abBlock = SelectAOrBBlock(a, b, cBlock) & (y2 - 1); + return y2 + abBlock; +} + +template +uint64_t GetAsteriskBits(TValue a, TValue b, TValue c, TValue d, size_t k) { + size_t shift = sizeof(TValue) * 8 - k; + uint64_t res = (d << shift) | (c >> k); + ++shift; + res <<= shift; + res |= b >> (k - 1); + res <<= shift; + res |= a >> (k - 1); + + return res; +} + +template +size_t ConsistentHashingImpl(TValue a, TValue b, TValue c, TValue d, size_t n) { + if (n <= 0) + throw std::runtime_error("Can't map consistently to a zero values."); + + // Uninteresting case + if (n == 1) { + return 0; + } + size_t k = GetValueBitCount(n - 1); // 2^(k-1) < n <= 2^k, k >= 1 + TValue k2 = PowerOf2(k); // k2 = 2^k + size_t largeValue; + { + // Bit determined variant. Large scheme. + largeValue = ConsistentHashingForPowersOf2(a, b, c, k2); + if (largeValue < n) { + return largeValue; + } + } + // Since largeValue is not assigned yet + // Smooth moving from one bit scheme to another + TValue k21 = PowerOf2(k - 1); + { + size_t s = GetAsteriskBits(a, b, c, d, k) % (largeValue * (largeValue + 1)); + size_t largeValue2 = s / k2 + k21; + if (largeValue2 < n) { + return largeValue2; + } + } + // Bit determined variant. Short scheme. + return ConsistentHashingForPowersOf2(a, b, c, k21); // Do not apply checks. 
It is always less than k21 = 2^(k-1) +} + +} // namespace // anonymous + +std::size_t ConsistentHashing(std::uint64_t x, std::size_t n) { + uint32_t lo = LO_32(x); + uint32_t hi = HI_32(x); + return ConsistentHashingImpl(LO_16(lo), HI_16(lo), LO_16(hi), HI_16(hi), n); +} +std::size_t ConsistentHashing(std::uint64_t lo, std::uint64_t hi, std::size_t n) { + return ConsistentHashingImpl(LO_32(lo), HI_32(lo), LO_32(hi), HI_32(hi), n); +} diff --git a/libs/libconsistent-hashing/yandex/consistent_hashing.h b/libs/libconsistent-hashing/yandex/consistent_hashing.h new file mode 100644 index 0000000000000000000000000000000000000000..fba229c2bd40aa77eab2410696128525e499c3fa --- /dev/null +++ b/libs/libconsistent-hashing/yandex/consistent_hashing.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include + +/* + * Author: Konstantin Oblakov + * + * Maps random ui64 x (in fact hash of some string) to n baskets/shards. + * Output value is id of a basket. 0 <= ConsistentHashing(x, n) < n. + * Probability of all baskets must be equal. Also, it should be consistent + * in terms, that with different n_1 < n_2 probability of + * ConsistentHashing(x, n_1) != ConsistentHashing(x, n_2) must be equal to + * (n_2 - n_1) / n_2 - the least possible with previous conditions. + * It requires O(1) memory and cpu to calculate. So, it is faster than classic + * consistent hashing algos with points on circle. 
+ */ +std::size_t ConsistentHashing(std::uint64_t x, std::size_t n); // Works good for n < 65536 +std::size_t ConsistentHashing(std::uint64_t lo, std::uint64_t hi, std::size_t n); // Works good for n < 4294967296 diff --git a/libs/libconsistent-hashing/yandex/popcount.cpp b/libs/libconsistent-hashing/yandex/popcount.cpp new file mode 100644 index 0000000000000000000000000000000000000000..66edfe65829cff361a35ff9a3c202cfb57a3edf4 --- /dev/null +++ b/libs/libconsistent-hashing/yandex/popcount.cpp @@ -0,0 +1,25 @@ +#include "popcount.h" + +static const uint8_t PopCountLUT8Impl[1 << 8] = { +#define B2(n) n, n + 1, n + 1, n + 2 +#define B4(n) B2(n), B2(n + 1), B2(n + 1), B2(n + 2) +#define B6(n) B4(n), B4(n + 1), B4(n + 1), B4(n + 2) + B6(0), B6(1), B6(1), B6(2)}; + +uint8_t const* PopCountLUT8 = PopCountLUT8Impl; + +#if !defined(_MSC_VER) +//ICE here for msvc + +static const uint8_t PopCountLUT16Impl[1 << 16] = { +#define B2(n) n, n + 1, n + 1, n + 2 +#define B4(n) B2(n), B2(n + 1), B2(n + 1), B2(n + 2) +#define B6(n) B4(n), B4(n + 1), B4(n + 1), B4(n + 2) +#define B8(n) B6(n), B6(n + 1), B6(n + 1), B6(n + 2) +#define B10(n) B8(n), B8(n + 1), B8(n + 1), B8(n + 2) +#define B12(n) B10(n), B10(n + 1), B10(n + 1), B10(n + 2) +#define B14(n) B12(n), B12(n + 1), B12(n + 1), B12(n + 2) + B14(0), B14(1), B14(1), B14(2)}; + +uint8_t const* PopCountLUT16 = PopCountLUT16Impl; +#endif diff --git a/libs/libconsistent-hashing/yandex/popcount.h b/libs/libconsistent-hashing/yandex/popcount.h new file mode 100644 index 0000000000000000000000000000000000000000..fdb56173e44d6ad3c3aaeab99d10ea1b8e549406 --- /dev/null +++ b/libs/libconsistent-hashing/yandex/popcount.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include +using std::size_t; + +#include "bitops.h" + +#if defined(_MSC_VER) +#include +#endif + + +static inline uint32_t PopCountImpl(uint8_t n) { + extern uint8_t const* PopCountLUT8; + return PopCountLUT8[n]; +} + +static inline uint32_t PopCountImpl(uint16_t n) { +#if 
defined(_MSC_VER) + return __popcnt16(n); +#else + extern uint8_t const* PopCountLUT16; + return PopCountLUT16[n]; +#endif +} + +static inline uint32_t PopCountImpl(uint32_t n) { +#if defined(_MSC_VER) + return __popcnt(n); +#elif defined(__GNUC__) // it is true for Clang also + return __builtin_popcount(n); +#else + return PopCountImpl((uint16_t)LO_16(n)) + PopCountImpl((uint16_t)HI_16(n)); +#endif +} + +static inline uint32_t PopCountImpl(uint64_t n) { +#if defined(_MSC_VER) && !defined(_i386_) + return __popcnt64(n); +#elif defined(__GNUC__) // it is true for Clang also + return __builtin_popcountll(n); +#else + return PopCountImpl((uint32_t)LO_32(n)) + PopCountImpl((uint32_t)HI_32(n)); +#endif +} + +template +static inline uint32_t PopCount(T n) { + using TCvt = std::make_unsigned_t>; + + return PopCountImpl(static_cast(n)); +}