From 4ac21b4c285619d2d06d7f65c9f2332d261f3619 Mon Sep 17 00:00:00 2001 From: Rajan Date: Tue, 21 Feb 2017 11:55:39 -0800 Subject: [PATCH] Add lookup throttling (#225) --- pulsar-client-cpp/include/pulsar/Client.h | 14 ++++++++++++++ pulsar-client-cpp/include/pulsar/Result.h | 1 + pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 6 +++--- pulsar-client-cpp/lib/Client.cc | 11 +++++++++++ pulsar-client-cpp/lib/ClientConnection.cc | 14 +++++++++++++- pulsar-client-cpp/lib/ClientConnection.h | 3 +++ pulsar-client-cpp/tests/BasicEndToEndTest.cc | 15 +++++++++++++++ 7 files changed, 60 insertions(+), 4 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h index 0e96734f466..efaef3e298b 100644 --- a/pulsar-client-cpp/include/pulsar/Client.h +++ b/pulsar-client-cpp/include/pulsar/Client.h @@ -99,6 +99,20 @@ class ClientConfiguration { */ int getMessageListenerThreads() const; + /** + * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker. + * (default: 5000) It should be configured with higher value only in case of it requires to produce/subscribe on + * thousands of topic using created {@link PulsarClient} + * + * @param concurrentLookupRequest + */ + ClientConfiguration& setConcurrentLookupRequest(int concurrentLookupRequest); + + /** + * @return Get configured total allowed concurrent lookup-request. + */ + int getConcurrentLookupRequest() const; + /** * Initialize the log configuration * diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h index 959dd5bd75d..0619ed746e4 100644 --- a/pulsar-client-cpp/include/pulsar/Result.h +++ b/pulsar-client-cpp/include/pulsar/Result.h @@ -54,6 +54,7 @@ enum Result { ResultConsumerNotInitialized, /// Consumer is not initialized ResultProducerNotInitialized, /// Producer is not initialized + ResultTooManyLookupRequestException, /// Too Many concurrent LookupRequest ResultInvalidTopicName, /// Invalid topic name ResultInvalidUrl, /// Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor) diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc index 4885d69c34d..61364318960 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc @@ -125,15 +125,15 @@ namespace pulsar { LookupDataResultPromisePtr promise) { if (data) { if(data->isRedirect()) { - LOG_DEBUG("Lookup request is for " << destinationName << " redirected to " << data->getBrokerUrl()); + LOG_DEBUG("PartitionMetadataLookup request is for " << destinationName << " redirected to " << data->getBrokerUrl()); Future future = cnxPool_.getConnectionAsync(data->getBrokerUrl()); future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, destinationName, _1, _2, promise)); } else { - LOG_DEBUG("Lookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl()); + LOG_DEBUG("PartitionMetadataLookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl()); promise->setValue(data); } } else { - LOG_DEBUG("Lookup failed for " << destinationName << ", result " << result); + LOG_DEBUG("PartitionMetadataLookup failed for " << destinationName << ", result " << result); promise->setFailed(result); } } diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc index fded18e73bf..b870fa10b1c 100644 --- a/pulsar-client-cpp/lib/Client.cc +++ b/pulsar-client-cpp/lib/Client.cc @@ -35,6 +35,7 @@ struct ClientConfiguration::Impl { int ioThreads; int operationTimeoutSeconds; int messageListenerThreads; + int concurrentLookupRequest; std::string logConfFilePath; bool useTls; std::string tlsTrustCertsFilePath; @@ -43,6 +44,7 @@ struct ClientConfiguration::Impl { ioThreads(1), operationTimeoutSeconds(30), messageListenerThreads(1), + concurrentLookupRequest(5000), logConfFilePath() {} }; @@ -129,6 +131,15 @@ bool ClientConfiguration::isTlsAllowInsecureConnection() const { return impl_->tlsAllowInsecureConnection; } +ClientConfiguration& ClientConfiguration::setConcurrentLookupRequest(int concurrentLookupRequest) { + impl_->concurrentLookupRequest = concurrentLookupRequest; + return *this; +} + +int ClientConfiguration::getConcurrentLookupRequest() const { + return impl_->concurrentLookupRequest; +} + ClientConfiguration& ClientConfiguration::setLogConfFilePath(const std::string& logConfFilePath) { impl_->logConfFilePath = logConfFilePath; return *this; diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index d7bc8660d7d..9a9453a6fbe 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -110,6 +110,8 @@ outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), outgoingCmd_(), havePendingPingRequest_(false), keepAliveTimer_(), +maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), +numOfPendingLookupRequest_(0), isTlsAllowInsecureConnection_(false) { if (clientConfiguration.isUseTls()) { using namespace boost::filesystem; @@ -623,6 +625,7 @@ void ClientConnection::handleIncomingCommand() { if (it != pendingLookupRequests_.end()) { LookupDataResultPromisePtr lookupDataPromise = it->second; pendingLookupRequests_.erase(it); + numOfPendingLookupRequest_--; lock.unlock(); if (!partitionMetadataResponse.has_response() @@ -661,6 +664,7 @@ void ClientConnection::handleIncomingCommand() { if (it != pendingLookupRequests_.end()) { LookupDataResultPromisePtr lookupDataPromise = it->second; pendingLookupRequests_.erase(it); + numOfPendingLookupRequest_--; lock.unlock(); if (!lookupTopicResponse.has_response() @@ -838,8 +842,14 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request if (isClosed()) { lock.unlock(); promise->setFailed(ResultNotConnected); + return; + } else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) { + lock.unlock(); + promise->setFailed(ResultTooManyLookupRequestException); + return; } pendingLookupRequests_.insert(std::make_pair(requestId, promise)); + numOfPendingLookupRequest_++; lock.unlock(); sendCommand(cmd); } @@ -1013,10 +1023,12 @@ void ClientConnection::close() { } // Fail all pending lookup-requests on the connection + lock.lock(); for (PendingLookupRequestsMap::iterator it = pendingLookupRequests_.begin(); it != pendingLookupRequests_.end(); ++it) { it->second->setFailed(ResultConnectError); + numOfPendingLookupRequest_--; } - + lock.unlock(); if (tlsSocket_) { tlsSocket_->lowest_layer().close(); } diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 12501451502..25123cdc82a 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -267,6 +267,9 @@ class ClientConnection : public boost::enable_shared_from_this bool havePendingPingRequest_; DeadlineTimerPtr keepAliveTimer_; + uint32_t maxPendingLookupRequest_; + uint32_t numOfPendingLookupRequest_; + friend class PulsarFriend; bool isTlsAllowInsecureConnection_; diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 7241b5b58ca..2126572eb0c 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -170,6 +170,21 @@ void resendMessage(Result r, const Message& msg, Producer &producer) { ASSERT_EQ(ResultOk, client.close()); } +TEST(BasicEndToEndTest, testLookupThrottling) { + ClientConfiguration config; + config.setConcurrentLookupRequest(0); + Client client(lookupUrl, config); + + Producer producer; + Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-1", producer); + ASSERT_EQ(ResultTooManyLookupRequestException, result); + + Consumer consumer1; + result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer1); + ASSERT_EQ(ResultTooManyLookupRequestException, result); + +} + TEST(BasicEndToEndTest, testNonExistingTopic) { Client client(lookupUrl); -- GitLab