提交 87ac7c59 编写于 作者: V Vladimir Chebotarev

Added concurrent test for `ALTER MODIFY TTL`.

上级 4f1f9e7e
......@@ -74,37 +74,37 @@ def test_rule_with_invalid_destination(started_cluster, name, engine, alter):
{expression}
SETTINGS storage_policy='{policy}'
""".format(expression=x, name=name, engine=engine, policy=policy)
if alter:
node1.query(get_command(None, "small_jbod_with_external"))
with pytest.raises(QueryRuntimeException):
node1.query(get_command("TTL d1 TO DISK 'unknown'", "small_jbod_with_external"))
node1.query("DROP TABLE IF EXISTS {}".format(name))
if alter:
node1.query(get_command(None, "small_jbod_with_external"))
with pytest.raises(QueryRuntimeException):
node1.query(get_command("TTL d1 TO VOLUME 'unknown'", "small_jbod_with_external"))
node1.query("DROP TABLE IF EXISTS {}".format(name))
if alter:
node1.query(get_command(None, "only_jbod2"))
with pytest.raises(QueryRuntimeException):
node1.query(get_command("TTL d1 TO DISK 'jbod1'", "only_jbod2"))
node1.query("DROP TABLE IF EXISTS {}".format(name))
if alter:
node1.query(get_command(None, "only_jbod2"))
with pytest.raises(QueryRuntimeException):
node1.query(get_command("TTL d1 TO VOLUME 'external'", "only_jbod2"))
finally:
node1.query("DROP TABLE IF EXISTS {}".format(name))
......@@ -582,14 +582,14 @@ limitations under the License."""
) ENGINE = {engine}
ORDER BY tuple()
PARTITION BY p1
TTL d1 + INTERVAL 30 SECOND TO DISK 'jbod2',
TTL d1 + INTERVAL 30 SECOND TO DISK 'jbod2',
d1 + INTERVAL 60 SECOND TO VOLUME 'external'
SETTINGS storage_policy='jbods_with_external', merge_with_ttl_timeout=0
""".format(name=name, engine=engine))
node1.query("""
ALTER TABLE {name} MODIFY
TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
d1 + INTERVAL 5 SECOND TO VOLUME 'external',
d1 + INTERVAL 10 SECOND DELETE
""".format(name=name))
......@@ -624,3 +624,94 @@ limitations under the License."""
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
@pytest.mark.parametrize("name,engine", [
("concurrently_altering_ttl_mt","MergeTree()"),
("concurrently_altering_ttl_replicated_mt","ReplicatedMergeTree('/clickhouse/concurrently_altering_ttl_replicated_mt', '1')",),
])
def test_concurrent_alter_with_ttl_move(started_cluster, name, engine):
try:
node1.query("""
CREATE TABLE {name} (
EventDate Date,
number UInt64
) ENGINE = {engine}
ORDER BY tuple()
PARTITION BY toYYYYMM(EventDate)
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))
values = list({ random.randint(1, 1000000) for _ in range(0, 1000) })
def insert(num):
for i in range(num):
day = random.randint(11, 30)
value = values.pop()
month = '0' + str(random.choice([3, 4]))
node1.query("INSERT INTO {} VALUES(toDate('2019-{m}-{d}'), {v})".format(name, m=month, d=day, v=value))
def alter_move(num):
def produce_alter_move(node, name):
move_type = random.choice(["PART", "PARTITION"])
if move_type == "PART":
for _ in range(10):
try:
parts = node1.query("SELECT name from system.parts where table = '{}' and active = 1".format(name)).strip().split('\n')
break
except QueryRuntimeException:
pass
else:
raise Exception("Cannot select from system.parts")
move_part = random.choice(["'" + part + "'" for part in parts])
else:
move_part = random.choice([201903, 201904])
move_disk = random.choice(["DISK", "VOLUME"])
if move_disk == "DISK":
move_volume = random.choice(["'external'", "'jbod1'", "'jbod2'"])
else:
move_volume = random.choice(["'main'", "'external'"])
try:
node1.query("ALTER TABLE {} MOVE {mt} {mp} TO {md} {mv}".format(
name, mt=move_type, mp=move_part, md=move_disk, mv=move_volume))
except QueryRuntimeException as ex:
pass
for i in range(num):
produce_alter_move(node1, name)
def alter_update(num):
for i in range(num):
node1.query("ALTER TABLE {} UPDATE number = number + 1 WHERE 1".format(name))
def alter_modify_ttl(num):
for i in range(num):
ttls = []
for j in range(random.randint(1, 10)):
what = random.choice(["TO VOLUME 'main'", "TO VOLUME 'external'", "TO DISK 'jbod1'", "TO DISK 'jbod2'", "TO DISK 'external'"])
when = "now()+{}".format(random.randint(-1, 5))
ttls.append("{} {}".format(when, what))
node1.query("ALTER TABLE {} MODIFY TTL {}".format(name, ", ".join(ttls)))
def optimize_table(num):
for i in range(num):
node1.query("OPTIMIZE TABLE {} FINAL".format(name))
p = Pool(15)
tasks = []
for i in range(5):
tasks.append(p.apply_async(insert, (100,)))
tasks.append(p.apply_async(alter_move, (100,)))
tasks.append(p.apply_async(alter_update, (100,)))
tasks.append(p.apply_async(alter_modify_ttl, (100,)))
tasks.append(p.apply_async(optimize_table, (100,)))
for task in tasks:
task.get(timeout=60)
assert node1.query("SELECT 1") == "1\n"
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n"
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册