提交 4ac21b4c 编写于 作者: R Rajan 提交者: GitHub

Add lookup throttling (#225)

上级 d62cf072
...@@ -99,6 +99,20 @@ class ClientConfiguration { ...@@ -99,6 +99,20 @@ class ClientConfiguration {
*/ */
int getMessageListenerThreads() const; int getMessageListenerThreads() const;
/**
* Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
* <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe on
* thousands of topic using created {@link PulsarClient}
*
* @param concurrentLookupRequest
*/
ClientConfiguration& setConcurrentLookupRequest(int concurrentLookupRequest);
/**
* @return Get configured total allowed concurrent lookup-request.
*/
int getConcurrentLookupRequest() const;
/** /**
* Initialize the log configuration * Initialize the log configuration
* *
......
...@@ -54,6 +54,7 @@ enum Result { ...@@ -54,6 +54,7 @@ enum Result {
ResultConsumerNotInitialized, /// Consumer is not initialized ResultConsumerNotInitialized, /// Consumer is not initialized
ResultProducerNotInitialized, /// Producer is not initialized ResultProducerNotInitialized, /// Producer is not initialized
ResultTooManyLookupRequestException, /// Too Many concurrent LookupRequest
ResultInvalidTopicName, /// Invalid topic name ResultInvalidTopicName, /// Invalid topic name
ResultInvalidUrl, /// Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor) ResultInvalidUrl, /// Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
......
...@@ -125,15 +125,15 @@ namespace pulsar { ...@@ -125,15 +125,15 @@ namespace pulsar {
LookupDataResultPromisePtr promise) { LookupDataResultPromisePtr promise) {
if (data) { if (data) {
if(data->isRedirect()) { if(data->isRedirect()) {
LOG_DEBUG("Lookup request is for " << destinationName << " redirected to " << data->getBrokerUrl()); LOG_DEBUG("PartitionMetadataLookup request is for " << destinationName << " redirected to " << data->getBrokerUrl());
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(data->getBrokerUrl()); Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(data->getBrokerUrl());
future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, destinationName, _1, _2, promise)); future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, destinationName, _1, _2, promise));
} else { } else {
LOG_DEBUG("Lookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl()); LOG_DEBUG("PartitionMetadataLookup response for " << destinationName << ", lookup-broker-url " << data->getBrokerUrl());
promise->setValue(data); promise->setValue(data);
} }
} else { } else {
LOG_DEBUG("Lookup failed for " << destinationName << ", result " << result); LOG_DEBUG("PartitionMetadataLookup failed for " << destinationName << ", result " << result);
promise->setFailed(result); promise->setFailed(result);
} }
} }
......
...@@ -35,6 +35,7 @@ struct ClientConfiguration::Impl { ...@@ -35,6 +35,7 @@ struct ClientConfiguration::Impl {
int ioThreads; int ioThreads;
int operationTimeoutSeconds; int operationTimeoutSeconds;
int messageListenerThreads; int messageListenerThreads;
int concurrentLookupRequest;
std::string logConfFilePath; std::string logConfFilePath;
bool useTls; bool useTls;
std::string tlsTrustCertsFilePath; std::string tlsTrustCertsFilePath;
...@@ -43,6 +44,7 @@ struct ClientConfiguration::Impl { ...@@ -43,6 +44,7 @@ struct ClientConfiguration::Impl {
ioThreads(1), ioThreads(1),
operationTimeoutSeconds(30), operationTimeoutSeconds(30),
messageListenerThreads(1), messageListenerThreads(1),
concurrentLookupRequest(5000),
logConfFilePath() {} logConfFilePath() {}
}; };
...@@ -129,6 +131,15 @@ bool ClientConfiguration::isTlsAllowInsecureConnection() const { ...@@ -129,6 +131,15 @@ bool ClientConfiguration::isTlsAllowInsecureConnection() const {
return impl_->tlsAllowInsecureConnection; return impl_->tlsAllowInsecureConnection;
} }
ClientConfiguration& ClientConfiguration::setConcurrentLookupRequest(int concurrentLookupRequest) {
impl_->concurrentLookupRequest = concurrentLookupRequest;
return *this;
}
int ClientConfiguration::getConcurrentLookupRequest() const {
return impl_->concurrentLookupRequest;
}
ClientConfiguration& ClientConfiguration::setLogConfFilePath(const std::string& logConfFilePath) { ClientConfiguration& ClientConfiguration::setLogConfFilePath(const std::string& logConfFilePath) {
impl_->logConfFilePath = logConfFilePath; impl_->logConfFilePath = logConfFilePath;
return *this; return *this;
......
...@@ -110,6 +110,8 @@ outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), ...@@ -110,6 +110,8 @@ outgoingBuffer_(SharedBuffer::allocate(DefaultBufferSize)),
outgoingCmd_(), outgoingCmd_(),
havePendingPingRequest_(false), havePendingPingRequest_(false),
keepAliveTimer_(), keepAliveTimer_(),
maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
numOfPendingLookupRequest_(0),
isTlsAllowInsecureConnection_(false) { isTlsAllowInsecureConnection_(false) {
if (clientConfiguration.isUseTls()) { if (clientConfiguration.isUseTls()) {
using namespace boost::filesystem; using namespace boost::filesystem;
...@@ -623,6 +625,7 @@ void ClientConnection::handleIncomingCommand() { ...@@ -623,6 +625,7 @@ void ClientConnection::handleIncomingCommand() {
if (it != pendingLookupRequests_.end()) { if (it != pendingLookupRequests_.end()) {
LookupDataResultPromisePtr lookupDataPromise = it->second; LookupDataResultPromisePtr lookupDataPromise = it->second;
pendingLookupRequests_.erase(it); pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
lock.unlock(); lock.unlock();
if (!partitionMetadataResponse.has_response() if (!partitionMetadataResponse.has_response()
...@@ -661,6 +664,7 @@ void ClientConnection::handleIncomingCommand() { ...@@ -661,6 +664,7 @@ void ClientConnection::handleIncomingCommand() {
if (it != pendingLookupRequests_.end()) { if (it != pendingLookupRequests_.end()) {
LookupDataResultPromisePtr lookupDataPromise = it->second; LookupDataResultPromisePtr lookupDataPromise = it->second;
pendingLookupRequests_.erase(it); pendingLookupRequests_.erase(it);
numOfPendingLookupRequest_--;
lock.unlock(); lock.unlock();
if (!lookupTopicResponse.has_response() if (!lookupTopicResponse.has_response()
...@@ -838,8 +842,14 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request ...@@ -838,8 +842,14 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request
if (isClosed()) { if (isClosed()) {
lock.unlock(); lock.unlock();
promise->setFailed(ResultNotConnected); promise->setFailed(ResultNotConnected);
return;
} else if (numOfPendingLookupRequest_ >= maxPendingLookupRequest_) {
lock.unlock();
promise->setFailed(ResultTooManyLookupRequestException);
return;
} }
pendingLookupRequests_.insert(std::make_pair(requestId, promise)); pendingLookupRequests_.insert(std::make_pair(requestId, promise));
numOfPendingLookupRequest_++;
lock.unlock(); lock.unlock();
sendCommand(cmd); sendCommand(cmd);
} }
...@@ -1013,10 +1023,12 @@ void ClientConnection::close() { ...@@ -1013,10 +1023,12 @@ void ClientConnection::close() {
} }
// Fail all pending lookup-requests on the connection // Fail all pending lookup-requests on the connection
lock.lock();
for (PendingLookupRequestsMap::iterator it = pendingLookupRequests_.begin(); it != pendingLookupRequests_.end(); ++it) { for (PendingLookupRequestsMap::iterator it = pendingLookupRequests_.begin(); it != pendingLookupRequests_.end(); ++it) {
it->second->setFailed(ResultConnectError); it->second->setFailed(ResultConnectError);
numOfPendingLookupRequest_--;
} }
lock.unlock();
if (tlsSocket_) { if (tlsSocket_) {
tlsSocket_->lowest_layer().close(); tlsSocket_->lowest_layer().close();
} }
......
...@@ -267,6 +267,9 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> ...@@ -267,6 +267,9 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
bool havePendingPingRequest_; bool havePendingPingRequest_;
DeadlineTimerPtr keepAliveTimer_; DeadlineTimerPtr keepAliveTimer_;
uint32_t maxPendingLookupRequest_;
uint32_t numOfPendingLookupRequest_;
friend class PulsarFriend; friend class PulsarFriend;
bool isTlsAllowInsecureConnection_; bool isTlsAllowInsecureConnection_;
......
...@@ -170,6 +170,21 @@ void resendMessage(Result r, const Message& msg, Producer &producer) { ...@@ -170,6 +170,21 @@ void resendMessage(Result r, const Message& msg, Producer &producer) {
ASSERT_EQ(ResultOk, client.close()); ASSERT_EQ(ResultOk, client.close());
} }
TEST(BasicEndToEndTest, testLookupThrottling) {
ClientConfiguration config;
config.setConcurrentLookupRequest(0);
Client client(lookupUrl, config);
Producer producer;
Result result = client.createProducer("persistent://prop/unit/ns1/my-topic-1", producer);
ASSERT_EQ(ResultTooManyLookupRequestException, result);
Consumer consumer1;
result = client.subscribe("persistent://prop/unit/ns1/my-topic-1", "my-sub-name", consumer1);
ASSERT_EQ(ResultTooManyLookupRequestException, result);
}
TEST(BasicEndToEndTest, testNonExistingTopic) TEST(BasicEndToEndTest, testNonExistingTopic)
{ {
Client client(lookupUrl); Client client(lookupUrl);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册