提交 dca35b19 编写于 作者: A alesapin

Add awesome test for fetch

上级 d6130f13
......@@ -17,7 +17,8 @@ RUN apt-get update \
sqlite3 \
curl \
tar \
krb5-user
krb5-user \
iproute2
RUN rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
......
......@@ -205,12 +205,12 @@ namespace detail
bool nextImpl() override
{
if (read_callback)
read_callback(count());
if (!impl->next())
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
if (read_callback)
read_callback(count());
return true;
}
......@@ -225,6 +225,7 @@ namespace detail
void setNextReadCallback(std::function<void(size_t)> read_callback_)
{
read_callback = read_callback_;
read_callback(count());
}
};
}
......
......@@ -53,7 +53,6 @@ std::string getEndpointId(const std::string & node_id)
return "DataPartsExchange:" + node_id;
}
struct ReplicatedFetchReadCallback
{
ReplicatedFetchList::Entry & replicated_fetch_entry;
......@@ -65,9 +64,9 @@ struct ReplicatedFetchReadCallback
void operator() (size_t bytes_count)
{
replicated_fetch_entry->bytes_read_compressed = bytes_count;
replicated_fetch_entry->bytes_read_compressed.store(bytes_count, std::memory_order_relaxed);
replicated_fetch_entry->progress.store(
replicated_fetch_entry->bytes_read_compressed.load(std::memory_order_relaxed) / replicated_fetch_entry->total_size_bytes_compressed,
static_cast<double>(bytes_count) / replicated_fetch_entry->total_size_bytes_compressed,
std::memory_order_relaxed);
}
};
......@@ -307,10 +306,11 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
reservation = data.makeEmptyReservationOnLargestDisk();
}
auto storage_id = data.getStorageID();
String new_part_path = data.getFullPathOnDisk(reservation->getDisk()) + part_name + "/";
auto entry = data.global_context.getReplicatedFetchList().insert(
storage_id.getDatabaseName(), storage_id.getTableName(),
part_info.partition_id, part_name, part_name,
replica_path, uri.toString(), interserver_scheme, to_detached, sum_files_size);
part_info.partition_id, part_name, new_part_path,
replica_path, uri, to_detached, sum_files_size);
in.setNextReadCallback(ReplicatedFetchReadCallback(*entry));
......
......@@ -9,25 +9,21 @@ ReplicatedFetchListElement::ReplicatedFetchListElement(
const std::string & database_, const std::string & table_,
const std::string & partition_id_, const std::string & result_part_name_,
const std::string & result_part_path_, const std::string & source_replica_path_,
const std::string & source_replica_address_, const std::string & interserver_scheme_,
UInt8 to_detached_, UInt64 total_size_bytes_compressed_)
const Poco::URI & uri_, UInt8 to_detached_, UInt64 total_size_bytes_compressed_)
: database(database_)
, table(table_)
, partition_id(partition_id_)
, result_part_name(result_part_name_)
, result_part_path(result_part_path_)
, source_replica_path(source_replica_path_)
, source_replica_address(source_replica_address_)
, interserver_scheme(interserver_scheme_)
, source_replica_hostname(uri_.getHost())
, source_replica_port(uri_.getPort())
, interserver_scheme(uri_.getScheme())
, uri(uri_.toString())
, to_detached(to_detached_)
, total_size_bytes_compressed(total_size_bytes_compressed_)
, thread_id(getThreadId())
{
background_thread_memory_tracker = CurrentThread::getMemoryTracker();
if (background_thread_memory_tracker)
{
background_thread_memory_tracker_prev_parent = background_thread_memory_tracker->getParent();
background_thread_memory_tracker->setParent(&memory_tracker);
}
}
......@@ -40,23 +36,18 @@ ReplicatedFetchInfo ReplicatedFetchListElement::getInfo() const
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
res.source_replica_path = source_replica_path;
res.source_replica_address = source_replica_address;
res.source_replica_hostname = source_replica_hostname;
res.source_replica_port = source_replica_port;
res.interserver_scheme = interserver_scheme;
res.uri = uri;
res.interserver_scheme = interserver_scheme;
res.to_detached = to_detached;
res.elapsed = watch.elapsedSeconds();
res.progress = progress.load(std::memory_order_relaxed);
res.bytes_read_compressed = bytes_read_compressed.load(std::memory_order_relaxed);
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.memory_usage = memory_tracker.get();
res.thread_id = thread_id;
return res;
}
ReplicatedFetchListElement::~ReplicatedFetchListElement()
{
/// Unplug memory_tracker from current background processing pool thread
if (background_thread_memory_tracker)
background_thread_memory_tracker->setParent(background_thread_memory_tracker_prev_parent);
}
}
......@@ -4,6 +4,7 @@
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Common/Stopwatch.h>
#include <Common/MemoryTracker.h>
#include <Poco/URI.h>
namespace CurrentMetrics
{
......@@ -23,8 +24,10 @@ struct ReplicatedFetchInfo
std::string result_part_path;
std::string source_replica_path;
std::string source_replica_address;
std::string source_replica_hostname;
UInt16 source_replica_port;
std::string interserver_scheme;
std::string uri;
UInt8 to_detached;
......@@ -34,7 +37,6 @@ struct ReplicatedFetchInfo
UInt64 total_size_bytes_compressed;
UInt64 bytes_read_compressed;
UInt64 memory_usage;
UInt64 thread_id;
};
......@@ -48,9 +50,11 @@ struct ReplicatedFetchListElement : private boost::noncopyable
const std::string result_part_name;
const std::string result_part_path;
const std::string source_replica_path;
const std::string source_replica_address;
const std::string interserver_scheme;
std::string source_replica_path;
std::string source_replica_hostname;
UInt16 source_replica_port;
std::string interserver_scheme;
std::string uri;
const UInt8 to_detached;
......@@ -60,22 +64,15 @@ struct ReplicatedFetchListElement : private boost::noncopyable
std::atomic<UInt64> bytes_read_compressed{};
UInt64 total_size_bytes_compressed{};
MemoryTracker memory_tracker{VariableContext::Process};
MemoryTracker * background_thread_memory_tracker;
MemoryTracker * background_thread_memory_tracker_prev_parent = nullptr;
UInt64 thread_id;
ReplicatedFetchListElement(
const std::string & database_, const std::string & table_,
const std::string & partition_id_, const std::string & result_part_name_,
const std::string & result_part_path_, const std::string & source_replica_path_,
const std::string & source_replica_address_, const std::string & interserver_scheme_,
UInt8 to_detached_, UInt64 total_size_bytes_compressed_);
const Poco::URI & uri, UInt8 to_detached_, UInt64 total_size_bytes_compressed_);
ReplicatedFetchInfo getInfo() const;
~ReplicatedFetchListElement();
};
......
#include <Storages/System/StorageSystemFetches.h>
#include <Storages/MergeTree/ReplicatedFetchesList.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <Access/ContextAccess.h>
......@@ -19,10 +21,11 @@ NamesAndTypesList StorageSystemFetches::getNamesAndTypes()
{"total_size_bytes_compressed", std::make_shared<DataTypeUInt64>()},
{"bytes_read_compressed", std::make_shared<DataTypeUInt64>()},
{"source_replica_path", std::make_shared<DataTypeString>()},
{"source_replica_address", std::make_shared<DataTypeString>()},
{"source_replica_hostname", std::make_shared<DataTypeString>()},
{"source_replica_port", std::make_shared<DataTypeUInt16>()},
{"interserver_scheme", std::make_shared<DataTypeString>()},
{"URI", std::make_shared<DataTypeString>()},
{"to_detached", std::make_shared<DataTypeUInt8>()},
{"memory_usage", std::make_shared<DataTypeUInt64>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
};
}
......@@ -48,10 +51,11 @@ void StorageSystemFetches::fillData(MutableColumns & res_columns, const Context
res_columns[i++]->insert(fetch.total_size_bytes_compressed);
res_columns[i++]->insert(fetch.bytes_read_compressed);
res_columns[i++]->insert(fetch.source_replica_path);
res_columns[i++]->insert(fetch.source_replica_address);
res_columns[i++]->insert(fetch.source_replica_hostname);
res_columns[i++]->insert(fetch.source_replica_port);
res_columns[i++]->insert(fetch.interserver_scheme);
res_columns[i++]->insert(fetch.uri);
res_columns[i++]->insert(fetch.to_detached);
res_columns[i++]->insert(fetch.memory_usage);
res_columns[i++]->insert(fetch.thread_id);
}
}
......
#pragma once
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <ext/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
......@@ -11,6 +10,7 @@ namespace DB
class Context;
/// system.fetches table. Takes data from context.getReplicatedFetchesList()
class StorageSystemFetches final : public ext::shared_ptr_helper<StorageSystemFetches>, public IStorageSystemOneBlock<StorageSystemFetches >
{
friend struct ext::shared_ptr_helper<StorageSystemFetches>;
......
......@@ -814,6 +814,7 @@ services:
tmpfs: {tmpfs}
cap_add:
- SYS_PTRACE
- NET_ADMIN
depends_on: {depends_on}
user: '{user}'
env_file:
......
......@@ -19,6 +19,7 @@ class PartitionManager:
def __init__(self):
self._iptables_rules = []
self._netem_delayed_instances = []
_NetworkManager.get()
def drop_instance_zk_connections(self, instance, action='DROP'):
......@@ -46,11 +47,18 @@ class PartitionManager:
self._add_rule(create_rule(left, right))
self._add_rule(create_rule(right, left))
def add_network_delay(self, instance, delay_ms):
self._add_tc_netem_delay(instance, delay_ms)
def heal_all(self):
while self._iptables_rules:
rule = self._iptables_rules.pop()
_NetworkManager.get().delete_iptables_rule(**rule)
while self._netem_delayed_instances:
instance = self._netem_delayed_instances.pop()
instance.exec_in_container(["bash", "-c", "tc qdisc del dev eth0 root netem"], user="root")
def pop_rules(self):
res = self._iptables_rules[:]
self.heal_all()
......@@ -73,6 +81,10 @@ class PartitionManager:
_NetworkManager.get().delete_iptables_rule(**rule)
self._iptables_rules.remove(rule)
def _add_tc_netem_delay(self, instance, delay_ms):
instance.exec_in_container(["bash", "-c", "tc qdisc add dev eth0 root netem delay {}ms".format(delay_ms)], user="root")
self._netem_delayed_instances.append(instance)
def __enter__(self):
return self
......
#!/usr/bin/env python3
import pytest
import time
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
import random
import string
import json
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def get_random_string(length):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length))
def test_system_fetches(started_cluster):
node1.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '1') ORDER BY tuple()")
node2.query("CREATE TABLE t (key UInt64, data String) ENGINE = ReplicatedMergeTree('/clickhouse/test/t', '2') ORDER BY tuple()")
with PartitionManager() as pm:
node2.query("SYSTEM STOP FETCHES t")
node1.query("INSERT INTO t SELECT number, '{}' FROM numbers(10000)".format(get_random_string(104857)))
pm.add_network_delay(node1, 80)
node2.query("SYSTEM START FETCHES t")
fetches_result = []
for _ in range(1000):
result = json.loads(node2.query("SELECT * FROM system.fetches FORMAT JSON"))
if not result["data"]:
if fetches_result:
break
time.sleep(0.1)
else:
fetches_result.append(result["data"][0])
print(fetches_result[-1])
time.sleep(0.1)
node2.query("SYSTEM SYNC REPLICA t", timeout=10)
assert node2.query("SELECT COUNT() FROM t") == "10000\n"
for elem in fetches_result:
elem['bytes_read_compressed'] = float(elem['bytes_read_compressed'])
elem['total_size_bytes_compressed'] = float(elem['total_size_bytes_compressed'])
elem['progress'] = float(elem['progress'])
elem['elapsed'] = float(elem['elapsed'])
assert len(fetches_result) > 0
first_non_empty = fetches_result[0]
assert first_non_empty['database'] == "default"
assert first_non_empty['table'] == "t"
assert first_non_empty['source_replica_hostname'] == 'node1'
assert first_non_empty['source_replica_port'] == 9009
assert first_non_empty['source_replica_path'] == '/clickhouse/test/t/replicas/1'
assert first_non_empty['interserver_scheme'] == 'http'
assert first_non_empty['partition_id'] == 'all'
assert first_non_empty['URI'].startswith('http://node1:9009/?endpoint=DataPartsExchange')
for elem in fetches_result:
assert elem['bytes_read_compressed'] <= elem['total_size_bytes_compressed'], "Bytes read ({}) more than total bytes ({}). It's a bug".format(elem['bytes_read_compressed'], elem['total_size_bytes_compressed'])
assert 0.0 <= elem['progress'] <= 1.0, "Progress shouldn't less than 0 and bigger than 1, got {}".format(elem['progress'])
assert 0.0 <= elem['elapsed'], "Elapsed time must be greater than 0, got {}".format(elem['elapsed'])
prev_progress = first_non_empty['progress']
for elem in fetches_result:
assert elem['progress'] >= prev_progress, "Progress decreasing prev{}, next {}? It's a bug".format(prev_progress, elem['progress'])
prev_progress = elem['progress']
prev_bytes = first_non_empty['bytes_read_compressed']
for elem in fetches_result:
assert elem['bytes_read_compressed'] >= prev_bytes, "Bytes read decreasing prev {}, next {}? It's a bug".format(prev_bytes, elem['bytes_read_compressed'])
prev_bytes = elem['bytes_read_compressed']
prev_elapsed = first_non_empty['elapsed']
for elem in fetches_result:
assert elem['elapsed'] >= prev_elapsed, "Elapsed time decreasing prev {}, next {}? It's a bug".format(prev_elapsed, elem['elapsed'])
prev_elapsed = elem['elapsed']
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册