提交 4e27c268 编写于 作者: V Vitaliy Lyudvichenko

Add clickhouse-copier description to the docs. [#CLICKHOUSE-3606]

上级 120530e4
#pragma once
#include <Poco/Util/ServerApplication.h>
/* = clickhouse-cluster-copier util =
/* clickhouse cluster copier util
* Copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed fault-tolerant manner.
*
* Configuration of copying tasks is set in special ZooKeeper node (called the description node).
* A ZooKeeper path to the description node is specified via --task-path </task/path> parameter.
* So, node /task/path/description should contain special XML content describing copying tasks.
* See overview in the docs: docs/en/utils/clickhouse-copier.md
*
* Simultaneously many clickhouse-cluster-copier processes located on any servers could execute the same task.
* ZooKeeper node /task/path/ is used by the processes to coordinate their work.
* You must not add additional child nodes to /task/path/.
*
* Currently you are responsible for launching cluster-copier processes.
* You can launch as many processes as you want, whenever and wherever you want.
* Each process try to select nearest available shard of source cluster and copy some part of data (partition) from it to the whole
* destination cluster with resharding.
* Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce the network usage.
*
* Since the workers coordinate their work via ZooKeeper, in addition to --task-path </task/path> you have to specify ZooKeeper
* configuration via --config-file <zookeeper.xml> parameter. Example of zookeeper.xml:
<yandex>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
* When you run clickhouse-cluster-copier --config-file <zookeeper.xml> --task-path </task/path>
* the process connects to ZooKeeper, reads tasks config from /task/path/description and executes them.
*
*
* = Format of task config =
<yandex>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
<max_workers>2</max_workers>
<!-- Setting used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Setting used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Copying tasks description.
You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
sequentially.
-->
<tables>
<!-- Name of the table task, it must be an unique name suitable for ZooKeeper node name -->
<table_hits>
<-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>test</database_pull>
<table_pull>hits</table_pull>
<-- Destination cluster name and tables in which the data should be inserted -->
<cluster_push>destination_cluster</cluster_push>
<database_push>test</database_push>
<table_push>hits2</table_push>
<!-- Engine of destination tables.
If destination tables have not be created, workers create them using columns definition from source tables and engine
definition from here.
NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
specify partitions that should be copied in <enabled_partitions/>, they should be in quoted format like partition column of
system.parts table.
-->
<engine>ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/hits2/{shard}/hits2', '{replica}', EventDate, (CounterID, EventDate), 8192)</engine>
<!-- Sharding key used to insert data to destination cluster -->
<sharding_key>intHash32(UserID)</sharding_key>
<!-- Optional expression that filter data while pull them from source servers -->
<where_condition>CounterID != 0</where_condition>
<!-- Optional section, it specifies partitions that should be copied, other partition will be ignored -->
<enabled_partitions>
<partition>201712</partition>
<partition>201801</partition>
...
</enabled_partitions>
</table_hits>
</table_visits>
...
</table_visits>
...
</tables>
</yandex>
* = Implementation details =
* Implementation details:
*
* cluster-copier workers pull each partition of each shard of the source cluster and push it to the destination cluster through
* Distributed table (to preform data resharding). So, worker job is a partition of a source shard.
......@@ -144,7 +24,7 @@
* /server_fqdn#PID_timestamp - cluster-copier worker ID
* ...
* /tables - directory with table tasks
* /table_hits - directory of table_hits task
* /cluster.db.table - directory of table_hits task
* /partition1 - directory for partition1
* /shards - directory for source cluster shards
* /1 - worker job for the first shard of partition1 of table test.hits
......
......@@ -26,7 +26,11 @@
-->
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/b', '{replica}') PARTITION BY toMonday(date) ORDER BY d</engine>
<engine>ENGINE=
ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/b', '{replica}')
PARTITION BY toMonday(date)
ORDER BY d
</engine>
<!-- Which sarding key to use while copying -->
<sharding_key>jumpConsistentHash(intHash64(d), 2)</sharding_key>
......
# clickhouse-copier util
The util copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed and fault-tolerant manner.
Configuration of copying tasks is set in special ZooKeeper node (called the `/description` node).
A ZooKeeper path to the description node is specified via `--task-path </task/path>` parameter.
So, node `/task/path/description` should contain special XML content describing copying tasks.
Simultaneously many `clickhouse-copier` processes located on any servers could execute the same task.
ZooKeeper node `/task/path/` is used by the processes to coordinate their work.
You must not add additional child nodes to `/task/path/`.
Currently you are responsible for manual launching of all `cluster-copier` processes.
You can launch as many processes as you want, whenever and wherever you want.
Each process try to select the nearest available shard of source cluster and copy some part of data (partition) from it to the whole
destination cluster (with resharding).
Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce the network usage.
Since the workers coordinate their work via ZooKeeper, in addition to `--task-path </task/path>` you have to specify ZooKeeper
cluster configuration via `--config-file <zookeeper.xml>` parameter. Example of `zookeeper.xml`:
```xml
<yandex>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
```
When you run `clickhouse-copier --config-file <zookeeper.xml> --task-path </task/path>` the process connects to ZooKeeper cluster, reads tasks config from `/task/path/description` and executes them.
## Format of task config
Here is an example of `/task/path/description` content:
```xml
<yandex>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
<max_workers>2</max_workers>
<!-- Setting used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Setting used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Copying tasks description.
You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
sequentially.
-->
<tables>
<!-- A table task, copies one table. -->
<table_hits>
<!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>test</database_pull>
<table_pull>hits</table_pull>
<!-- Destination cluster name and tables in which the data should be inserted -->
<cluster_push>destination_cluster</cluster_push>
<database_push>test</database_push>
<table_push>hits2</table_push>
<!-- Engine of destination tables.
If destination tables have not be created, workers create them using columns definition from source tables and engine
definition from here.
NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
specify partitions that should be copied in <enabled_partitions/>, they should be in quoted format like partition column of
system.parts table.
-->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}')
PARTITION BY toMonday(date)
ORDER BY (CounterID, EventDate)
</engine>
<!-- Sharding key used to insert data to destination cluster -->
<sharding_key>jumpConsistentHash(intHash64(UserID), 2)</sharding_key>
<!-- Optional expression that filter data while pull them from source servers -->
<where_condition>CounterID != 0</where_condition>
<!-- This section specifies partitions that should be copied, other partition will be ignored.
Partition names should have the same format as
partition column of system.parts table (i.e. a quoted text).
Since partition key of source and destination cluster could be different,
these partition names specify destination partitions.
NOTE: In spite of this section is optional (if it is not specified, all partitions will be copied),
it is strictly recommended to specify them explicitly.
If you already have some ready paritions on destination cluster they
will be removed at the start of the copying since they will be interpeted
as unfinished data from the previous copying!!!
-->
<enabled_partitions>
<partition>'2018-02-26'</partition>
<partition>'2018-03-05'</partition>
...
</enabled_partitions>
</table_hits>
<!-- Next table to copy. It is not copied until previous table is copying. -->
</table_visits>
...
</table_visits>
...
</tables>
</yandex>
```
cluster-copier processes watch for `/task/path/description` node update.
So, if you modify the config settings or `max_workers` params, they will be updated.
## Example
```bash
clickhouse-copier copier --daemon --config /path/to/copier/zookeeper.xml --task-path /clickhouse-copier/cluster1_tables_hits --base-dir /path/to/copier_logs
```
`--base-dir /path/to/copier_logs` specifies where auxilary and log files of the copier process will be saved.
In this case it will create `/path/to/copier_logs/clickhouse-copier_YYYYMMHHSS_<PID>/` dir with log and status-files.
If it is not specified it will use current dir (`/clickhouse-copier_YYYYMMHHSS_<PID>/` if it is run as a `--daemon`).
# ClickHouse utilites
There are several ClickHouse utilites that are separate executable files:
* `clickhouse-local` allows to execute SQL queries on a local data like `awk`
* `clickhouse-copier` copies (and reshards) immutable data from one cluster to another in a fault-tolerant manner.
......@@ -235,6 +235,11 @@ pages:
- 'Settings': 'operations/settings/settings.md'
- 'Settings profiles': 'operations/settings/settings_profiles.md'
- 'Utilites':
- 'Utilites': 'utils/index.md'
- 'clickhouse-copier': 'utils/clickhouse-copier.md'
#- 'clickhouse-local' : 'utils/clickhouse-local.md'
- 'ClickHouse Development':
# - 'ClickHouse Development': 'development/index.md'
- 'Overview of ClickHouse architecture': 'development/architecture.md'
......
......@@ -238,6 +238,11 @@ pages:
- 'Настройки': 'operations/settings/settings.md'
- 'Профили настроек': 'operations/settings/settings_profiles.md'
- 'Утилиты':
- 'Утилиты': 'utils/index.md'
- 'clickhouse-copier': 'utils/clickhouse-copier.md'
#- 'clickhouse-local' : 'utils/clickhouse-local.md'
- 'ClickHouse Development':
# - 'ClickHouse Development': 'development/index.md'
- 'Overview of ClickHouse architecture': 'development/architecture.md'
......
# clickhouse-copier util
The util copies tables data from one cluster to new tables of other (possibly the same) cluster in distributed and fault-tolerant manner.
Configuration of copying tasks is set in special ZooKeeper node (called the `/description` node).
A ZooKeeper path to the description node is specified via `--task-path </task/path>` parameter.
So, node `/task/path/description` should contain special XML content describing copying tasks.
Simultaneously many `clickhouse-copier` processes located on any servers could execute the same task.
ZooKeeper node `/task/path/` is used by the processes to coordinate their work.
You must not add additional child nodes to `/task/path/`.
Currently you are responsible for manual launching of all `cluster-copier` processes.
You can launch as many processes as you want, whenever and wherever you want.
Each process try to select the nearest available shard of source cluster and copy some part of data (partition) from it to the whole
destination cluster (with resharding).
Therefore it makes sense to launch cluster-copier processes on the source cluster nodes to reduce the network usage.
Since the workers coordinate their work via ZooKeeper, in addition to `--task-path </task/path>` you have to specify ZooKeeper
cluster configuration via `--config-file <zookeeper.xml>` parameter. Example of `zookeeper.xml`:
```xml
<yandex>
<zookeeper>
<node index="1">
<host>127.0.0.1</host>
<port>2181</port>
</node>
</zookeeper>
</yandex>
```
When you run `clickhouse-copier --config-file <zookeeper.xml> --task-path </task/path>` the process connects to ZooKeeper cluster, reads tasks config from `/task/path/description` and executes them.
## Format of task config
Here is an example of `/task/path/description` content:
```xml
<yandex>
<!-- Configuration of clusters as in an ordinary server config -->
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
...
</source_cluster>
<destination_cluster>
...
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
<max_workers>2</max_workers>
<!-- Setting used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Setting used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Copying tasks description.
You could specify several table task in the same task description (in the same ZooKeeper node), they will be performed
sequentially.
-->
<tables>
<!-- A table task, copies one table. -->
<table_hits>
<!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>test</database_pull>
<table_pull>hits</table_pull>
<!-- Destination cluster name and tables in which the data should be inserted -->
<cluster_push>destination_cluster</cluster_push>
<database_push>test</database_push>
<table_push>hits2</table_push>
<!-- Engine of destination tables.
If destination tables have not be created, workers create them using columns definition from source tables and engine
definition from here.
NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
specify partitions that should be copied in <enabled_partitions/>, they should be in quoted format like partition column of
system.parts table.
-->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/hits2', '{replica}')
PARTITION BY toMonday(date)
ORDER BY (CounterID, EventDate)
</engine>
<!-- Sharding key used to insert data to destination cluster -->
<sharding_key>jumpConsistentHash(intHash64(UserID), 2)</sharding_key>
<!-- Optional expression that filter data while pull them from source servers -->
<where_condition>CounterID != 0</where_condition>
<!-- This section specifies partitions that should be copied, other partition will be ignored.
Partition names should have the same format as
partition column of system.parts table (i.e. a quoted text).
Since partition key of source and destination cluster could be different,
these partition names specify destination partitions.
NOTE: In spite of this section is optional (if it is not specified, all partitions will be copied),
it is strictly recommended to specify them explicitly.
If you already have some ready paritions on destination cluster they
will be removed at the start of the copying since they will be interpeted
as unfinished data from the previous copying!!!
-->
<enabled_partitions>
<partition>'2018-02-26'</partition>
<partition>'2018-03-05'</partition>
...
</enabled_partitions>
</table_hits>
<!-- Next table to copy. It is not copied until previous table is copying. -->
</table_visits>
...
</table_visits>
...
</tables>
</yandex>
```
cluster-copier processes watch for `/task/path/description` node update.
So, if you modify the config settings or `max_workers` params, they will be updated.
## Example
```bash
clickhouse-copier copier --daemon --config /path/to/copier/zookeeper.xml --task-path /clickhouse-copier/cluster1_tables_hits --base-dir /path/to/copier_logs
```
`--base-dir /path/to/copier_logs` specifies where auxilary and log files of the copier process will be saved.
In this case it will create `/path/to/copier_logs/clickhouse-copier_YYYYMMHHSS_<PID>/` dir with log and status-files.
If it is not specified it will use current dir (`/clickhouse-copier_YYYYMMHHSS_<PID>/` if it is run as a `--daemon`).
# Утилиты ClickHouse
Существует несколько утилит ClickHouse, которые представляют из себя отдельные исполняемые файлы:
* `clickhouse-local` позволяет выполнять SQL-запросы над данными подобно тому, как это делает `awk`
* `clickhouse-copier` копирует (и перешардирует) неизменяемые данные с одного кластера на другой отказоустойчивым способом.
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册