diff --git a/contrib/libzookeeper/src/zookeeper.c b/contrib/libzookeeper/src/zookeeper.c index 1ba90afa2fc80653693b2615a25633730882c73b..a46d3aa83981aca2aed0af2d8307cb5652d8a2b0 100644 --- a/contrib/libzookeeper/src/zookeeper.c +++ b/contrib/libzookeeper/src/zookeeper.c @@ -1378,27 +1378,25 @@ static void free_key_list(char **list, int count) free(list); } -static int send_set_watches(zhandle_t *zh) +/// ZOOKEEPER-706: Split SET_WATCHES message into multiple messages if a session has a large number of watches. +#define MAX_SET_WATCHES_MSG_LENGTH (128 * 1024) + +static int send_set_watches_batch( + zhandle_t *zh, + int64_t relative_zxid, + struct String_vector *data_watches, + struct String_vector *exist_watches, + struct String_vector *child_watches) { struct oarchive *oa; struct RequestHeader h = { STRUCT_INITIALIZER(xid , SET_WATCHES_XID), STRUCT_INITIALIZER(type , ZOO_SETWATCHES_OP)}; struct SetWatches req; int rc; - req.relativeZxid = zh->last_zxid; - req.dataWatches.data = collect_keys(zh->active_node_watchers, (int*)&req.dataWatches.count); - req.existWatches.data = collect_keys(zh->active_exist_watchers, (int*)&req.existWatches.count); - req.childWatches.data = collect_keys(zh->active_child_watchers, (int*)&req.childWatches.count); - - // return if there are no pending watches - if (!req.dataWatches.count && !req.existWatches.count && - !req.childWatches.count) { - free_key_list(req.dataWatches.data, req.dataWatches.count); - free_key_list(req.existWatches.data, req.existWatches.count); - free_key_list(req.childWatches.data, req.childWatches.count); - return ZOK; - } - + req.relativeZxid = relative_zxid; + req.dataWatches = *data_watches; + req.existWatches = *exist_watches; + req.childWatches = *child_watches; oa = create_buffer_oarchive(); rc = serialize_RequestHeader(oa, "header", &h); @@ -1406,12 +1404,79 @@ static int send_set_watches(zhandle_t *zh) /* add this buffer to the head of the send queue */ rc = rc < 0 ? rc : queue_front_buffer_bytes(&zh->to_send, get_buffer(oa), get_buffer_len(oa)); - /* We queued the buffer, so don't free it */ + /* We queued the buffer, so don't free it */ close_buffer_oarchive(&oa, 0); - free_key_list(req.dataWatches.data, req.dataWatches.count); - free_key_list(req.existWatches.data, req.existWatches.count); - free_key_list(req.childWatches.data, req.childWatches.count); LOG_DEBUG(("Sending set watches request to %s",format_current_endpoint_info(zh))); + return rc; +} + +static int send_set_watches(zhandle_t *zh) +{ + int64_t relative_zxid = zh->last_zxid; + struct String_vector all_data_watches; + struct String_vector all_exist_watches; + struct String_vector all_child_watches; + int i_data_watch = 0; + int i_exist_watch = 0; + int i_child_watch = 0; + int rc = 0; + + all_data_watches.data = collect_keys(zh->active_node_watchers, (int*)&all_data_watches.count); + all_exist_watches.data = collect_keys(zh->active_exist_watchers, (int*)&all_exist_watches.count); + all_child_watches.data = collect_keys(zh->active_child_watchers, (int*)&all_child_watches.count); + + while (i_data_watch < all_data_watches.count || i_exist_watch < all_exist_watches.count + || i_child_watch < all_child_watches.count) { + struct String_vector data_watches_batch = { + STRUCT_INITIALIZER(data , all_data_watches.data + i_data_watch), + STRUCT_INITIALIZER(count , 0) + }; + struct String_vector exist_watches_batch = { + STRUCT_INITIALIZER(data , all_exist_watches.data + i_exist_watch), + STRUCT_INITIALIZER(count , 0) + }; + struct String_vector child_watches_batch = { + STRUCT_INITIALIZER(data , all_child_watches.data + i_child_watch), + STRUCT_INITIALIZER(count , 0) + }; + int batch_length = 0; + + data_watches_batch.data = all_data_watches.data + i_data_watch; + exist_watches_batch.data = all_exist_watches.data + i_exist_watch; + child_watches_batch.data = all_child_watches.data + i_child_watch; + + /// Note, we may exceed our max length by a bit when we add the last + /// watch in the batch. This isn't ideal, but it makes the code simpler. + while (batch_length < MAX_SET_WATCHES_MSG_LENGTH) { + char *watch; + if (i_data_watch < all_data_watches.count) { + watch = all_data_watches.data[i_data_watch]; + ++i_data_watch; + ++data_watches_batch.count; + } else if (i_exist_watch < all_exist_watches.count) { + watch = all_exist_watches.data[i_exist_watch]; + ++i_exist_watch; + ++exist_watches_batch.count; + } else if (i_child_watch < all_child_watches.count) { + watch = all_child_watches.data[i_child_watch]; + ++i_child_watch; + ++child_watches_batch.count; + } else { + break; + } + batch_length += strlen(watch); + } + + rc = send_set_watches_batch( + zh, relative_zxid, &data_watches_batch, &exist_watches_batch, &child_watches_batch); + if (rc < 0) { + break; + } + } + + free_key_list(all_data_watches.data, all_data_watches.count); + free_key_list(all_exist_watches.data, all_exist_watches.count); + free_key_list(all_child_watches.data, all_child_watches.count); return (rc < 0)?ZMARSHALLINGERROR:ZOK; } diff --git a/libs/libzkutil/src/tests/CMakeLists.txt b/libs/libzkutil/src/tests/CMakeLists.txt index 0669656ba1e671c37befee68f9cca90ea2e6a9ca..09efea6f50d1c60b3aa0d6c95f8985d7c51ebe13 100644 --- a/libs/libzkutil/src/tests/CMakeLists.txt +++ b/libs/libzkutil/src/tests/CMakeLists.txt @@ -15,3 +15,6 @@ target_link_libraries(zkutil_test_async zkutil dbms) add_executable(zkutil_zookeeper_holder zkutil_zookeeper_holder.cpp) target_link_libraries(zkutil_zookeeper_holder zkutil dbms) + +add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp) +target_link_libraries (zk_many_watches_reconnect zkutil dbms) diff --git a/libs/libzkutil/src/tests/zk_many_watches_reconnect.cpp b/libs/libzkutil/src/tests/zk_many_watches_reconnect.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3695407f819fda5295588c4b2461be08730992a7 --- /dev/null +++ b/libs/libzkutil/src/tests/zk_many_watches_reconnect.cpp @@ -0,0 +1,68 @@ +#include +#include +#include +#include + +/// A tool for reproducing https://issues.apache.org/jira/browse/ZOOKEEPER-706 +/// Original libzookeeper can't reconnect the session if the length of SET_WATCHES message +/// exceeeds jute.maxbuffer (0xfffff by default). +/// This happens when the number of watches exceeds ~29000. +/// +/// Session reconnect can be caused by forbidding packets to the current zookeeper server, e.g. +/// sudo ip6tables -A OUTPUT -d mtzoo01it.haze.yandex.net -j REJECT + +const size_t N_THREADS = 100; + +int main(int argc, char ** argv) +{ + try + { + if (argc != 3) + { + std::cerr << "usage: " << argv[0] << " " << std::endl; + return 3; + } + + ConfigProcessor processor(false, true); + auto config = processor.loadConfig(argv[1]); + zkutil::ZooKeeper zk(*config, "zookeeper"); + zkutil::EventPtr watch = std::make_shared(); + + /// NOTE: setting watches in multiple threads because doing it in a single thread is too slow. + size_t watches_per_thread = std::stoull(argv[2]) / N_THREADS; + std::vector threads; + for (size_t i_thread = 0; i_thread < N_THREADS; ++i_thread) + { + threads.emplace_back([&, i_thread] + { + for (size_t i = 0; i < watches_per_thread; ++i) + zk.exists("/clickhouse/nonexistent_node" + std::to_string(i * N_THREADS + i_thread), nullptr, watch); + }); + } + for (size_t i_thread = 0; i_thread < N_THREADS; ++i_thread) + threads[i_thread].join(); + + while (true) + { + std::cerr << "WAITING..." << std::endl; + sleep(10); + } + } + catch (Poco::Exception & e) + { + std::cerr << "Exception: " << e.displayText() << std::endl; + return 1; + } + catch (std::exception & e) + { + std::cerr << "std::exception: " << e.what() << std::endl; + return 3; + } + catch (...) + { + std::cerr << "Some exception" << std::endl; + return 2; + } + + return 0; +}