Commit 15de7e4f authored by Nikita Mikhaylov

first try

Parent c5ca4c3b
......@@ -1185,14 +1185,21 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
{
String query;
query += "SELECT " + fields + " FROM " + getQuotedTable(from_table);
if (enable_splitting)
if (experimental_use_sample_offset)
query += " SAMPLE 1/" + toString(number_of_splits) + " OFFSET " + toString(current_piece_number) + "/" + toString(number_of_splits);
/// TODO: Bad, it is better to rewrite with ASTLiteral(partition_key_field)
query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) + " = (" + task_partition.name + " AS partition_key))";
if (enable_splitting)
query += " AND ( cityHash64(" + primary_key_comma_separated + ") %" + toString(number_of_splits) + " = " + toString(current_piece_number) + " )";
if (!experimental_use_sample_offset)
query += " AND ( cityHash64(" + primary_key_comma_separated + ") %" + toString(number_of_splits) + " = " + toString(current_piece_number) + " )";
if (!task_table.where_condition_str.empty())
query += " AND (" + task_table.where_condition_str + ")";
if (!limit.empty())
query += " LIMIT " + limit;
......@@ -1778,16 +1785,22 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
createShardInternalTables(timeouts, task_shard, false);
TaskTable & task_table = task_shard.task_table;
std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard)
+ " WHERE (" + queryToString(task_table.engine_push_partition_key_ast)
+ " = (" + partition_quoted_name + " AS partition_key))";
const size_t number_of_splits = task_table.number_of_splits;
const String & primary_key_comma_separated = task_table.primary_key_comma_separated;
query += " AND (cityHash64(" + primary_key_comma_separated + ") % "
+ std::to_string(number_of_splits) + " = " + std::to_string(current_piece_number) + " )";
UNUSED(primary_key_comma_separated);
std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard);
if (experimental_use_sample_offset)
query += " SAMPLE 1/" + toString(number_of_splits) + " OFFSET " + toString(current_piece_number) + "/" + toString(number_of_splits);
query += " WHERE (" + queryToString(task_table.engine_push_partition_key_ast)
+ " = (" + partition_quoted_name + " AS partition_key))";
if (!experimental_use_sample_offset)
query += " AND (cityHash64(" + primary_key_comma_separated + ") % "
+ std::to_string(number_of_splits) + " = " + std::to_string(current_piece_number) + " )";
if (!task_table.where_condition_str.empty())
query += " AND (" + task_table.where_condition_str + ")";
......
......@@ -61,6 +61,11 @@ public:
move_fault_probability = move_fault_probability_;
}
void setExperimentalUseSampleOffset(bool value)
{
experimental_use_sample_offset = value;
}
protected:
String getWorkersPath() const
......@@ -211,6 +216,8 @@ private:
double copy_fault_probability = 0.0;
double move_fault_probability = 0.0;
bool experimental_use_sample_offset{false};
Context & context;
Poco::Logger * log;
......
......@@ -20,6 +20,11 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
if (config().has("move-fault-probability"))
move_fault_probability = std::max(std::min(config().getDouble("move-fault-probability"), 1.0), 0.0);
base_dir = (config().has("base-dir")) ? config().getString("base-dir") : Poco::Path::current();
if (config().has("experimental-use-sample-offset"))
experimental_use_sample_offset = config().getBool("experimental-use-sample-offset");
// process_id is '<hostname>#<start_timestamp>_<pid>'
time_t timestamp = Poco::Timestamp().epochTime();
auto curr_pid = Poco::Process::id();
......@@ -75,6 +80,8 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
.argument("log-level").binding("log-level"));
options.addOption(Poco::Util::Option("base-dir", "", "base directory for copiers, consecutive copier launches will populate /base-dir/launch_id/* directories")
.argument("base-dir").binding("base-dir"));
options.addOption(Poco::Util::Option("experimental-use-sample-offset", "", "Use SAMPLE OFFSET query instead of cityHash64(PRIMARY KEY) % n == k")
.argument("experimental-use-sample-offset").binding("experimental-use-sample-offset"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
......@@ -121,6 +128,8 @@ void ClusterCopierApp::mainImpl()
copier->setCopyFaultProbability(copy_fault_probability);
copier->setMoveFaultProbability(move_fault_probability);
copier->setExperimentalUseSampleOffset(experimental_use_sample_offset);
auto task_file = config().getString("task-file", "");
if (!task_file.empty())
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
......
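A hedged sketch of how the flag registered above might be passed on the command line together with the standard clickhouse-copier arguments; the config path, ZooKeeper task node, and base directory below are placeholders rather than values taken from this commit:

```python
# Hypothetical invocation sketch; all paths and the task node are placeholders.
import subprocess

cmd = [
    "clickhouse-copier",
    "--config", "/etc/clickhouse-copier/zookeeper.xml",  # placeholder ZooKeeper config
    "--task-path", "/clickhouse-copier/task_trivial",    # ZooKeeper node holding the task description
    "--base-dir", "/var/lib/clickhouse-copier",          # placeholder working directory
    "--experimental-use-sample-offset", "1",             # new flag: split pieces via SAMPLE ... OFFSET
]
subprocess.run(cmd, check=True)
```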
......@@ -82,6 +82,8 @@ private:
double move_fault_probability = 0.0;
bool is_help = false;
bool experimental_use_sample_offset{false};
std::string base_dir;
std::string process_path;
std::string process_id;
......
......@@ -33,7 +33,7 @@
<enabled_partitions>3 4 5 6 1 2 0 </enabled_partitions>
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/hits', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16</engine>
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/hits', '{replica}') PARTITION BY d % 3 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16</engine>
<!-- Which sharding key to use while copying -->
<sharding_key>d + 1</sharding_key>
......
......@@ -32,7 +32,7 @@
<cluster_push>default_cluster</cluster_push>
<database_push>default</database_push>
<table_push>copier_test1_1</table_push>
<engine>ENGINE = MergeTree PARTITION BY date ORDER BY date</engine>
<engine>ENGINE = MergeTree PARTITION BY date ORDER BY (date, sipHash64(date)) SAMPLE BY sipHash64(date)</engine>
<sharding_key>rand()</sharding_key>
</table_crm_fin_account>
</tables>
......
......@@ -29,7 +29,7 @@
<table_push>trivial</table_push>
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/hits', '{replica}') PARTITION BY d % 5 ORDER BY d SETTINGS index_granularity = 16</engine>
<engine>ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster{cluster}/{shard}/hits', '{replica}') PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16</engine>
<!-- Which sharding key to use while copying -->
<sharding_key>d + 1</sharding_key>
......
......@@ -36,7 +36,7 @@ def ddl_check_query(instance, query, num_hosts=3):
return contents
@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def started_cluster():
global cluster
try:
......@@ -85,7 +85,9 @@ class Task1:
ddl_check_query(instance, "DROP DATABASE IF EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance, "CREATE DATABASE IF NOT EXISTS default ON CLUSTER cluster{}".format(cluster_num))
ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64, d1 UInt64 MATERIALIZED d+1) ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/hits', '{replica}') PARTITION BY d % 3 ORDER BY d SETTINGS index_granularity = 16")
ddl_check_query(instance, "CREATE TABLE hits ON CLUSTER cluster0 (d UInt64, d1 UInt64 MATERIALIZED d+1) " +
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/hits', '{replica}') " +
"PARTITION BY d % 3 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16")
ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster0 (d UInt64) ENGINE=Distributed(cluster0, default, hits, d)")
ddl_check_query(instance, "CREATE TABLE hits_all ON CLUSTER cluster1 (d UInt64) ENGINE=Distributed(cluster1, default, hits, d + 1)")
instance.query("INSERT INTO hits_all SELECT * FROM system.numbers LIMIT 1002", settings={"insert_distributed_sync": 1})
......@@ -155,7 +157,7 @@ class Task_test_block_size:
ddl_check_query(instance, """
CREATE TABLE test_block_size ON CLUSTER shard_0_0 (partition Date, d UInt64)
ENGINE=ReplicatedMergeTree('/clickhouse/tables/cluster_{cluster}/{shard}/test_block_size', '{replica}')
ORDER BY d""", 2)
ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d)""", 2)
instance.query("INSERT INTO test_block_size SELECT toDate(0) AS partition, number as d FROM system.numbers LIMIT {}".format(self.rows))
......@@ -260,31 +262,68 @@ def execute_task(task, cmd_options):
# Tests
def test_copy_simple(started_cluster):
execute_task(Task1(started_cluster), [])
def test_copy_with_recovering(started_cluster):
execute_task(Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
def test_copy_with_recovering_after_move_faults(started_cluster):
execute_task(Task1(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
def test_copy_simple(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(Task1(started_cluster), ['--experimental-use-sample-offset', '1'])
else:
execute_task(Task1(started_cluster), [])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
def test_copy_with_recovering(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
'--experimental-use-sample-offset', '1'])
else:
execute_task(Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
def test_copy_with_recovering_after_move_faults(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(Task1(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
'--experimental-use-sample-offset', '1'])
else:
execute_task(Task1(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
def test_copy_month_to_week_partition(started_cluster):
execute_task(Task2(started_cluster), [])
def test_copy_month_to_week_partition_with_recovering(started_cluster):
def test_copy_month_to_week_partition_with_recovering(started_cluster, use_sample_offset):
execute_task(Task2(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
def test_copy_month_to_week_partition_with_recovering_after_move_faults(started_cluster):
def test_copy_month_to_week_partition_with_recovering_after_move_faults(started_cluster, use_sample_offset):
execute_task(Task2(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
def test_block_size(started_cluster):
def test_block_size(started_cluster, use_sample_offset):
execute_task(Task_test_block_size(started_cluster), [])
def test_no_index(started_cluster):
def test_no_index(started_cluster, use_sample_offset):
execute_task(Task_no_index(started_cluster), [])
def test_no_arg(started_cluster):
def test_no_arg(started_cluster, use_sample_offset):
execute_task(Task_no_arg(started_cluster), [])
if __name__ == '__main__':
......
......@@ -18,7 +18,7 @@ COPYING_FAIL_PROBABILITY = 0.33
MOVING_FAIL_PROBABILITY = 0.1
cluster = None
@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def started_cluster():
global cluster
try:
......@@ -47,9 +47,12 @@ def started_cluster():
class TaskTrivial:
def __init__(self, cluster):
def __init__(self, cluster, use_sample_offset):
self.cluster = cluster
self.zk_task_path="/clickhouse-copier/task_trivial"
if use_sample_offset:
self.zk_task_path="/clickhouse-copier/task_trivial_use_sample_offset"
else:
self.zk_task_path="/clickhouse-copier/task_trivial"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_trivial.xml'), 'r').read()
......@@ -63,7 +66,7 @@ class TaskTrivial:
source.query("CREATE TABLE trivial (d UInt64, d1 UInt64 MATERIALIZED d+1) "
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/source_trivial_cluster/1/trivial', '1') "
"PARTITION BY d % 5 ORDER BY d SETTINGS index_granularity = 16")
"PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16")
source.query("INSERT INTO trivial SELECT * FROM system.numbers LIMIT 1002", settings={"insert_distributed_sync": 1})
......@@ -127,14 +130,51 @@ def execute_task(task, cmd_options):
# Tests
def test_trivial_copy(started_cluster):
execute_task(TaskTrivial(started_cluster), [])
def test_trivial_copy_with_copy_fault(started_cluster):
execute_task(TaskTrivial(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
def test_trivial_copy_with_move_fault(started_cluster):
execute_task(TaskTrivial(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
def test_trivial_copy(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(TaskTrivial(started_cluster, use_sample_offset), ['--experimental-use-sample-offset', '1'])
else:
print("AAAAA")
execute_task(TaskTrivial(started_cluster, use_sample_offset), [])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
def test_trivial_copy_with_copy_fault(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(TaskTrivial(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
'--experimental-use-sample-offset', '1'])
else:
execute_task(TaskTrivial(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
def test_trivial_copy_with_move_fault(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(TaskTrivial(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
'--experimental-use-sample-offset', '1'])
else:
execute_task(TaskTrivial(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY)])
if __name__ == '__main__':
......