diff --git a/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h new file mode 100644 index 0000000000000000000000000000000000000000..d49dfc2ecbde7b1805d987ab5b43b6ccbcac3e8f --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h @@ -0,0 +1,86 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_CPP_BROKERCONSUMERSTATS_H +#define PULSAR_CPP_BROKERCONSUMERSTATS_H + +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class BrokerConsumerStatsImplBase; + +/* @note: isValid() or getXXX() methods are not allowed on an invalid BrokerConsumerStats */ +class BrokerConsumerStats { + private: + boost::shared_ptr impl_; + public: + explicit BrokerConsumerStats(boost::shared_ptr impl); + + BrokerConsumerStats(); + + /** Returns true if the Stats are still valid **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + /** @deprecated */ + boost::shared_ptr getImpl() const; + + friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj); +}; +typedef boost::function BrokerConsumerStatsCallback; + +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATS_H diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index 598c6b1319a63f75e81e19c26fd528300c978977..f349175d802cf27d50126ac7790510f2bc987758 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -23,6 +23,8 @@ #include #include #include +#include +#include #pragma GCC visibility push(default) class PulsarFriend; @@ -37,80 +39,6 @@ typedef boost::function ResultCallback; /// Callback definition for MessageListener typedef boost::function MessageListener; -enum ConsumerType { - /** - * There can be only 1 consumer on the same topic with the same consumerName - */ - ConsumerExclusive, - - /** - * Multiple consumers will be able to use the same consumerName and the messages - * will be dispatched according to a round-robin rotation between the connected consumers - */ - ConsumerShared, - - /** Only one consumer is active on the subscription; Subscription can have N consumers - * connected one of which will get promoted to master if the current master becomes inactive - */ - - ConsumerFailover -}; - -class BrokerConsumerStats { - private: - /* - * validTillInMs_ - Stats will be valid till this time. - */ - boost::posix_time::ptime validTill_; - public: - BrokerConsumerStats(); - BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut, - double msgRateRedeliver, std::string consumerName, int availablePermits, - int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, - std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog); - - /** Returns true if the Message is Expired **/ - bool isValid() const; - - /** Total rate of messages delivered to the consumer. msg/s */ - double msgRateOut_; - - /** Total throughput delivered to the consumer. bytes/s */ - double msgThroughputOut_; - - /** Total rate of messages redelivered by this consumer. msg/s */ - double msgRateRedeliver_; - - /** Name of the consumer */ - std::string consumerName_; - - /** Number of available message permits for the consumer */ - int availablePermits_; - - /** Number of unacknowledged messages for the consumer */ - int unackedMessages_; - - /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */ - bool blockedConsumerOnUnackedMsgs_; - - /** Address of this consumer */ - std::string address_; - - /** Timestamp of connection */ - std::string connectedSince_; - - /// Whether this subscription is Exclusive or Shared or Failover - std::string type_; - - /// Total rate of messages expired on this subscription. msg/s - double msgRateExpired_; - - /// Number of messages in the subscription backlog - long msgBacklog_; - - friend std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj); -}; - /** * Class specifying the configuration of a consumer. */ @@ -182,6 +110,18 @@ class ConsumerConfiguration { * @return the configured timeout in milliseconds for unacked messages. */ long getUnAckedMessagesTimeoutMs() const; + + /** + * Set the time duration for which the broker side consumer stats will be cached in the client. + * @param cacheTimeInMs in milliseconds + */ + void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs); + + /** + * @return the configured timeout in milliseconds caching BrokerConsumerStats. + */ + long getBrokerConsumerStatsCacheTimeInMs() const; + private: struct Impl; boost::shared_ptr impl_; @@ -347,12 +287,21 @@ class Consumer { * still valid. * * @param brokerConsumerStats - if the function returns ResultOk, this object will contain consumer stats - * @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned. * * @note This is a blocking call with timeout of thirty seconds. */ - Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1); + Result getBrokerConsumerStats(BrokerConsumerStats& brokerConsumerStats); + /** + * Asynchronous call to gets Consumer Stats from broker. + * The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires + * then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are + * still valid. + * + * @param callback - callback function to get the brokerConsumerStats, + * if result is ResultOk then the brokerConsumerStats will be populated + */ + void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback); private: typedef boost::shared_ptr ConsumerImplBasePtr; friend class PulsarFriend; diff --git a/pulsar-client-cpp/include/pulsar/ConsumerType.h b/pulsar-client-cpp/include/pulsar/ConsumerType.h new file mode 100644 index 0000000000000000000000000000000000000000..9f0fdc97364f356d13de71c2405d654bd94fe3bc --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/ConsumerType.h @@ -0,0 +1,40 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_CPP_CONSUMERTYPE_H +#define PULSAR_CPP_CONSUMERTYPE_H + +namespace pulsar { + enum ConsumerType { + /** + * There can be only 1 consumer on the same topic with the same consumerName + */ + ConsumerExclusive, + + /** + * Multiple consumers will be able to use the same consumerName and the messages + * will be dispatched according to a round-robin rotation between the connected consumers + */ + ConsumerShared, + + /** Only one consumer is active on the subscription; Subscription can have N consumers + * connected one of which will get promoted to master if the current master becomes inactive + */ + ConsumerFailover + }; +} + +#endif //PULSAR_CPP_CONSUMERTYPE_H diff --git a/pulsar-client-cpp/lib/BrokerConsumerStats.cc b/pulsar-client-cpp/lib/BrokerConsumerStats.cc new file mode 100644 index 0000000000000000000000000000000000000000..eb5eb19a7177e2660faac9267a0a90d18f8901d8 --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStats.cc @@ -0,0 +1,99 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace pulsar { +BrokerConsumerStats::BrokerConsumerStats(boost::shared_ptr impl) + : impl_(impl) { +} + +BrokerConsumerStats::BrokerConsumerStats() { +} + +boost::shared_ptr BrokerConsumerStats::getImpl() const { + return impl_; +} + +bool BrokerConsumerStats::isValid() const { + return impl_->isValid(); +} + +std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) { + os << "\nBrokerConsumerStats [" << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " + << obj.getMsgRateOut() << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() + << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() << ", consumerName_ = " + << obj.getConsumerName() << ", availablePermits_ = " << obj.getAvailablePermits() + << ", unackedMessages_ = " << obj.getUnackedMessages() + << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs() + << ", address_ = " << obj.getAddress() << ", connectedSince_ = " << obj.getConnectedSince() + << ", type_ = " << obj.getType() << ", msgRateExpired_ = " << obj.getMsgRateExpired() + << ", msgBacklog_ = " << obj.getMsgBacklog() << "]"; + return os; +} + +double BrokerConsumerStats::getMsgRateOut() const { + if (impl_) { + return impl_->getMsgRateOut(); + } + return 0; +} + +double BrokerConsumerStats::getMsgThroughputOut() const { + return impl_->getMsgThroughputOut(); +} + +double BrokerConsumerStats::getMsgRateRedeliver() const { + return impl_->getMsgRateRedeliver(); +} + +const std::string BrokerConsumerStats::getConsumerName() const { + return impl_->getConsumerName(); +} + +uint64_t BrokerConsumerStats::getAvailablePermits() const { + return impl_->getAvailablePermits(); +} + +uint64_t BrokerConsumerStats::getUnackedMessages() const { + return impl_->getUnackedMessages(); +} + +bool BrokerConsumerStats::isBlockedConsumerOnUnackedMsgs() const { + return impl_->isBlockedConsumerOnUnackedMsgs(); +} + +const std::string BrokerConsumerStats::getAddress() const { + return impl_->getAddress(); +} + +const std::string BrokerConsumerStats::getConnectedSince() const { + return impl_->getConnectedSince(); +} + +const ConsumerType BrokerConsumerStats::getType() const { + return impl_->getType(); +} + +double BrokerConsumerStats::getMsgRateExpired() const { + return impl_->getMsgRateExpired(); +} + +uint64_t BrokerConsumerStats::getMsgBacklog() const { + return impl_->getMsgBacklog(); +} +} diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc new file mode 100644 index 0000000000000000000000000000000000000000..e6251e918d032e9c27035bcc0dbffb7982810a3b --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.cc @@ -0,0 +1,127 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +namespace pulsar { + BrokerConsumerStatsImpl::BrokerConsumerStatsImpl() : validTill_(boost::posix_time::microsec_clock::universal_time()) {}; + + BrokerConsumerStatsImpl::BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut, + double msgRateRedeliver, std::string consumerName, + uint64_t availablePermits, + uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, + std::string address, + std::string connectedSince, const std::string& type, + double msgRateExpired, uint64_t msgBacklog) : + msgRateOut_(msgRateOut), + msgThroughputOut_(msgThroughputOut), + msgRateRedeliver_(msgRateRedeliver), + consumerName_(consumerName), + availablePermits_(availablePermits), + unackedMessages_(unackedMessages), + blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs), + address_(address), + connectedSince_(connectedSince), + type_(convertStringToConsumerType(type)), + msgRateExpired_(msgRateExpired), + msgBacklog_(msgBacklog) {} + + bool BrokerConsumerStatsImpl::isValid() const { + return boost::posix_time::microsec_clock::universal_time() <= validTill_; + } + + std::ostream& operator<<(std::ostream &os, const BrokerConsumerStatsImpl& obj) { + os << "\nBrokerConsumerStatsImpl [" + << "validTill_ = " << obj.isValid() + << ", msgRateOut_ = " << obj.getMsgRateOut() + << ", msgThroughputOut_ = " << obj.getMsgThroughputOut() + << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() + << ", consumerName_ = " << obj.getConsumerName() + << ", availablePermits_ = " << obj.getAvailablePermits() + << ", unackedMessages_ = " << obj.getUnackedMessages() + << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs() + << ", address_ = " << obj.getAddress() + << ", connectedSince_ = " << obj.getConnectedSince() + << ", type_ = " << obj.getType() + << ", msgRateExpired_ = " << obj.getMsgRateExpired() + << ", msgBacklog_ = " << obj.getMsgBacklog() + << "]"; + return os; + } + + double BrokerConsumerStatsImpl::getMsgRateOut() const { + return msgRateOut_; + } + + double BrokerConsumerStatsImpl::getMsgThroughputOut() const { + return msgThroughputOut_; + } + + double BrokerConsumerStatsImpl::getMsgRateRedeliver() const { + return msgRateRedeliver_; + } + + const std::string BrokerConsumerStatsImpl::getConsumerName() const { + return consumerName_; + } + + uint64_t BrokerConsumerStatsImpl::getAvailablePermits() const { + return availablePermits_; + } + + uint64_t BrokerConsumerStatsImpl::getUnackedMessages() const { + return unackedMessages_; + } + + bool BrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() const { + return blockedConsumerOnUnackedMsgs_; + } + + const std::string BrokerConsumerStatsImpl::getAddress() const { + return address_; + } + + const std::string BrokerConsumerStatsImpl::getConnectedSince() const { + return connectedSince_; + } + + const ConsumerType BrokerConsumerStatsImpl::getType() const { + return type_; + } + + double BrokerConsumerStatsImpl::getMsgRateExpired() const { + return msgRateExpired_; + } + + uint64_t BrokerConsumerStatsImpl::getMsgBacklog() const { + return msgBacklog_; + } + + void BrokerConsumerStatsImpl::setCacheTime(uint64_t cacehTimeInMs) { + validTill_ = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(cacehTimeInMs); + } + + ConsumerType BrokerConsumerStatsImpl::convertStringToConsumerType(const std::string& str) { + if (str == "ConsumerFailover" || str == "Failover") { + return ConsumerFailover; + } else if (str == "ConsumerShared" || str == "Shared") { + return ConsumerShared; + } else { + return ConsumerExclusive; + } + } +} diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h new file mode 100644 index 0000000000000000000000000000000000000000..dbb35ca2aad74645f379f03c04d81759ca5eb0c2 --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImpl.h @@ -0,0 +1,126 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H +#define PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H + +#include +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class BrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase { + private: + /** validTill_ - Stats will be valid till this time.*/ + boost::posix_time::ptime validTill_; + + /** Total rate of messages delivered to the consumer. msg/s */ + double msgRateOut_; + + /** Total throughput delivered to the consumer. bytes/s */ + double msgThroughputOut_; + + /** Total rate of messages redelivered by this consumer. msg/s */ + double msgRateRedeliver_; + + /** Name of the consumer */ + std::string consumerName_; + + /** Number of available message permits for the consumer */ + uint64_t availablePermits_; + + /** Number of unacknowledged messages for the consumer */ + uint64_t unackedMessages_; + + /** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */ + bool blockedConsumerOnUnackedMsgs_; + + /** Address of this consumer */ + std::string address_; + + /** Timestamp of connection */ + std::string connectedSince_; + + /** Whether this subscription is Exclusive or Shared or Failover */ + ConsumerType type_; + + /** Total rate of messages expired on this subscription. msg/s */ + double msgRateExpired_; + + /** Number of messages in the subscription backlog */ + uint64_t msgBacklog_; + +public: + + BrokerConsumerStatsImpl(); + + BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut, double msgRateRedeliver, + std::string consumerName, uint64_t availablePermits, + uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs, + std::string address, std::string connectedSince, const std::string& type, + double msgRateExpired, uint64_t msgBacklog); + + /** Returns true if the Stats are still valid **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + void setCacheTime(uint64_t cacehTimeInMs); + + friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStatsImpl &obj); + + static ConsumerType convertStringToConsumerType(const std::string& str); +}; +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H diff --git a/pulsar-client-cpp/lib/BrokerConsumerStatsImplBase.h b/pulsar-client-cpp/lib/BrokerConsumerStatsImplBase.h new file mode 100644 index 0000000000000000000000000000000000000000..bbdfca4f2186360cf36893cb02eac14cfada9557 --- /dev/null +++ b/pulsar-client-cpp/lib/BrokerConsumerStatsImplBase.h @@ -0,0 +1,67 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_CPP_BROKERCONSUMERSTATSIMPLBASE_H +#define PULSAR_CPP_BROKERCONSUMERSTATSIMPLBASE_H + +#include + +namespace pulsar { + class BrokerConsumerStatsImplBase { + public: + /** Returns true if the Stats are still valid **/ + virtual bool isValid() const = 0; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const = 0; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const = 0; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const = 0; + + /** Returns the Name of the consumer */ + virtual const std::string getConsumerName() const = 0; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const = 0; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const = 0; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const = 0; + + /** Returns the Address of this consumer */ + virtual const std::string getAddress() const = 0; + + /** Returns the Timestamp of connection */ + virtual const std::string getConnectedSince() const = 0; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType getType() const = 0; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const = 0; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const = 0; + }; + typedef boost::shared_ptr BrokerConsumerStatsImplBasePtr; +} + +#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPLBASE_H diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index bcb35be7a64bdf6b8b224c1a217c19fda0b8db44..e3c668e0260e61287ec5b466ccc9cc7c2c7dbec1 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -124,7 +124,6 @@ havePendingPingRequest_(false), keepAliveTimer_(), maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()), consumerStatsRequestTimer_(executor_->createDeadlineTimer()), -consumerStatsTTLMs_(30 * 1000), numOfPendingLookupRequest_(0), isTlsAllowInsecureConnection_(false) { if (clientConfiguration.isUseTls()) { @@ -205,7 +204,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte } void ClientConnection::startConsumerStatsTimer(std::vector consumerStatsRequests) { - std::vector > consumerStatsPromises; + std::vector > consumerStatsPromises; Lock lock(mutex_); for (int i = 0; i < consumerStatsRequests.size(); i++) { @@ -715,7 +714,7 @@ void ClientConnection::handleIncomingCommand() { PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find( consumerStatsResponse.request_id()); if (it != pendingConsumerStatsMap_.end()) { - Promise consumerStatsPromise = it->second; + Promise consumerStatsPromise = it->second; pendingConsumerStatsMap_.erase(it); lock.unlock(); @@ -730,10 +729,7 @@ void ClientConnection::handleIncomingCommand() { cnxString_ << "ConsumerStatsResponse command - Received consumer stats response from server. req_id: " << consumerStatsResponse.request_id() << " Stats: "); - boost::posix_time::ptime validTill = now() + milliseconds(consumerStatsTTLMs_); - BrokerConsumerStats brokerStats = - BrokerConsumerStats(validTill, - consumerStatsResponse.msgrateout(), + BrokerConsumerStatsImpl brokerStats(consumerStatsResponse.msgrateout(), consumerStatsResponse.msgthroughputout(), consumerStatsResponse.msgrateredeliver(), consumerStatsResponse.consumername(), @@ -923,11 +919,11 @@ void ClientConnection::handleIncomingCommand() { } } -Future +Future ClientConnection::newConsumerStats(const std::string topicName, const std::string subscriptionName, uint64_t consumerId, uint64_t requestId) { Lock lock(mutex_); - Promise promise; + Promise promise; if (isClosed()) { lock.unlock(); LOG_ERROR(cnxString_ << " Client is not connected to the broker"); diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 0617b46b58aa36999242ca522d90f841d382e180..981e70ac09fbb1bdb896a0d3136d0affa5f2df87 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -40,6 +40,7 @@ #include "UtilAllocator.h" #include #include +#include using namespace pulsar; @@ -132,11 +133,9 @@ class ClientConnection : public boost::enable_shared_from_this Commands::ChecksumType getChecksumType() const; - Future newConsumerStats(const std::string topicName, const std::string subscriptionName, + Future newConsumerStats(const std::string topicName, const std::string subscriptionName, uint64_t consumerId, uint64_t requestId) ; private: - long consumerStatsTTLMs_ ; - struct PendingRequestData { Promise promise; DeadlineTimerPtr timer; @@ -254,7 +253,7 @@ class ClientConnection : public boost::enable_shared_from_this typedef std::map ConsumersMap; ConsumersMap consumers_; - typedef std::map > PendingConsumerStatsMap; + typedef std::map > PendingConsumerStatsMap; PendingConsumerStatsMap pendingConsumerStatsMap_; diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc index f05d0035853e293bfc34da04c7cdebb707012662..927a941bfe9d1bc1e1930f0b624557a15a019bcc 100644 --- a/pulsar-client-cpp/lib/Consumer.cc +++ b/pulsar-client-cpp/lib/Consumer.cc @@ -18,55 +18,13 @@ #include #include "ConsumerImpl.h" #include "Utils.h" +#include +#include namespace pulsar { const std::string EMPTY_STRING; -BrokerConsumerStats::BrokerConsumerStats():validTill_(now()) {}; - -BrokerConsumerStats::BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut, - double msgRateRedeliver, std::string consumerName, int availablePermits, - int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address, - std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog): - validTill_(validTill), - msgRateOut_(msgRateOut), - msgThroughputOut_(msgThroughputOut), - msgRateRedeliver_(msgRateRedeliver), - consumerName_(consumerName), - availablePermits_(availablePermits), - unackedMessages_(unackedMessages), - blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs), - address_(address), - connectedSince_(connectedSince), - type_(type), - msgRateExpired_(msgRateExpired), - msgBacklog_(msgBacklog) -{} - -bool BrokerConsumerStats::isValid() const { - return now() <= validTill_; -} - -std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj) { - os << "\nBrokerConsumerStats [" - << "validTill_ = " << obj.validTill_ - << ", msgRateOut_ = " << obj.msgRateOut_ - << ", msgThroughputOut_ = " << obj.msgThroughputOut_ - << ", msgRateRedeliver_ = " << obj.msgRateRedeliver_ - << ", consumerName_ = " << obj.consumerName_ - << ", availablePermits_ = " << obj.availablePermits_ - << ", unackedMessages_ = " << obj.unackedMessages_ - << ", blockedConsumerOnUnackedMsgs_ = " << obj.blockedConsumerOnUnackedMsgs_ - << ", address_ = " << obj.address_ - << ", connectedSince_ = " << obj.connectedSince_ - << ", type_ = " << obj.type_ - << ", msgRateExpired_ = " << obj.msgRateExpired_ - << ", msgBacklog_ = " << obj.msgBacklog_ - << "]"; - return os; -} - struct ConsumerConfiguration::Impl { long unAckedMessagesTimeoutMs; ConsumerType consumerType; @@ -74,11 +32,13 @@ struct ConsumerConfiguration::Impl { bool hasMessageListener; int receiverQueueSize; std::string consumerName; + long brokerConsumerStatsCacheTimeInMs; Impl() : unAckedMessagesTimeoutMs(0), consumerType(ConsumerExclusive), messageListener(), hasMessageListener(false), + brokerConsumerStatsCacheTimeInMs(30 * 1000), // 30 seconds receiverQueueSize(1000) { } }; @@ -99,6 +59,14 @@ ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfigurat return *this; } +long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const { + return impl_->brokerConsumerStatsCacheTimeInMs; +} + +void ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs) { + impl_->brokerConsumerStatsCacheTimeInMs = cacheTimeInMs; +} + ConsumerConfiguration& ConsumerConfiguration::setConsumerType(ConsumerType consumerType) { impl_->consumerType = consumerType; return *this; @@ -304,10 +272,19 @@ void Consumer::redeliverUnacknowledgedMessages() { } } -Result Consumer::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) { +Result Consumer::getBrokerConsumerStats(BrokerConsumerStats& brokerConsumerStats) { if (!impl_) { return ResultConsumerNotInitialized; } - return impl_->getConsumerStats(BrokerConsumerStats, partitionIndex); + Promise promise; + getBrokerConsumerStatsAsync(WaitForCallbackValue(promise)); + return promise.getFuture().get(brokerConsumerStats); +} + +void Consumer::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { + if (!impl_) { + return callback(ResultConsumerNotInitialized, BrokerConsumerStats()); + } + return impl_->getBrokerConsumerStatsAsync(callback); } } diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 3f9b8541d8eca9fbe5c975ef87bb141ed75f7634..9d04c6366ec60e1b17cad5108e5b86b233674271 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -687,22 +687,21 @@ int ConsumerImpl::getNumOfPrefetchedMessages() const { return incomingMessages_.size(); } -Result ConsumerImpl::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex) { - if (partitionIndex != -1) { - LOG_WARN(getName() << "Ignoring the partitionIndex since the topic is not partitioned") - } - - if (!isOpen()) { +void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); + if (state_ != Ready) { LOG_ERROR(getName() << "Client connection is not open, please try again later.") - return ResultConsumerNotInitialized; + lock.unlock(); + return callback(ResultConsumerNotInitialized, BrokerConsumerStats()); } if (brokerConsumerStats_.isValid()) { LOG_DEBUG(getName() << "Serving data from cache"); - brokerConsumerStats = brokerConsumerStats_; - return ResultOk; + BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_; + lock.unlock(); + return callback(ResultOk, BrokerConsumerStats(boost::make_shared(brokerConsumerStats_))); } - + lock.unlock(); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { @@ -712,19 +711,30 @@ Result ConsumerImpl::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, LOG_DEBUG(getName() << " Sending ConsumerStats Command for Consumer - " << getConsumerId() << ", requestId - "<newConsumerStats(topic_, subscription_, consumerId_, requestId).get(consumerStats); - if (res == ResultOk) { - brokerConsumerStats = brokerConsumerStats_ = consumerStats; - } - return res; + cnx->newConsumerStats(topic_, subscription_, consumerId_, requestId).addListener( + boost::bind(&ConsumerImpl::brokerConsumerStatsListener, shared_from_this(), _1, _2, callback)); + return; } else { LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v7"); - return ResultOperationNotSupported; + return callback(ResultUnsupportedVersionError, BrokerConsumerStats()); } } LOG_ERROR(getName() << " Client Connection not ready for Consumer"); - return ResultNotConnected; + return callback(ResultNotConnected, BrokerConsumerStats()); +} + +void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsImpl brokerConsumerStats + , BrokerConsumerStatsCallback callback) { + + if (res == ResultOk) { + Lock lock(mutex_); + brokerConsumerStats.setCacheTime(config_.getBrokerConsumerStatsCacheTimeInMs()); + brokerConsumerStats_ = brokerConsumerStats; + } + + if (!callback.empty()) { + callback(res, BrokerConsumerStats(boost::make_shared(brokerConsumerStats))); + } } } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index 15ba7539ec4ffe997ae0f2f639c0ed380949ee1f..237aebc19816a1e997746135e296d030fdaabdc2 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -36,6 +36,7 @@ #include #include "BatchAcknowledgementTracker.h" #include +#include using namespace pulsar; @@ -91,8 +92,8 @@ enum ConsumerTopicType { virtual Result pauseMessageListener(); virtual Result resumeMessageListener(); virtual void redeliverUnacknowledgedMessages(); - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1); -protected: + virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback); + protected: void connectionOpened(const ClientConnectionPtr& cnx); void connectionFailed(Result result); void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result); @@ -114,6 +115,7 @@ private: void increaseAvailablePermits(const ClientConnectionPtr& currentCnx); void drainIncomingMessageQueue(size_t count); unsigned int receiveIndividualMessagesFromBatch(Message &batchedMessage); + void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback); boost::mutex mutexForReceiveWithZeroQueueSize; const ConsumerConfiguration config_; @@ -134,7 +136,7 @@ private: CompressionCodecProvider compressionCodecProvider_; UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_; BatchAcknowledgementTracker batchAcknowledgementTracker_; - BrokerConsumerStats brokerConsumerStats_; + BrokerConsumerStatsImpl brokerConsumerStats_; }; } /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h index 5d48317f8fdc4f9b0c393d162e01a65353b37506..c7ee712af570f72be9bb7f868b191db292dbc878 100644 --- a/pulsar-client-cpp/lib/ConsumerImplBase.h +++ b/pulsar-client-cpp/lib/ConsumerImplBase.h @@ -47,7 +47,7 @@ public: virtual void redeliverUnacknowledgedMessages() = 0; virtual const std::string& getName() const = 0; virtual int getNumOfPrefetchedMessages() const = 0; - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1) = 0; + virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0; }; } #endif //PULSAR_CONSUMER_IMPL_BASE_HEADER diff --git a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc new file mode 100644 index 0000000000000000000000000000000000000000..080fbb0956e0d8d6d1be4e1fc9da2f4fbc59f82a --- /dev/null +++ b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc @@ -0,0 +1,167 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +namespace pulsar { + + const std::string PartitionedBrokerConsumerStatsImpl::DELIMITER = ";"; + + PartitionedBrokerConsumerStatsImpl::PartitionedBrokerConsumerStatsImpl(size_t size) { + statsList_.resize(size); + } + + bool PartitionedBrokerConsumerStatsImpl::isValid() const { + bool isValid = true; + for (int i = 0; i +#include +#include +#include +#include +#include +#include +#include +#include + +namespace pulsar { +class PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase { + private: + std::vector statsList_; + static const std::string DELIMITER; + public: + + PartitionedBrokerConsumerStatsImpl(size_t size); + + /** Returns true if the Stats are still valid **/ + virtual bool isValid() const; + + /** Returns the rate of messages delivered to the consumer. msg/s */ + virtual double getMsgRateOut() const; + + /** Returns the throughput delivered to the consumer. bytes/s */ + virtual double getMsgThroughputOut() const; + + /** Returns the rate of messages redelivered by this consumer. msg/s */ + virtual double getMsgRateRedeliver() const; + + /** Returns the Name of the consumer */ + virtual const std::string getConsumerName() const; + + /** Returns the Number of available message permits for the consumer */ + virtual uint64_t getAvailablePermits() const; + + /** Returns the Number of unacknowledged messages for the consumer */ + virtual uint64_t getUnackedMessages() const; + + /** Returns true if the consumer is blocked due to unacked messages. */ + virtual bool isBlockedConsumerOnUnackedMsgs() const; + + /** Returns the Address of this consumer */ + virtual const std::string getAddress() const; + + /** Returns the Timestamp of connection */ + virtual const std::string getConnectedSince() const; + + /** Returns Whether this subscription is Exclusive or Shared or Failover */ + virtual const ConsumerType getType() const; + + /** Returns the rate of messages expired on this subscription. msg/s */ + virtual double getMsgRateExpired() const; + + /** Returns the Number of messages in the subscription backlog */ + virtual uint64_t getMsgBacklog() const; + + /** Returns the BrokerConsumerStatsImpl at of ith partition */ + BrokerConsumerStats getBrokerConsumerStats(int index); + + void add(BrokerConsumerStats stats, int index); + + void clear(); + + friend std::ostream& operator<<(std::ostream &os, const PartitionedBrokerConsumerStatsImpl &obj); +}; +typedef boost::shared_ptr PartitionedBrokerConsumerStatsPtr; + +} +#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index 5de6993902c5ca58f8eea1cb0f2a7ce7e6a2d016..cf71dda0bbc2a5fc23a64a7109e07218b16c7a9f 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -15,11 +15,6 @@ */ #include "PartitionedConsumerImpl.h" -#include "LogUtils.h" -#include -#include "pulsar/Result.h" -#include "MessageImpl.h" -#include "Utils.h" DECLARE_LOG_OBJECT() @@ -176,6 +171,7 @@ namespace pulsar { // all the partitioned-consumer belonging to one partitioned topic should have same name config.setConsumerName(conf_.getConsumerName()); config.setConsumerType(conf_.getConsumerType()); + config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs()); config.setMessageListener(boost::bind(&PartitionedConsumerImpl::messageReceived, shared_from_this(), _1, _2)); // create consumer on each partition for (unsigned int i = 0; i < numPartitions_; i++ ) { @@ -380,12 +376,37 @@ namespace pulsar { return messages_.size(); } - Result PartitionedConsumerImpl::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) { - if (partitionIndex >= numPartitions_ && partitionIndex < 0 && consumers_.size() <= partitionIndex) - { - LOG_ERROR(getName() << " PartitionIndex must be positive and less than number of partitiones") - return ResultInvalidConfiguration; + void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); + if (state_ != Ready) { + lock.unlock(); + return callback(ResultConsumerNotInitialized, BrokerConsumerStats()); + } + PartitionedBrokerConsumerStatsPtr statsPtr = boost::make_shared(numPartitions_); + LatchPtr latchPtr = boost::make_shared(numPartitions_); + ConsumerList consumerList = consumers_; + lock.unlock(); + for (int i = 0; igetBrokerConsumerStatsAsync(boost::bind(&PartitionedConsumerImpl::handleGetConsumerStats, + shared_from_this(), _1, _2, latchPtr, + statsPtr, i, callback)); + } + } + + void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats, + LatchPtr latchPtr, PartitionedBrokerConsumerStatsPtr statsPtr, + size_t index, BrokerConsumerStatsCallback callback) { + Lock lock(mutex_); + if (res == ResultOk) { + latchPtr->countdown(); + statsPtr->add(brokerConsumerStats, index); + } else { + lock.unlock(); + return callback(res, BrokerConsumerStats()); + } + if (latchPtr->getCount() == 0) { + lock.unlock(); + callback(ResultOk, BrokerConsumerStats(statsPtr)); } - return consumers_[partitionIndex]->getConsumerStats(BrokerConsumerStats); } } diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h index 3338adda125a2f1c27084facd8b073dcff760e30..cb5e034c7674c922f3ab498bdd5d39741c3235d7 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h @@ -25,6 +25,9 @@ #include "boost/enable_shared_from_this.hpp" #include "ConsumerImplBase.h" #include "lib/UnAckedMessageTrackerDisabled.h" +#include +#include + namespace pulsar { class PartitionedConsumerImpl; class PartitionedConsumerImpl: public ConsumerImplBase, public boost::enable_shared_from_this { @@ -60,7 +63,10 @@ namespace pulsar { virtual void redeliverUnacknowledgedMessages(); virtual const std::string& getName() const; virtual int getNumOfPrefetchedMessages() const ; - virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex); + virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback); + void handleGetConsumerStats(Result , BrokerConsumerStats, LatchPtr, + PartitionedBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback); + private: const ClientImplPtr client_; const std::string subscriptionName_; diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 3225dee3daf29abf41fd0a42dc8db92650c94552..02868cf8786eeabbc0a9eed82dcc0f0d63681b5a 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -281,7 +281,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { // reserving a spot and going forward - not blocking if (!conf_.getBlockIfQueueFull() && !pendingMessagesQueue_.tryReserve(1)) { LOG_DEBUG(getName() << " - Producer Queue is full"); - // If queue is full sending the batch immediately, no point waiting till batchMessageimeout + // If queue is full sending the batch immediately, no point waiting till batchMessagetimeout if (batchMessageContainer) { LOG_DEBUG(getName() << " - sending batch message immediately"); batchMessageContainer->sendMessage(); diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc index 02d05e6c482668b7584d215155daec62f19c1085..4fe8fedf6b698617e302b972f3b7203bdde23a65 100644 --- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc +++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc @@ -18,21 +18,18 @@ #include #include #include -#include -#include "DestinationName.h" +#include #include -#include #include "boost/date_time/posix_time/posix_time.hpp" #include "CustomRoutingPolicy.h" #include #include "lib/Future.h" #include "lib/Utils.h" -#include -#include "LogUtils.h" #include "PulsarFriend.h" -#include #include "ConsumerTest.h" #include "HttpHelper.h" +#include +#include DECLARE_LOG_OBJECT(); using namespace pulsar; @@ -40,23 +37,39 @@ using namespace pulsar; static std::string lookupUrl = "http://localhost:8765"; static std::string adminUrl = "http://localhost:8765/"; +void partitionedCallbackFunction(Result result, BrokerConsumerStats brokerConsumerStats, long expectedBacklog, Latch& latch, int index) { + ASSERT_EQ(result, ResultOk); + PartitionedBrokerConsumerStatsImpl* statsPtr = (PartitionedBrokerConsumerStatsImpl*)(brokerConsumerStats.getImpl().get()); + LOG_DEBUG(statsPtr); + ASSERT_EQ(expectedBacklog, statsPtr->getBrokerConsumerStats(index).getMsgBacklog()); + latch.countdown(); +} +void simpleCallbackFunction(Result result, BrokerConsumerStats& brokerConsumerStats, Result expectedResult, + uint64_t expectedBacklog, ConsumerType expectedConsumerType) { + LOG_DEBUG(brokerConsumerStats); + ASSERT_EQ(result, expectedResult); + ASSERT_EQ(brokerConsumerStats.getMsgBacklog(), expectedBacklog); + ASSERT_EQ(brokerConsumerStats.getType(), expectedConsumerType); +} TEST(ConsumerStatsTest, testBacklogInfo) { long epochTime=time(NULL); std::string testName="testBacklogInfo-" + boost::lexical_cast(epochTime); Client client(lookupUrl); std::string topicName = "persistent://property/cluster/namespace/" + testName; std::string subName = "subscription-name"; + ConsumerConfiguration conf; + conf.setBrokerConsumerStatsCacheTimeInMs(3 * 1000); Consumer consumer; Promise consumerPromise; - client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); + client.subscribeAsync(topicName, subName, conf, WaitForCallbackValue(consumerPromise)); Future consumerFuture = consumerPromise.getFuture(); Result result = consumerFuture.get(consumer); ASSERT_EQ(ResultOk, result); // handling dangling subscriptions consumer.unsubscribe(); - client.subscribe(topicName, subName, consumer); + client.subscribe(topicName, subName, conf, consumer); // Producing messages Producer producer; @@ -74,12 +87,8 @@ TEST(ConsumerStatsTest, testBacklogInfo) { producer.send(msg); } - BrokerConsumerStats consumerStats; - Result res = consumer.getConsumerStats(consumerStats); - ASSERT_EQ(res, ResultOk); - - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + LOG_DEBUG("Calling consumer.getBrokerConsumerStats"); + consumer.getBrokerConsumerStatsAsync(boost::bind(simpleCallbackFunction, _1, _2, ResultOk, numOfMessages, ConsumerExclusive)); for (int i = numOfMessages; i<(numOfMessages*2); i++) { std::string messageContent = prefix + boost::lexical_cast(i); @@ -87,12 +96,13 @@ TEST(ConsumerStatsTest, testBacklogInfo) { producer.send(msg); } - usleep(35 * 1000 * 1000); - res = consumer.getConsumerStats(consumerStats); + usleep(3.5 * 1000 * 1000); + BrokerConsumerStats consumerStats; + Result res = consumer.getBrokerConsumerStats(consumerStats); ASSERT_EQ(res, ResultOk); - LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, 2 * numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), 2 * numOfMessages); + ASSERT_EQ(consumerStats.getType(), ConsumerExclusive); consumer.unsubscribe(); } @@ -106,14 +116,14 @@ TEST(ConsumerStatsTest, testFailure) { Promise consumerPromise; BrokerConsumerStats consumerStats; client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); Future consumerFuture = consumerPromise.getFuture(); Result result = consumerFuture.get(consumer); ASSERT_EQ(ResultOk, result); // handling dangling subscriptions consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); client.subscribe(topicName, subName, consumer); // Producing messages @@ -132,13 +142,13 @@ TEST(ConsumerStatsTest, testFailure) { producer.send(msg); } - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); } TEST(ConsumerStatsTest, testCachingMechanism) { @@ -147,19 +157,21 @@ TEST(ConsumerStatsTest, testCachingMechanism) { Client client(lookupUrl); std::string topicName = "persistent://property/cluster/namespace/" + testName; std::string subName = "subscription-name"; + ConsumerConfiguration conf; + conf.setBrokerConsumerStatsCacheTimeInMs(3.5 * 1000); Consumer consumer; Promise consumerPromise; BrokerConsumerStats consumerStats; - client.subscribeAsync(topicName, subName, WaitForCallbackValue(consumerPromise)); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); + client.subscribeAsync(topicName, subName, conf, WaitForCallbackValue(consumerPromise)); + ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); Future consumerFuture = consumerPromise.getFuture(); Result result = consumerFuture.get(consumer); ASSERT_EQ(ResultOk, result); // handling dangling subscriptions consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); - client.subscribe(topicName, subName, consumer); + ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); + client.subscribe(topicName, subName, conf, consumer); // Producing messages Producer producer; @@ -177,10 +189,10 @@ TEST(ConsumerStatsTest, testCachingMechanism) { producer.send(msg); } - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); for (int i = numOfMessages; i<(numOfMessages*2); i++) { std::string messageContent = prefix + boost::lexical_cast(i); @@ -190,26 +202,105 @@ TEST(ConsumerStatsTest, testCachingMechanism) { LOG_DEBUG("Expecting cached results"); ASSERT_TRUE(consumerStats.isValid()); - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); LOG_DEBUG("Still Expecting cached results"); - usleep(10 * 1000 * 1000); + usleep(1 * 1000 * 1000); ASSERT_TRUE(consumerStats.isValid()); - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages); LOG_DEBUG("Now expecting new results"); - usleep(25 * 1000 * 1000); + usleep(3 * 1000 * 1000); ASSERT_FALSE(consumerStats.isValid()); - ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats)); + ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); LOG_DEBUG(consumerStats); - ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages * 2); + ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages * 2); consumer.unsubscribe(); - ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats)); -} \ No newline at end of file + ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); +} + + +TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) { + long epochTime=time(NULL); + std::string testName="testAsyncCallOnPartitionedTopic-" + boost::lexical_cast(epochTime); + Client client(lookupUrl); + std::string topicName = "persistent://property/cluster/namespace/" + testName; + std::string subName = "subscription-name"; + + // call admin api to create partitioned topics + std::string url = adminUrl + "admin/persistent/property/cluster/namespace/" + testName + "/partitions"; + int res = makePutRequest(url, "7"); + + LOG_INFO("res = "< consumerPromise; + BrokerConsumerStats consumerStats; + client.subscribeAsync(topicName, subName, conf, WaitForCallbackValue(consumerPromise)); + ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); + Future consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // handling dangling subscriptions + consumer.unsubscribe(); + ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats)); + client.subscribe(topicName, subName, conf, consumer); + + // Producing messages + Producer producer; + int numOfMessages = 7 * 5; // 5 message per partition + Promise producerPromise; + ProducerConfiguration config; + config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + client.createProducerAsync(topicName, config, WaitForCallbackValue(producerPromise)); + Future producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + std::string prefix = testName + "-"; + for (int i = 0; i(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + // Expecting return from 4 callbacks + Latch latch(4); + consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 5, latch, 0)); + + // Now we have 10 messages per partition + for (int i = numOfMessages; i<(numOfMessages*2); i++) { + std::string messageContent = prefix + boost::lexical_cast(i); + Message msg = MessageBuilder().build(); + producer.send(msg); + } + + // Expecting cached result + consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 5, latch, 0)); + + usleep(4.5 * 1000 * 1000); + // Expecting fresh results + consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 10, latch, 2)); + + Message msg; + while (consumer.receive(msg)) { + // Do nothing + } + + // Expecting the backlog to be the same since we didn't acknowledge the messages + consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 10, latch, 3)); + + // Wait for ten seconds only + ASSERT_TRUE(latch.wait(milliseconds(10 * 1000))); +}