Unverified · Commit 924cc345 · authored by alexey-milovidov, committed by GitHub

Merge pull request #1955 from yandex/CLICKHOUSE-3606

Added consistent hashing functions
@@ -78,7 +78,7 @@ list(REMOVE_ITEM clickhouse_functions_sources IFunction.cpp FunctionFactory.cpp
list(REMOVE_ITEM clickhouse_functions_headers IFunction.h FunctionFactory.h FunctionHelpers.h)
add_library(clickhouse_functions ${clickhouse_functions_sources})
target_link_libraries(clickhouse_functions dbms)
target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libfarmhash)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/libmetrohash/src)
target_include_directories (clickhouse_functions BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR})
......
#include "FunctionsConsistentHashing.h"
#include <Functions/FunctionFactory.h>
namespace DB
{
void registerFunctionsConsistentHashing(FunctionFactory & factory)
{
factory.registerFunction<FunctionYandexConsistentHash>();
factory.registerFunction<FunctionJumpConsistentHash>();
factory.registerFunction<FunctionSumburConsistentHash>();
}
}
#pragma once
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <common/likely.h>
#include <yandex/consistent_hashing.h>
#include <mailru/sumbur.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
}
/// An O(1) time and space consistent hash algorithm by Konstantin Oblakov
struct YandexConsistentHashImpl
{
static constexpr auto name = "yandexConsistentHash";
using HashType = UInt64;
/// It actually supports UInt64, but it is efficient only when n < 65536
using ResultType = UInt32;
using BucketsCountType = ResultType;
static inline ResultType apply(UInt64 hash, BucketsCountType n)
{
return ConsistentHashing(hash, n);
}
};
/// Code from https://arxiv.org/pdf/1406.2294.pdf
static inline int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets) {
int64_t b = -1, j = 0;
while (j < num_buckets) {
b = j;
key = key * 2862933555777941757ULL + 1;
j = static_cast<int64_t>((b + 1) * (double(1LL << 31) / double((key >> 33) + 1)));
}
return static_cast<int32_t>(b);
}
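/// A hedged illustration (not part of this commit): the defining property of jump
/// consistent hash is that growing num_buckets from n to n + 1 remaps only about
/// 1 / (n + 1) of the keys. For example, one would expect
///     JumpConsistentHash(key, 10) != JumpConsistentHash(key, 11)
/// to hold for roughly 1/11 of uniformly distributed keys.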
struct JumpConsistentHashImpl
{
static constexpr auto name = "jumpConsistentHash";
using HashType = UInt64;
using ResultType = Int32;
using BucketsCountType = ResultType;
static inline ResultType apply(UInt64 hash, BucketsCountType n)
{
return JumpConsistentHash(hash, n);
}
};
struct SumburConsistentHashImpl
{
static constexpr auto name = "sumburConsistentHash";
using HashType = UInt32;
using ResultType = UInt16;
using BucketsCountType = ResultType;
static inline ResultType apply(HashType hash, BucketsCountType n)
{
return static_cast<ResultType>(sumburConsistentHash(hash, n));
}
};
template <typename Impl>
class FunctionConsistentHashImpl : public IFunction
{
public:
static constexpr auto name = Impl::name;
static FunctionPtr create(const Context &) { return std::make_shared<FunctionConsistentHashImpl<Impl>>(); }
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 2; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (!arguments[0]->isInteger())
throw Exception("Illegal type " + arguments[0]->getName() + " of the first argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (arguments[0]->getSizeOfValueInMemory() > sizeof(HashType))
throw Exception("Function " + getName() + " accepts " + std::to_string(sizeof(HashType) * 8) + "-bit integers at most"
+ ", got " + arguments[0]->getName(), ErrorCodes::BAD_ARGUMENTS);
if (!arguments[1]->isInteger())
throw Exception("Illegal type " + arguments[1]->getName() + " of the second argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeNumber<ResultType>>();
}
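/// The bucket count (argument index 1) must be constant. With the default
/// implementation for constants, a call whose arguments are all constant is
/// folded by executing the function once on a single-row block.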
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result) override
{
if (block.getByPosition(arguments[1]).column->isColumnConst())
executeConstBuckets(block, arguments, result);
else
throw Exception("The second argument of function " + getName() + " (number of buckets) must be constant", ErrorCodes::BAD_ARGUMENTS);
}
private:
using HashType = typename Impl::HashType;
using ResultType = typename Impl::ResultType;
using BucketsType = typename Impl::BucketsCountType;
static constexpr auto max_buckets = static_cast<UInt64>(std::numeric_limits<BucketsType>::max());
template <typename T>
inline BucketsType checkBucketsRange(T buckets)
{
if (unlikely(buckets <= 0))
throw Exception("The second argument of function " + getName() + " (number of buckets) must be a positive number",
ErrorCodes::BAD_ARGUMENTS);
if (unlikely(static_cast<UInt64>(buckets) > max_buckets))
throw Exception("The value of the second argument of function " + getName() + " (number of buckets) does not fit into " +
DataTypeNumber<BucketsType>().getName(), ErrorCodes::BAD_ARGUMENTS);
return static_cast<BucketsType>(buckets);
}
void executeConstBuckets(Block & block, const ColumnNumbers & arguments, size_t result)
{
Field buckets_field = (*block.getByPosition(arguments[1]).column)[0];
BucketsType num_buckets;
if (buckets_field.getType() == Field::Types::Int64)
num_buckets = checkBucketsRange(buckets_field.get<Int64>());
else if (buckets_field.getType() == Field::Types::UInt64)
num_buckets = checkBucketsRange(buckets_field.get<UInt64>());
else
throw Exception("Illegal type " + String(buckets_field.getTypeName()) + " of the second argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const auto & hash_col = block.getByPosition(arguments[0]).column;
const IDataType * hash_type = block.getByPosition(arguments[0]).type.get();
auto res_col = ColumnVector<ResultType>::create();
if (checkDataType<DataTypeUInt8>(hash_type)) executeType<UInt8>(hash_col, num_buckets, res_col.get());
else if (checkDataType<DataTypeUInt16>(hash_type)) executeType<UInt16>(hash_col, num_buckets, res_col.get());
else if (checkDataType<DataTypeUInt32>(hash_type)) executeType<UInt32>(hash_col, num_buckets, res_col.get());
else if (checkDataType<DataTypeUInt64>(hash_type)) executeType<UInt64>(hash_col, num_buckets, res_col.get());
else if (checkDataType<DataTypeInt8>(hash_type)) executeType<Int8>(hash_col, num_buckets, res_col.get());
else if (checkDataType<DataTypeInt16>(hash_type)) executeType<Int16>(hash_col, num_buckets, res_col.get());
else if (checkDataType<DataTypeInt32>(hash_type)) executeType<Int32>(hash_col, num_buckets, res_col.get());
else if (checkDataType<DataTypeInt64>(hash_type)) executeType<Int64>(hash_col, num_buckets, res_col.get());
else
throw Exception("Illegal type " + hash_type->getName() + " of the first argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
block.getByPosition(result).column = std::move(res_col);
}
template <typename CurrentHashType>
void executeType(const ColumnPtr & col_hash_ptr, BucketsType num_buckets, ColumnVector<ResultType> * col_result)
{
auto col_hash = checkAndGetColumn<ColumnVector<CurrentHashType>>(col_hash_ptr.get());
if (!col_hash)
throw Exception("Illegal type of the first argument of function " + getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto & vec_result = col_result->getData();
const auto & vec_hash = col_hash->getData();
size_t size = vec_hash.size();
vec_result.resize(size);
for (size_t i = 0; i < size; ++i)
vec_result[i] = Impl::apply(static_cast<HashType>(vec_hash[i]), num_buckets);
}
};
using FunctionYandexConsistentHash = FunctionConsistentHashImpl<YandexConsistentHashImpl>;
using FunctionJumpConsistentHash = FunctionConsistentHashImpl<JumpConsistentHashImpl>;
using FunctionSumburConsistentHash = FunctionConsistentHashImpl<SumburConsistentHashImpl>;
}
@@ -24,6 +24,7 @@ void registerFunctionsExternalDictionaries(FunctionFactory &);
void registerFunctionsExternalModels(FunctionFactory &);
void registerFunctionsFormatting(FunctionFactory &);
void registerFunctionsHashing(FunctionFactory &);
void registerFunctionsConsistentHashing(FunctionFactory &);
void registerFunctionsHigherOrder(FunctionFactory &);
void registerFunctionsLogical(FunctionFactory &);
void registerFunctionsMiscellaneous(FunctionFactory &);
@@ -60,6 +61,7 @@ void registerFunctions()
registerFunctionsExternalModels(factory);
registerFunctionsFormatting(factory);
registerFunctionsHashing(factory);
registerFunctionsConsistentHashing(factory);
registerFunctionsHigherOrder(factory);
registerFunctionsLogical(factory);
registerFunctionsMiscellaneous(factory);
......
#pragma once
#include <Poco/Util/ServerApplication.h>
/* = clickhouse-cluster-copier util =
/* clickhouse cluster copier util
* Copies table data from one cluster to new tables of another (possibly the same) cluster in a distributed, fault-tolerant manner.
*
* Configuration of copying tasks is set in a special ZooKeeper node (called the description node).
* A ZooKeeper path to the description node is specified via the --task-path </task/path> parameter.
* So, the node /task/path/description should contain special XML content describing copying tasks.
* See overview in the docs: docs/en/utils/clickhouse-copier.md
*
* Many clickhouse-cluster-copier processes located on arbitrary servers can execute the same task simultaneously.
* ZooKeeper node /task/path/ is used by the processes to coordinate their work.
* You must not add additional child nodes to /task/path/.
*
* Currently you are responsible for launching cluster-copier processes.
* You can launch as many processes as you want, whenever and wherever you want.
* Each process tries to select the nearest available shard of the source cluster and copy some part of its data (a partition) to the whole
* destination cluster with resharding.
* Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce network usage.
*
* Since the workers coordinate their work via ZooKeeper, in addition to --task-path </task/path> you have to specify ZooKeeper
* configuration via the --config-file <zookeeper.xml> parameter. Example of zookeeper.xml:
<yandex>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
* When you run clickhouse-cluster-copier --config-file <zookeeper.xml> --task-path </task/path>
* the process connects to ZooKeeper, reads the task configs from /task/path/description and executes them.
*
*
* = Format of task config =
<yandex>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers, the superfluous ones will sleep. -->
<max_workers>2</max_workers>
<!-- Settings used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Settings used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common settings for fetch (pull) and insert (push) operations. The copier process context also uses them.
They are overridden by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Description of copying tasks.
You can specify several table tasks in the same task description (in the same ZooKeeper node); they will be performed
sequentially.
-->
<tables>
<!-- Name of the table task; it must be a unique name suitable for a ZooKeeper node name -->
<table_hits>
<!-- Source cluster name (from the <remote_servers/> section) and tables in it that should be copied -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>test</database_pull>
<table_pull>hits</table_pull>
<!-- Destination cluster name and tables into which the data should be inserted -->
<cluster_push>destination_cluster</cluster_push>
<database_push>test</database_push>
<table_push>hits2</table_push>
<!-- Engine of destination tables.
If the destination tables have not been created, workers create them using the column definitions from the source tables and the engine
definition from here.
NOTE: If the first worker starts inserting data and detects that the destination partition is not empty, then the partition will
be dropped and refilled; take this into account if you already have some data in the destination tables. You can directly
specify the partitions that should be copied in <enabled_partitions/>; they should be in the quoted format used by the partition column of
the system.parts table.
-->
<engine>ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/hits2/{shard}/hits2', '{replica}', EventDate, (CounterID, EventDate), 8192)</engine>
<!-- Sharding key used to insert data to destination cluster -->
<sharding_key>intHash32(UserID)</sharding_key>
<!-- Optional expression that filters data while pulling it from the source servers -->
<where_condition>CounterID != 0</where_condition>
<!-- Optional section that specifies the partitions to copy; other partitions will be ignored -->
<enabled_partitions>
<partition>201712</partition>
<partition>201801</partition>
...
</enabled_partitions>
</table_hits>
<table_visits>
...
</table_visits>
...
</tables>
</yandex>
* = Implementation details =
* Implementation details:
*
* cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
* a Distributed table (to perform data resharding). So, a worker's job is a partition of a source shard.
@@ -144,7 +24,7 @@
* /server_fqdn#PID_timestamp - cluster-copier worker ID
* ...
* /tables - directory with table tasks
* /table_hits - directory of table_hits task
* /cluster.db.table - directory of table_hits task
* /partition1 - directory for partition1
* /shards - directory for source cluster shards
* /1 - worker job for the first shard of partition1 of table test.hits
......
@@ -26,10 +26,14 @@
-->
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/b', '{replica}') PARTITION BY toMonday(date) ORDER BY d</engine>
<engine>ENGINE=
ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/b', '{replica}')
PARTITION BY toMonday(date)
ORDER BY d
</engine>
<!-- Which sharding key to use while copying -->
<sharding_key>d + 1</sharding_key>
<sharding_key>jumpConsistentHash(intHash64(d), 2)</sharding_key>
<!-- Optional expression that filter copying data -->
<!-- <where_condition></where_condition> -->
......
@@ -129,8 +129,8 @@ class Task2:
assert TSV(self.cluster.instances['s0_0_0'].query("SELECT count() FROM cluster(cluster0, default, a)")) == TSV("85\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT count(), uniqExact(date) FROM cluster(cluster1, default, b)")) == TSV("85\t85\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT d % 2 FROM b")) == TSV("1\n")
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT DISTINCT d % 2 FROM b")) == TSV("0\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT DISTINCT jumpConsistentHash(intHash64(d), 2) FROM b")) == TSV("0\n")
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT DISTINCT jumpConsistentHash(intHash64(d), 2) FROM b")) == TSV("1\n")
assert TSV(self.cluster.instances['s1_0_0'].query("SELECT uniqExact(partition) IN (12, 13) FROM system.parts WHERE active AND database='default' AND table='b'")) == TSV("1\n")
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT uniqExact(partition) IN (12, 13) FROM system.parts WHERE active AND database='default' AND table='b'")) == TSV("1\n")
@@ -184,20 +184,21 @@ def execute_task(task, cmd_options):
zk.delete(zk_task_path, recursive=True)
# Tests
def test_copy1_simple(started_cluster):
execute_task(Task1(started_cluster), [])
def test_copy1_with_recovering(started_cluster):
execute_task(Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
def test_copy_month_to_week_partition(started_cluster):
execute_task(Task2(started_cluster), [])
def test_copy_month_to_week_partition(started_cluster):
def test_copy_month_to_week_partition_with_recovering(started_cluster):
execute_task(Task2(started_cluster), ['--copy-fault-probability', str(0.1)])
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():
......
<test>
<name>consistent_hashes</name>
<type>once</type>
<stop_conditions>
<any_of>
<min_time_not_changing_for_ms>1000</min_time_not_changing_for_ms>
<total_time_ms>5000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<max_rows_per_second />
<avg_rows_per_second />
</main_metric>
<substitutions>
<substitution>
<name>hash_func</name>
<values>
<value>yandexConsistentHash</value>
<value>jumpConsistentHash</value>
</values>
</substitution>
<substitution>
<name>buckets</name>
<values>
<value>2</value>
<value>500</value>
<value>65535</value>
</values>
</substitution>
</substitutions>
<query>SELECT {hash_func}(number, {buckets}) FROM system.numbers LIMIT 1000000000</query>
<query>SELECT sumburConsistentHash(toUInt32(number), {buckets}) FROM system.numbers LIMIT 10000</query>
</test>
SELECT jumpConsistentHash(1, 1), jumpConsistentHash(42, 57), jumpConsistentHash(256, 1024), jumpConsistentHash(3735883980, 1), jumpConsistentHash(3735883980, 666), jumpConsistentHash(16045690984833335023, 255);
SELECT yandexConsistentHash(16045690984833335023, 1), yandexConsistentHash(16045690984833335023, 2), yandexConsistentHash(16045690984833335023, 3), yandexConsistentHash(16045690984833335023, 4), yandexConsistentHash(16045690984833335023, 173), yandexConsistentHash(16045690984833335023, 255);
SELECT jumpConsistentHash(intHash64(number), 787) FROM system.numbers LIMIT 1000000, 2;
SELECT yandexConsistentHash(16045690984833335023+number-number, 120) FROM system.numbers LIMIT 1000000, 2;
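-- A hedged sanity check (an addition, not part of the original test): growing the bucket
-- count by one should move only a small fraction of keys, unlike plain modulo.
SELECT avg(jumpConsistentHash(intHash64(number), 5) != jumpConsistentHash(intHash64(number), 6)) FROM system.numbers LIMIT 100000;
SELECT avg(intHash64(number) % 5 != intHash64(number) % 6) FROM system.numbers LIMIT 100000;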
# clickhouse-copier util
The util copies table data from one cluster to new tables of another (possibly the same) cluster in a distributed and fault-tolerant manner.
Configuration of copying tasks is set in a special ZooKeeper node (called the `/description` node).
A ZooKeeper path to the description node is specified via the `--task-path </task/path>` parameter.
So, the node `/task/path/description` should contain special XML content describing copying tasks.
Many `clickhouse-copier` processes located on arbitrary servers can execute the same task simultaneously.
ZooKeeper node `/task/path/` is used by the processes to coordinate their work.
You must not add additional child nodes to `/task/path/`.
Currently you are responsible for manually launching all `clickhouse-copier` processes.
You can launch as many processes as you want, whenever and wherever you want.
Each process tries to select the nearest available shard of the source cluster and copy some part of its data (a partition) to the whole
destination cluster (with resharding).
Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce network usage.
Since the workers coordinate their work via ZooKeeper, in addition to `--task-path </task/path>` you have to specify the ZooKeeper
cluster configuration via the `--config-file <zookeeper.xml>` parameter. Example of `zookeeper.xml`:
```xml
<yandex>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
```
When you run `clickhouse-copier --config-file <zookeeper.xml> --task-path </task/path>` the process connects to the ZooKeeper cluster, reads the task configs from `/task/path/description` and executes them.
## Format of task config
Here is an example of `/task/path/description` content:
```xml
<yandex>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers, the superfluous ones will sleep. -->
<max_workers>2</max_workers>
<!-- Settings used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Settings used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common settings for fetch (pull) and insert (push) operations. The copier process context also uses them.
They are overridden by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Description of copying tasks.
You can specify several table tasks in the same task description (in the same ZooKeeper node); they will be performed
sequentially.
-->
<tables>
<!-- A table task, copies one table. -->
<table_hits>
<!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>test</database_pull>
<table_pull>hits</table_pull>
<!-- Destination cluster name and tables in which the data should be inserted -->
<cluster_push>destination_cluster</cluster_push>
<database_push>test</database_push>
<table_push>hits2</table_push>
<!-- Engine of destination tables.
If the destination tables have not been created, workers create them using the column definitions from the source tables and the engine
definition from here.
NOTE: If the first worker starts inserting data and detects that the destination partition is not empty, then the partition will
be dropped and refilled; take this into account if you already have some data in the destination tables. You can directly
specify the partitions that should be copied in <enabled_partitions/>; they should be in the quoted format used by the partition column of
the system.parts table.
-->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}')
PARTITION BY toMonday(date)
ORDER BY (CounterID, EventDate)
</engine>
<!-- Sharding key used to insert data to destination cluster -->
<sharding_key>jumpConsistentHash(intHash64(UserID), 2)</sharding_key>
<!-- Optional expression that filters data while pulling it from the source servers -->
<where_condition>CounterID != 0</where_condition>
<!-- This section specifies the partitions that should be copied; other partitions will be ignored.
Partition names should have the same format as the
partition column of the system.parts table (i.e. quoted text).
Since the partition keys of the source and destination clusters could be different,
these partition names specify destination partitions.
NOTE: Although this section is optional (if it is omitted, all partitions will be copied),
it is strongly recommended to specify the partitions explicitly.
If you already have some ready partitions on the destination cluster, they
will be removed at the start of the copying, since they will be interpreted
as unfinished data from a previous copy!
-->
<enabled_partitions>
<partition>'2018-02-26'</partition>
<partition>'2018-03-05'</partition>
...
</enabled_partitions>
</table_hits>
<!-- Next table to copy. It is not copied until the previous table task finishes. -->
<table_visits>
...
</table_visits>
...
</tables>
</yandex>
```
cluster-copier processes watch for updates of the `/task/path/description` node.
So, if you modify the config settings or the `max_workers` parameter, they will be picked up.
## Example
```bash
clickhouse-copier --daemon --config /path/to/copier/zookeeper.xml --task-path /clickhouse-copier/cluster1_tables_hits --base-dir /path/to/copier_logs
```
`--base-dir /path/to/copier_logs` specifies where auxiliary and log files of the copier process will be saved.
In this case it will create the `/path/to/copier_logs/clickhouse-copier_YYYYMMHHSS_<PID>/` dir with log and status files.
If it is not specified, the current dir is used (`/clickhouse-copier_YYYYMMHHSS_<PID>/` if it is run as a `--daemon`).
# ClickHouse utilities
There are several ClickHouse utilities that are separate executable files:
* `clickhouse-local` allows executing SQL queries on local data, much like `awk`
* `clickhouse-copier` copies (and reshards) immutable data from one cluster to another in a fault-tolerant manner.
@@ -233,6 +233,11 @@ pages:
- 'Settings': 'operations/settings/settings.md'
- 'Settings profiles': 'operations/settings/settings_profiles.md'
- 'Utilities':
- 'Utilities': 'utils/index.md'
- 'clickhouse-copier': 'utils/clickhouse-copier.md'
#- 'clickhouse-local' : 'utils/clickhouse-local.md'
- 'ClickHouse Development':
# - 'ClickHouse Development': 'development/index.md'
- 'Overview of ClickHouse architecture': 'development/architecture.md'
......
@@ -238,6 +238,11 @@ pages:
- 'Настройки': 'operations/settings/settings.md'
- 'Профили настроек': 'operations/settings/settings_profiles.md'
- 'Утилиты':
- 'Утилиты': 'utils/index.md'
- 'clickhouse-copier': 'utils/clickhouse-copier.md'
#- 'clickhouse-local' : 'utils/clickhouse-local.md'
- 'ClickHouse Development':
# - 'ClickHouse Development': 'development/index.md'
- 'Overview of ClickHouse architecture': 'development/architecture.md'
......
# clickhouse-copier util
The util copies table data from one cluster to new tables of another (possibly the same) cluster in a distributed and fault-tolerant manner.
Configuration of copying tasks is set in a special ZooKeeper node (called the `/description` node).
A ZooKeeper path to the description node is specified via the `--task-path </task/path>` parameter.
So, the node `/task/path/description` should contain special XML content describing copying tasks.
Many `clickhouse-copier` processes located on arbitrary servers can execute the same task simultaneously.
ZooKeeper node `/task/path/` is used by the processes to coordinate their work.
You must not add additional child nodes to `/task/path/`.
Currently you are responsible for manually launching all `clickhouse-copier` processes.
You can launch as many processes as you want, whenever and wherever you want.
Each process tries to select the nearest available shard of the source cluster and copy some part of its data (a partition) to the whole
destination cluster (with resharding).
Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce network usage.
Since the workers coordinate their work via ZooKeeper, in addition to `--task-path </task/path>` you have to specify the ZooKeeper
cluster configuration via the `--config-file <zookeeper.xml>` parameter. Example of `zookeeper.xml`:
```xml
<yandex>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
```
When you run `clickhouse-copier --config-file <zookeeper.xml> --task-path </task/path>` the process connects to the ZooKeeper cluster, reads the task configs from `/task/path/description` and executes them.
## Format of task config
Here is an example of `/task/path/description` content:
```xml
<yandex>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers, the superfluous ones will sleep. -->
<max_workers>2</max_workers>
<!-- Settings used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Settings used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common settings for fetch (pull) and insert (push) operations. The copier process context also uses them.
They are overridden by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Description of copying tasks.
You can specify several table tasks in the same task description (in the same ZooKeeper node); they will be performed
sequentially.
-->
<tables>
<!-- A table task, copies one table. -->
<table_hits>
<!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>test</database_pull>
<table_pull>hits</table_pull>
<!-- Destination cluster name and tables in which the data should be inserted -->
<cluster_push>destination_cluster</cluster_push>
<database_push>test</database_push>
<table_push>hits2</table_push>
<!-- Engine of destination tables.
If the destination tables have not been created, workers create them using the column definitions from the source tables and the engine
definition from here.
NOTE: If the first worker starts inserting data and detects that the destination partition is not empty, then the partition will
be dropped and refilled; take this into account if you already have some data in the destination tables. You can directly
specify the partitions that should be copied in <enabled_partitions/>; they should be in the quoted format used by the partition column of
the system.parts table.
-->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}')
PARTITION BY toMonday(date)
ORDER BY (CounterID, EventDate)
</engine>
<!-- Sharding key used to insert data to destination cluster -->
<sharding_key>jumpConsistentHash(intHash64(UserID), 2)</sharding_key>
<!-- Optional expression that filters data while pulling it from the source servers -->
<where_condition>CounterID != 0</where_condition>
<!-- This section specifies the partitions that should be copied; other partitions will be ignored.
Partition names should have the same format as the
partition column of the system.parts table (i.e. quoted text).
Since the partition keys of the source and destination clusters could be different,
these partition names specify destination partitions.
NOTE: Although this section is optional (if it is omitted, all partitions will be copied),
it is strongly recommended to specify the partitions explicitly.
If you already have some ready partitions on the destination cluster, they
will be removed at the start of the copying, since they will be interpreted
as unfinished data from a previous copy!
-->
<enabled_partitions>
<partition>'2018-02-26'</partition>
<partition>'2018-03-05'</partition>
...
</enabled_partitions>
</table_hits>
<!-- Next table to copy. It is not copied until the previous table task finishes. -->
<table_visits>
...
</table_visits>
...
</tables>
</yandex>
```
cluster-copier processes watch for updates of the `/task/path/description` node.
So, if you modify the config settings or the `max_workers` parameter, they will be picked up.
## Example
```bash
clickhouse-copier --daemon --config /path/to/copier/zookeeper.xml --task-path /clickhouse-copier/cluster1_tables_hits --base-dir /path/to/copier_logs
```
`--base-dir /path/to/copier_logs` specifies where auxiliary and log files of the copier process will be saved.
In this case it will create the `/path/to/copier_logs/clickhouse-copier_YYYYMMHHSS_<PID>/` dir with log and status files.
If it is not specified, the current dir is used (`/clickhouse-copier_YYYYMMHHSS_<PID>/` if it is run as a `--daemon`).
# ClickHouse utilities
There are several ClickHouse utilities that are separate executable files:
* `clickhouse-local` allows executing SQL queries on local data, much like `awk`
* `clickhouse-copier` copies (and reshards) immutable data from one cluster to another in a fault-tolerant manner.
@@ -18,3 +18,5 @@ endif ()
if (USE_MYSQL)
add_subdirectory (libmysqlxx)
endif ()
add_subdirectory (libconsistent-hashing)
cmake_minimum_required(VERSION 2.8)
project(libconsistent-hashing CXX)
add_library(libconsistent-hashing yandex/consistent_hashing.cpp yandex/popcount.cpp mailru/sumbur.cpp)
target_include_directories(libconsistent-hashing PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
\ No newline at end of file
//Copyright (c) 2011-2012 Mail.RU
//Copyright (c) 2011-2012 Maksim Kalinchenko
//Copyright (c) 2012 Sokolov Yura aka funny-falcon
//
//MIT License
//
//Permission is hereby granted, free of charge, to any person obtaining
//a copy of this software and associated documentation files (the
//"Software"), to deal in the Software without restriction, including
//without limitation the rights to use, copy, modify, merge, publish,
//distribute, sublicense, and/or sell copies of the Software, and to
//permit persons to whom the Software is furnished to do so, subject to
//the following conditions:
//
//The above copyright notice and this permission notice shall be
//included in all copies or substantial portions of the Software.
//
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
//MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
//LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
//OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
//WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include <stdexcept>
#define L 0xFFFFFFFF
static unsigned int L27_38[] = {L / 27, L / 28, L / 29, L / 30, L / 31, L / 32,
L / 33, L / 34, L / 35, L / 36, L / 37, L / 38,
L / 39, L / 40, L / 41, L / 42, L / 43, L / 44,
L / 45, L / 46, L / 47, L / 48, L / 49, L / 50,
L / 51, L / 52, L / 53, L / 54, L / 55, L / 56,
L / 57, L / 58, L / 59, L / 60, L / 61, L / 62
};
static unsigned int LL27_38[] = {L/(26*27), L/(27*28), L/(28*29), L/(29*30), L/(30*31), L/(31*32),
L/(32*33), L/(33*34), L/(34*35), L/(35*36), L/(36*37), L/(37*38),
L/(38*39), L/(39*40), L/(40*41), L/(41*42), L/(42*43), L/(43*44),
L/(44*45), L/(45*46), L/(46*47), L/(47*48), L/(48*49), L/(49*50),
L/(50*51), L/(51*52), L/(52*53), L/(53*54), L/(54*55), L/(55*56),
L/(56*57), L/(57*58), L/(58*59), L/(59*60), L/(60*61), L/(61*62)
};
unsigned int sumburConsistentHash(unsigned int hashed_int, unsigned int capacity)
{
unsigned int h = hashed_int;
unsigned int capa = capacity;
unsigned int part, n, i, c;
if (capa == 0)
throw std::runtime_error("Sumbur is not applicable to empty cluster");
part = L / capa;
if (L - h < part) return 0;
n = 1;
do {
if (h >= L / 2) h -= L / 2;
else {
n = 2;
if (L / 2 - h < part) return 1;
}
if (capa == 2) return 1;
#define curslice(i) (L / (i * (i - 1)))
#define unroll(i) \
if (curslice(i) <= h) h -= curslice(i); \
else { \
h += curslice(i) * (i - n - 1); \
n = i; \
if (L / i - h < part) return n-1; \
} \
if (capa == i) return (n-1)
unroll(3); unroll(4); unroll(5);
unroll(6); unroll(7); unroll(8);
unroll(9); unroll(10); unroll(11);
unroll(12); unroll(13); unroll(14);
unroll(15); unroll(16); unroll(17);
unroll(18); unroll(19); unroll(20);
unroll(21); unroll(22); unroll(23);
unroll(24); unroll(25); unroll(26);
for (i = 27; i <= capa && i <= 62; i++) {
c = LL27_38[i-27];
if (c <= h) {
h -= c;
}
else {
h += c * (i - n - 1);
n = i;
if (L27_38[i-27] - h < part) return n-1;
}
}
for(i = 63; i <= capa; i++) {
c = L / (i * (i - 1));
if (c <= h) {
h -= c;
}
else {
h += c * (i - n - 1);
n = i;
if (L / i - h < part) return n - 1;
}
}
} while(0);
return n - 1;
}
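/* A hedged usage sketch (assumed, not from this file):
 *     unsigned int bucket = sumburConsistentHash(hash_of_key, 37); // bucket is in [0, 37)
 * Growing capacity from n to n + 1 is expected to move roughly 1 / (n + 1) of the keys.
 */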
//Copyright (c) 2011-2012 Mail.RU
//Copyright (c) 2011-2012 Maksim Kalinchenko
//Copyright (c) 2012 Sokolov Yura aka funny-falcon
//
//MIT License
//
//Permission is hereby granted, free of charge, to any person obtaining
//a copy of this software and associated documentation files (the
//"Software"), to deal in the Software without restriction, including
//without limitation the rights to use, copy, modify, merge, publish,
//distribute, sublicense, and/or sell copies of the Software, and to
//permit persons to whom the Software is furnished to do so, subject to
//the following conditions:
//
//The above copyright notice and this permission notice shall be
//included in all copies or substantial portions of the Software.
//
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
//MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
//LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
//OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
//WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
/// Source code: https://github.com/mailru/sumbur-ruby/blob/master/ext/sumbur/sumbur.c
unsigned int sumburConsistentHash(unsigned int hashed_int, unsigned int capacity);
#pragma once
#include <stdint.h>
#include <limits>
#include <type_traits>
inline uint16_t LO_16(uint32_t x) { return static_cast<uint16_t>(x & 0x0000FFFF); }
inline uint16_t HI_16(uint32_t x) { return static_cast<uint16_t>(x >> 16); }
inline uint32_t LO_32(uint64_t x) { return static_cast<uint32_t>(x & 0x00000000FFFFFFFF); }
inline uint32_t HI_32(uint64_t x) { return static_cast<uint32_t>(x >> 32); }
/// Clang also defines __GNUC__
#if defined(__GNUC__)
inline unsigned GetValueBitCountImpl(unsigned int value) noexcept {
// NOTE: __builtin_clz* have undefined result for zero.
return std::numeric_limits<unsigned int>::digits - __builtin_clz(value);
}
inline unsigned GetValueBitCountImpl(unsigned long value) noexcept {
return std::numeric_limits<unsigned long>::digits - __builtin_clzl(value);
}
inline unsigned GetValueBitCountImpl(unsigned long long value) noexcept {
return std::numeric_limits<unsigned long long>::digits - __builtin_clzll(value);
}
#else
/// Naive fallback for non-GCC-like compilers. Could use the BSR instruction from the x86 instruction set.
template <typename T>
inline unsigned GetValueBitCountImpl(T value) noexcept {
unsigned result = 1; // result == 0 - impossible value, since value cannot be zero
value >>= 1;
while (value) {
value >>= 1;
++result;
}
return result;
}
#endif
/**
* Returns the number of significant bits in `value` (the position of the most significant set bit, plus one).
* NOTE: `value` must not be zero.
*/
template <typename T>
static inline unsigned GetValueBitCount(T value) noexcept {
using TCvt = std::make_unsigned_t<std::decay_t<T>>;
return GetValueBitCountImpl(static_cast<TCvt>(value));
}
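// For example: GetValueBitCount(1) == 1, GetValueBitCount(4) == 3, GetValueBitCount(255) == 8.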
#include "consistent_hashing.h"
#include "bitops.h"
#include "popcount.h"
#include <stdexcept>
/*
* (all numbers are written in big-endian manner: the least significant digit on the right)
* (only bit representations are used - no hex or octal; leading zeroes are omitted)
*
* Consistent hashing scheme:
*
*      (sizeof(TValue) * 8, y]    (y, 0]
* a =             *               ablock
* b =             *               bblock
*
*      (sizeof(TValue) * 8, k]    (k, 0]
* c =             *               cblock
*
* d =             *
*
* k - is determined by 2^(k-1) < n <= 2^k inequality
* z - is number of ones in cblock
* y - number of digits after first one in cblock
*
* The cblock determines logic of using a- and b- blocks:
*
* bits of cblock | result of a function
* 0 : 0
* 1 : 1 (optimization, the next case includes this one)
* 1?..? : 1ablock (z is even) or 1bblock (z is odd) if possible (<n)
*
* If the last case is not possible (>= n), then smooth moving from n = 2^(k-1) to n = 2^k is applied.
* Using the "*" bits of the a-, b-, c-, d- blocks, a uint64_t value is combined; its modulo determines
* whether the value should be greater than 2^(k-1), or whether ConsistentHashing(x, 2^(k-1)) should be used.
* The last case is optimized according to the previous checks.
*/
namespace {
template<class TValue>
TValue PowerOf2(size_t k) {
return (TValue)0x1 << k;
}
template<class TValue>
TValue SelectAOrBBlock(TValue a, TValue b, TValue cBlock) {
size_t z = PopCount<uint64_t>(cBlock);
bool useABlock = z % 2 == 0;
return useABlock ? a : b;
}
// Gets the exact result for n = k2 = 2 ^ k
template<class TValue>
size_t ConsistentHashingForPowersOf2(TValue a, TValue b, TValue c, TValue k2) {
TValue cBlock = c & (k2 - 1); // (k, 0] bits of c
// Zero and one cases
if (cBlock < 2) {
// First two cases of result function table: 0 if cblock is 0, 1 if cblock is 1.
return cBlock;
}
size_t y = GetValueBitCount<uint64_t>(cBlock) - 1; // cblock = 0..01?..? (y = number of digits after 1), y > 0
TValue y2 = PowerOf2<TValue>(y); // y2 = 2^y
TValue abBlock = SelectAOrBBlock(a, b, cBlock) & (y2 - 1);
return y2 + abBlock;
}
template<class TValue>
uint64_t GetAsteriskBits(TValue a, TValue b, TValue c, TValue d, size_t k) {
size_t shift = sizeof(TValue) * 8 - k;
uint64_t res = (d << shift) | (c >> k);
++shift;
res <<= shift;
res |= b >> (k - 1);
res <<= shift;
res |= a >> (k - 1);
return res;
}
template<class TValue>
size_t ConsistentHashingImpl(TValue a, TValue b, TValue c, TValue d, size_t n) {
if (n <= 0)
throw std::runtime_error("Can't map consistently to a zero values.");
// Uninteresting case
if (n == 1) {
return 0;
}
size_t k = GetValueBitCount(n - 1); // 2^(k-1) < n <= 2^k, k >= 1
TValue k2 = PowerOf2<TValue>(k); // k2 = 2^k
size_t largeValue;
{
// Bit determined variant. Large scheme.
largeValue = ConsistentHashingForPowersOf2(a, b, c, k2);
if (largeValue < n) {
return largeValue;
}
}
// Reached only when largeValue >= n.
// Smooth moving from one bit scheme to another.
TValue k21 = PowerOf2<TValue>(k - 1);
{
size_t s = GetAsteriskBits(a, b, c, d, k) % (largeValue * (largeValue + 1));
size_t largeValue2 = s / k2 + k21;
if (largeValue2 < n) {
return largeValue2;
}
}
// Bit determined variant. Short scheme.
return ConsistentHashingForPowersOf2(a, b, c, k21); // Do not apply checks. It is always less than k21 = 2^(k-1)
}
} // anonymous namespace
std::size_t ConsistentHashing(std::uint64_t x, std::size_t n) {
uint32_t lo = LO_32(x);
uint32_t hi = HI_32(x);
return ConsistentHashingImpl<uint16_t>(LO_16(lo), HI_16(lo), LO_16(hi), HI_16(hi), n);
}
std::size_t ConsistentHashing(std::uint64_t lo, std::uint64_t hi, std::size_t n) {
return ConsistentHashingImpl<uint32_t>(LO_32(lo), HI_32(lo), LO_32(hi), HI_32(hi), n);
}
#pragma once
#include <cstdint>
#include <cstddef>
/*
* Author: Konstantin Oblakov
*
* Maps random ui64 x (in fact hash of some string) to n baskets/shards.
* Output value is id of a basket. 0 <= ConsistentHashing(x, n) < n.
* All baskets must be equally probable. The mapping should also be consistent,
* in the sense that for n_1 < n_2 the probability of
* ConsistentHashing(x, n_1) != ConsistentHashing(x, n_2) must equal
* (n_2 - n_1) / n_2 - the least possible given the previous conditions.
* It requires O(1) memory and CPU to calculate, so it is faster than classic
* consistent hashing algorithms that place points on a circle.
*/
std::size_t ConsistentHashing(std::uint64_t x, std::size_t n); // Works well for n < 65536
std::size_t ConsistentHashing(std::uint64_t lo, std::uint64_t hi, std::size_t n); // Works well for n < 4294967296
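// A hedged illustration of the contract above: for n_1 = 120 and n_2 = 240,
// roughly (240 - 120) / 240 = 1/2 of all x should satisfy
// ConsistentHashing(x, 120) != ConsistentHashing(x, 240), and every result stays in [0, n).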
#include "popcount.h"
static const uint8_t PopCountLUT8Impl[1 << 8] = {
#define B2(n) n, n + 1, n + 1, n + 2
#define B4(n) B2(n), B2(n + 1), B2(n + 1), B2(n + 2)
#define B6(n) B4(n), B4(n + 1), B4(n + 1), B4(n + 2)
B6(0), B6(1), B6(1), B6(2)};
uint8_t const* PopCountLUT8 = PopCountLUT8Impl;
#if !defined(_MSC_VER)
// MSVC hits an internal compiler error (ICE) on this table.
static const uint8_t PopCountLUT16Impl[1 << 16] = {
#define B2(n) n, n + 1, n + 1, n + 2
#define B4(n) B2(n), B2(n + 1), B2(n + 1), B2(n + 2)
#define B6(n) B4(n), B4(n + 1), B4(n + 1), B4(n + 2)
#define B8(n) B6(n), B6(n + 1), B6(n + 1), B6(n + 2)
#define B10(n) B8(n), B8(n + 1), B8(n + 1), B8(n + 2)
#define B12(n) B10(n), B10(n + 1), B10(n + 1), B10(n + 2)
#define B14(n) B12(n), B12(n + 1), B12(n + 1), B12(n + 2)
B14(0), B14(1), B14(1), B14(2)};
uint8_t const* PopCountLUT16 = PopCountLUT16Impl;
#endif
#pragma once
#include <stdint.h>
#include <cstddef>
#include <type_traits>
using std::size_t;
#include "bitops.h"
#if defined(_MSC_VER)
#include <intrin.h>
#endif
static inline uint32_t PopCountImpl(uint8_t n) {
extern uint8_t const* PopCountLUT8;
return PopCountLUT8[n];
}
static inline uint32_t PopCountImpl(uint16_t n) {
#if defined(_MSC_VER)
return __popcnt16(n);
#else
extern uint8_t const* PopCountLUT16;
return PopCountLUT16[n];
#endif
}
static inline uint32_t PopCountImpl(uint32_t n) {
#if defined(_MSC_VER)
return __popcnt(n);
#elif defined(__GNUC__) // it is true for Clang also
return __builtin_popcount(n);
#else
return PopCountImpl((uint16_t)LO_16(n)) + PopCountImpl((uint16_t)HI_16(n));
#endif
}
static inline uint32_t PopCountImpl(uint64_t n) {
#if defined(_MSC_VER) && !defined(_i386_)
return __popcnt64(n);
#elif defined(__GNUC__) // it is true for Clang also
return __builtin_popcountll(n);
#else
return PopCountImpl((uint32_t)LO_32(n)) + PopCountImpl((uint32_t)HI_32(n));
#endif
}
template <class T>
static inline uint32_t PopCount(T n) {
using TCvt = std::make_unsigned_t<std::decay_t<T>>;
return PopCountImpl(static_cast<TCvt>(n));
}
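// For example: PopCount(0xFFu) == 8, PopCount(uint64_t(1) << 63) == 1, PopCount(0u) == 0.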