提交 a4636f86 编写于 作者: V Vitaliy Lyudvichenko 提交者: alexey-milovidov

Fixed incorrect INSERT into Distributed table in async mode with local...

Fixed incorrect INSERT into Distributed table in async mode with local replicas in a shard. #1404 [#CLICKHOUSE-2]

Resolves #1404
上级 d940d171
......@@ -359,17 +359,32 @@ void DistributedBlockOutputStream::writeSplitAsync(const Block & block)
void DistributedBlockOutputStream::writeAsyncImpl(const Block & block, const size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(block, shard_info.getLocalNodeCount());
if (shard_info.hasInternalReplication())
writeToShard(block, {shard_info.dir_name_for_internal_replication});
{
if (shard_info.getLocalNodeCount() > 0)
{
/// Prefer insert into current instance directly
writeToLocal(block, shard_info.getLocalNodeCount());
}
else
{
if (shard_info.dir_name_for_internal_replication.empty())
throw Exception("Directory name for async inserts is empty, table " + storage.getTableName(), ErrorCodes::LOGICAL_ERROR);
writeToShard(block, {shard_info.dir_name_for_internal_replication});
}
}
else
{
if (shard_info.getLocalNodeCount() > 0)
writeToLocal(block, shard_info.getLocalNodeCount());
std::vector<std::string> dir_names;
for (const auto & address : cluster->getShardsAddresses()[shard_id])
if (!address.is_local)
dir_names.push_back(address.toStringFull());
if (!dir_names.empty())
writeToShard(block, dir_names);
}
......
......@@ -11,11 +11,11 @@ class Client:
self.command = [command, '--host', self.host, '--port', str(self.port), '--stacktrace']
def query(self, sql, stdin=None, timeout=None):
return self.get_query_request(sql, stdin, timeout).get_answer()
def query(self, sql, stdin=None, timeout=None, settings=None):
return self.get_query_request(sql, stdin=stdin, timeout=timeout, settings=settings).get_answer()
def get_query_request(self, sql, stdin=None, timeout=None):
def get_query_request(self, sql, stdin=None, timeout=None, settings=None):
command = self.command[:]
if stdin is None:
......@@ -24,6 +24,10 @@ class Client:
else:
command += ['--query', sql]
if settings is not None:
for setting, value in settings.iteritems():
command += ['--' + setting, str(value)]
return CommandRequest(command, stdin, timeout)
......
......@@ -32,8 +32,8 @@ def ddl_check_query(instance, query, num_hosts=None):
return contents
def ddl_check_there_are_no_dublicates(instance):
rows = instance.query("SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/*ddl_entry=query-%' GROUP BY query)")
assert len(rows) == 0 or rows[0][0] == "1", "dublicates on {} {}, query {}".format(instance.name, instance.ip_address)
rows = instance.query("SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/* ddl_entry=query-%' GROUP BY query)")
assert len(rows) > 0 and rows[0][0] == "1", "dublicates on {} {}, query {}".format(instance.name, instance.ip_address)
# Make retries in case of UNKNOWN_STATUS_OF_INSERT or zkutil::KeeperException errors
def insert_reliable(instance, query_insert):
......
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
......@@ -12,5 +13,16 @@
</replica>
</shard>
</test_cluster>
<local_shard_with_internal_replication>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
</shard>
</local_shard_with_internal_replication>
</remote_servers>
</yandex>
from contextlib import contextmanager
from helpers.network import PartitionManager
from helpers.test_tools import TSV
import pytest
......@@ -77,6 +78,20 @@ def test_insertion_sync_with_disabled_timeout(started_cluster):
INSERT INTO distributed_table SELECT today() as date, number as val FROM system.numbers''', timeout=1)
def test_async_inserts_into_local_shard(started_cluster):
node1.query('''CREATE TABLE shard_local (i Int64) ENGINE = Memory''')
node1.query('''CREATE TABLE shard_distributed (i Int64) ENGINE = Distributed(local_shard_with_internal_replication, default, shard_local)''')
node1.query('''INSERT INTO shard_distributed VALUES (1)''', settings={ "insert_distributed_sync" : 0 })
assert TSV(node1.query('''SELECT count() FROM shard_distributed''')) == TSV("1\n")
node1.query('''DETACH TABLE shard_distributed''')
node1.query('''ATTACH TABLE shard_distributed''')
assert TSV(node1.query('''SELECT count() FROM shard_distributed''')) == TSV("1\n")
node1.query('''DROP TABLE shard_distributed''')
node1.query('''DROP TABLE shard_local''')
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册