diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h
index 0e96734f466a8ca6aff96a12edde5f01d676dcb9..efaef3e298b5a5016e3a8eed5356a0377d8cfea4 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -99,6 +99,20 @@ class ClientConfiguration {
*/
int getMessageListenerThreads() const;
+ /**
+ * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
+ * (default: 5000) It should be configured with higher value only in case of it requires to produce/subscribe on
+ * thousands of topic using created {@link PulsarClient}
+ *
+ * @param concurrentLookupRequest
+ */
+ ClientConfiguration& setConcurrentLookupRequest(int concurrentLookupRequest);
+
+ /**
+ * @return Get configured total allowed concurrent lookup-request.
+ */
+ int getConcurrentLookupRequest() const;
+
/**
* Initialize the log configuration
*
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index 959dd5bd75d4b73007f7529ee67008d8183015f7..0619ed746e47b9d76d848538bb86723b0ee6daa0 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -54,6 +54,7 @@ enum Result {
ResultConsumerNotInitialized, /// Consumer is not initialized
ResultProducerNotInitialized, /// Producer is not initialized
+ ResultTooManyLookupRequestException, /// Too Many concurrent LookupRequest
ResultInvalidTopicName, /// Invalid topic name
ResultInvalidUrl, /// Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 4885d69c34d8576868d6c43e8d28a36e82cc73e5..61364318960d87bc6d904dbb46845437f88dc29a 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -125,15 +125,15 @@ namespace pulsar {
LookupDataResultPromisePtr promise) {
if (data) {
if(data->isRedirect()) {
- LOG_DEBUG("Lookup request is for " << destinationName << " redirected to " << data->getBrokerUrl());
+ LOG_DEBUG("PartitionMetadataLookup request is for " << destinationName << " redirected to " << data->getBrokerUrl());
Future future = cnxPool_.getConnectionAsync(data->getBrokerUrl());
future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, destinationName, _1, _2, promise));
} else {
- LOG_DEBUG("Lookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl());
+ LOG_DEBUG("PartitionMetadataLookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl());
promise->setValue(data);
}
} else {
- LOG_DEBUG("Lookup failed for " << destinationName << ", result " << result);
+ LOG_DEBUG("PartitionMetadataLookup failed for " << destinationName << ", result " << result);
promise->setFailed(result);
}
}
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index fded18e73bfc5b175f025c75909ca674dd8b2873..b870fa10b1c1f77d3393949dc6f0d7863c52019f 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -35,6 +35,7 @@ struct ClientConfiguration::Impl {
int ioThreads;
int operationTimeoutSeconds;
int messageListenerThreads;
+ int concurrentLookupRequest;
std::string logConfFilePath;
bool useTls;
std::string tlsTrustCertsFilePath;
@@ -43,6 +44,7 @@ struct ClientConfiguration::Impl {
ioThreads(1),
operationTimeoutSeconds(30),
messageListenerThreads(1),
+ concurrentLookupRequest(5000),
logConfFilePath() {}
};
@@ -129,6 +131,15 @@ bool ClientConfiguration::isTlsAllowInsecureConnection() const {
return impl_->tlsAllowInsecureConnection;
}
+ClientConfiguration& ClientConfiguration::setConcurrentLookupRequest(int concurrentLookupRequest) {
+ impl_->concurrentLookupRequest = concurrentLookupRequest;
+ return *this;
+}
+
+int ClientConfiguration::getConcurrentLookupRequest() const {
+ return impl_->concurrentLookupRequest;
+}
+
ClientConfiguration& ClientConfiguration::setLogConfFilePath(const std::string& logConfFilePath) {
impl_->logConfFilePath = logConfFilePath;
return *this;
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index d7bc8660d7d79b53dc77d985025e0e5eb2a90918..9a9453a6fbe479061396ee2c3383c931eb0102b5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -110,6 +110,8 @@ outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
outgoingCmd_(),
havePendingPingRequest_(false),
keepAliveTimer_(),
+maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
+numOfPendingLookupRequest_(0),
isTlsAllowInsecureConnection_(false) {
if (clientConfiguration.isUseTls()) {
using namespace boost::filesystem;
@@ -623,6 +625,7 @@ void ClientConnection::handleIncomingCommand() {
if (it != pendingLookupRequests_.end()) {
LookupDataResultPromisePtr lookupDataPromise = it->second;
pendingLookupRequests_.erase(it);
+ numOfPendingLookupRequest_--;
lock.unlock();
if (!partitionMetadataResponse.has_response()
@@ -661,6 +664,7 @@ void ClientConnection::handleIncomingCommand() {
if (it != pendingLookupRequests_.end()) {
LookupDataResultPromisePtr lookupDataPromise = it->second;
pendingLookupRequests_.erase(it);
+ numOfPendingLookupRequest_--;
lock.unlock();
if (!lookupTopicResponse.has_response()
@@ -838,8 +842,14 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request
if (isClosed()) {
lock.unlock();
promise->setFailed(ResultNotConnected);
+ return;
+ } else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) {
+ lock.unlock();
+ promise->setFailed(ResultTooManyLookupRequestException);
+ return;
}
pendingLookupRequests_.insert(std::make_pair(requestId, promise));
+ numOfPendingLookupRequest_++;
lock.unlock();
sendCommand(cmd);
}
@@ -1013,10 +1023,12 @@ void ClientConnection::close() {
}
// Fail all pending lookup-requests on the connection
+ lock.lock();
for (PendingLookupRequestsMap::iterator it = pendingLookupRequests_.begin(); it != pendingLookupRequests_.end(); ++it) {
it->second->setFailed(ResultConnectError);
+ numOfPendingLookupRequest_--;
}
-
+ lock.unlock();
if (tlsSocket_) {
tlsSocket_->lowest_layer().close();
}
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index 12501451502d5d6c5d70f1622af46f2d750fb5bf..25123cdc82ab0ccce614d00d02e116ee6b38affa 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -267,6 +267,9 @@ class ClientConnection : public boost::enable_shared_from_this
bool havePendingPingRequest_;
DeadlineTimerPtr keepAliveTimer_;
+ uint32_t maxPendingLookupRequest_;
+ uint32_t numOfPendingLookupRequest_;
+
friend class PulsarFriend;
bool isTlsAllowInsecureConnection_;
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 7241b5b58ca5b71c08e8a5072103701aa978cdc0..2126572eb0c18873a682a53c855dfd75b90ea58c 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -170,6 +170,21 @@ void resendMessage(Result r, const Message& msg, Producer &producer) {
ASSERT_EQ(ResultOk, client.close());
}
+TEST(BasicEndToEndTest, testLookupThrottling) {
+ ClientConfiguration config;
+ config.setConcurrentLookupRequest(0);
+ Client client(lookupUrl, config);
+
+ Producer producer;
+ Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-1", producer);
+ ASSERT_EQ(ResultTooManyLookupRequestException, result);
+
+ Consumer consumer1;
+ result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer1);
+ ASSERT_EQ(ResultTooManyLookupRequestException, result);
+
+}
+
TEST(BasicEndToEndTest, testNonExistingTopic)
{
Client client(lookupUrl);