提交 98a866ee 编写于 作者: A Alexey Zatelepin 提交者: alexey-milovidov

split a single large SET_WATCHES message into multiple smaller ones [#CLICKHOUSE-2101]

See https://issues.apache.org/jira/browse/ZOOKEEPER-706 for details.
上级 d777cab1
......@@ -1378,27 +1378,25 @@ static void free_key_list(char **list, int count)
free(list);
}
static int send_set_watches(zhandle_t *zh)
/// ZOOKEEPER-706: Split SET_WATCHES message into multiple messages if a session has a large number of watches.
#define MAX_SET_WATCHES_MSG_LENGTH (128 * 1024)
static int send_set_watches_batch(
zhandle_t *zh,
int64_t relative_zxid,
struct String_vector *data_watches,
struct String_vector *exist_watches,
struct String_vector *child_watches)
{
struct oarchive *oa;
struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)};
struct SetWatches req;
int rc;
req.relativeZxid = zh->last_zxid;
req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count);
req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count);
req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count);
// return if there are no pending watches
if (!req.dataWatches.count && !req.existWatches.count &&
!req.childWatches.count) {
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
free_key_list(req.childWatches.data, req.childWatches.count);
return ZOK;
}
req.relativeZxid = relative_zxid;
req.dataWatches = *data_watches;
req.existWatches = *exist_watches;
req.childWatches = *child_watches;
oa = create_buffer_oarchive();
rc = serialize_RequestHeader(oa, "header", &h);
......@@ -1406,12 +1404,79 @@ static int send_set_watches(zhandle_t *zh)
/* add this buffer to the head of the send queue */
rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa),
get_buffer_len(oa));
/* We queued the buffer, so don't free it */
/* We queued the buffer, so don't free it */
close_buffer_oarchive(&oa, 0);
free_key_list(req.dataWatches.data, req.dataWatches.count);
free_key_list(req.existWatches.data, req.existWatches.count);
free_key_list(req.childWatches.data, req.childWatches.count);
LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh)));
return rc;
}
static int send_set_watches(zhandle_t *zh)
{
int64_t relative_zxid = zh->last_zxid;
struct String_vector all_data_watches;
struct String_vector all_exist_watches;
struct String_vector all_child_watches;
int i_data_watch = 0;
int i_exist_watch = 0;
int i_child_watch = 0;
int rc = 0;
all_data_watches.data = collect_keys(zh->active_node_watchers, (int*)&all_data_watches.count);
all_exist_watches.data = collect_keys(zh->active_exist_watchers, (int*)&all_exist_watches.count);
all_child_watches.data = collect_keys(zh->active_child_watchers, (int*)&all_child_watches.count);
while (i_data_watch < all_data_watches.count || i_exist_watch < all_exist_watches.count
|| i_child_watch < all_child_watches.count) {
struct String_vector data_watches_batch = {
STRUCT_INITIALIZER(data , all_data_watches.data + i_data_watch),
STRUCT_INITIALIZER(count , 0)
};
struct String_vector exist_watches_batch = {
STRUCT_INITIALIZER(data , all_exist_watches.data + i_exist_watch),
STRUCT_INITIALIZER(count , 0)
};
struct String_vector child_watches_batch = {
STRUCT_INITIALIZER(data , all_child_watches.data + i_child_watch),
STRUCT_INITIALIZER(count , 0)
};
int batch_length = 0;
data_watches_batch.data = all_data_watches.data + i_data_watch;
exist_watches_batch.data = all_exist_watches.data + i_exist_watch;
child_watches_batch.data = all_child_watches.data + i_child_watch;
/// Note, we may exceed our max length by a bit when we add the last
/// watch in the batch. This isn't ideal, but it makes the code simpler.
while (batch_length < MAX_SET_WATCHES_MSG_LENGTH) {
char *watch;
if (i_data_watch < all_data_watches.count) {
watch = all_data_watches.data[i_data_watch];
++i_data_watch;
++data_watches_batch.count;
} else if (i_exist_watch < all_exist_watches.count) {
watch = all_exist_watches.data[i_exist_watch];
++i_exist_watch;
++exist_watches_batch.count;
} else if (i_child_watch < all_child_watches.count) {
watch = all_child_watches.data[i_child_watch];
++i_child_watch;
++child_watches_batch.count;
} else {
break;
}
batch_length += strlen(watch);
}
rc = send_set_watches_batch(
zh, relative_zxid, &data_watches_batch, &exist_watches_batch, &child_watches_batch);
if (rc < 0) {
break;
}
}
free_key_list(all_data_watches.data, all_data_watches.count);
free_key_list(all_exist_watches.data, all_exist_watches.count);
free_key_list(all_child_watches.data, all_child_watches.count);
return (rc < 0)?ZMARSHALLINGERROR:ZOK;
}
......
......@@ -15,3 +15,6 @@ target_link_libraries(zkutil_test_async zkutil dbms)
add_executable(zkutil_zookeeper_holder zkutil_zookeeper_holder.cpp)
target_link_libraries(zkutil_zookeeper_holder zkutil dbms)
add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
target_link_libraries (zk_many_watches_reconnect zkutil dbms)
#include <DB/Common/ConfigProcessor.h>
#include <zkutil/ZooKeeper.h>
#include <Poco/Event.h>
#include <iostream>
/// A tool for reproducing https://issues.apache.org/jira/browse/ZOOKEEPER-706
/// Original libzookeeper can't reconnect the session if the length of SET_WATCHES message
/// exceeeds jute.maxbuffer (0xfffff by default).
/// This happens when the number of watches exceeds ~29000.
///
/// Session reconnect can be caused by forbidding packets to the current zookeeper server, e.g.
/// sudo ip6tables -A OUTPUT -d mtzoo01it.haze.yandex.net -j REJECT
const size_t N_THREADS = 100;
int main(int argc, char ** argv)
{
try
{
if (argc != 3)
{
std::cerr << "usage: " << argv[0] << " <zookeeper_config> <number_of_watches>" << std::endl;
return 3;
}
ConfigProcessor processor(false, true);
auto config = processor.loadConfig(argv[1]);
zkutil::ZooKeeper zk(*config, "zookeeper");
zkutil::EventPtr watch = std::make_shared<Poco::Event>();
/// NOTE: setting watches in multiple threads because doing it in a single thread is too slow.
size_t watches_per_thread = std::stoull(argv[2]) / N_THREADS;
std::vector<std::thread> threads;
for (size_t i_thread = 0; i_thread < N_THREADS; ++i_thread)
{
threads.emplace_back([&, i_thread]
{
for (size_t i = 0; i < watches_per_thread; ++i)
zk.exists("/clickhouse/nonexistent_node" + std::to_string(i * N_THREADS + i_thread), nullptr, watch);
});
}
for (size_t i_thread = 0; i_thread < N_THREADS; ++i_thread)
threads[i_thread].join();
while (true)
{
std::cerr << "WAITING..." << std::endl;
sleep(10);
}
}
catch (Poco::Exception & e)
{
std::cerr << "Exception: " << e.displayText() << std::endl;
return 1;
}
catch (std::exception & e)
{
std::cerr << "std::exception: " << e.what() << std::endl;
return 3;
}
catch (...)
{
std::cerr << "Some exception" << std::endl;
return 2;
}
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册