提交 2d07cd58 编写于 作者: J jai1 提交者: GitHub

CPP Client - Async call for getting broker side consumer stats

上级 911a9b67
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CPP_BROKERCONSUMERSTATS_H
#define PULSAR_CPP_BROKERCONSUMERSTATS_H
#include <boost/date_time/posix_time/ptime.hpp>
#include <string.h>
#include <iostream>
#include <pulsar/Result.h>
#include <boost/function.hpp>
#include <pulsar/ConsumerType.h>
namespace pulsar {
class BrokerConsumerStatsImplBase;
/* @note: isValid() or getXXX() methods are not allowed on an invalid BrokerConsumerStats */
class BrokerConsumerStats {
private:
boost::shared_ptr<BrokerConsumerStatsImplBase> impl_;
public:
explicit BrokerConsumerStats(boost::shared_ptr<BrokerConsumerStatsImplBase> impl);
BrokerConsumerStats();
/** Returns true if the Stats are still valid **/
virtual bool isValid() const;
/** Returns the rate of messages delivered to the consumer. msg/s */
virtual double getMsgRateOut() const;
/** Returns the throughput delivered to the consumer. bytes/s */
virtual double getMsgThroughputOut() const;
/** Returns the rate of messages redelivered by this consumer. msg/s */
virtual double getMsgRateRedeliver() const;
/** Returns the Name of the consumer */
virtual const std::string getConsumerName() const;
/** Returns the Number of available message permits for the consumer */
virtual uint64_t getAvailablePermits() const;
/** Returns the Number of unacknowledged messages for the consumer */
virtual uint64_t getUnackedMessages() const;
/** Returns true if the consumer is blocked due to unacked messages. */
virtual bool isBlockedConsumerOnUnackedMsgs() const;
/** Returns the Address of this consumer */
virtual const std::string getAddress() const;
/** Returns the Timestamp of connection */
virtual const std::string getConnectedSince() const;
/** Returns Whether this subscription is Exclusive or Shared or Failover */
virtual const ConsumerType getType() const;
/** Returns the rate of messages expired on this subscription. msg/s */
virtual double getMsgRateExpired() const;
/** Returns the Number of messages in the subscription backlog */
virtual uint64_t getMsgBacklog() const;
/** @deprecated */
boost::shared_ptr<BrokerConsumerStatsImplBase> getImpl() const;
friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj);
};
typedef boost::function<void(Result result, BrokerConsumerStats brokerConsumerStats)> BrokerConsumerStatsCallback;
}
#endif //PULSAR_CPP_BROKERCONSUMERSTATS_H
......@@ -23,6 +23,8 @@
#include <pulsar/Result.h>
#include <boost/date_time/posix_time/ptime.hpp>
#include <iostream>
#include <pulsar/ConsumerType.h>
#include <pulsar/BrokerConsumerStats.h>
#pragma GCC visibility push(default)
class PulsarFriend;
......@@ -37,80 +39,6 @@ typedef boost::function<void(Result result)> ResultCallback;
/// Callback definition for MessageListener
typedef boost::function<void(Consumer consumer, const Message& msg)> MessageListener;
enum ConsumerType {
/**
* There can be only 1 consumer on the same topic with the same consumerName
*/
ConsumerExclusive,
/**
* Multiple consumers will be able to use the same consumerName and the messages
* will be dispatched according to a round-robin rotation between the connected consumers
*/
ConsumerShared,
/** Only one consumer is active on the subscription; Subscription can have N consumers
* connected one of which will get promoted to master if the current master becomes inactive
*/
ConsumerFailover
};
class BrokerConsumerStats {
private:
/*
* validTillInMs_ - Stats will be valid till this time.
*/
boost::posix_time::ptime validTill_;
public:
BrokerConsumerStats();
BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut,
double msgRateRedeliver, std::string consumerName, int availablePermits,
int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address,
std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog);
/** Returns true if the Message is Expired **/
bool isValid() const;
/** Total rate of messages delivered to the consumer. msg/s */
double msgRateOut_;
/** Total throughput delivered to the consumer. bytes/s */
double msgThroughputOut_;
/** Total rate of messages redelivered by this consumer. msg/s */
double msgRateRedeliver_;
/** Name of the consumer */
std::string consumerName_;
/** Number of available message permits for the consumer */
int availablePermits_;
/** Number of unacknowledged messages for the consumer */
int unackedMessages_;
/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */
bool blockedConsumerOnUnackedMsgs_;
/** Address of this consumer */
std::string address_;
/** Timestamp of connection */
std::string connectedSince_;
/// Whether this subscription is Exclusive or Shared or Failover
std::string type_;
/// Total rate of messages expired on this subscription. msg/s
double msgRateExpired_;
/// Number of messages in the subscription backlog
long msgBacklog_;
friend std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj);
};
/**
* Class specifying the configuration of a consumer.
*/
......@@ -182,6 +110,18 @@ class ConsumerConfiguration {
* @return the configured timeout in milliseconds for unacked messages.
*/
long getUnAckedMessagesTimeoutMs() const;
/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
* @param cacheTimeInMs in milliseconds
*/
void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs);
/**
* @return the configured timeout in milliseconds caching BrokerConsumerStats.
*/
long getBrokerConsumerStatsCacheTimeInMs() const;
private:
struct Impl;
boost::shared_ptr<Impl> impl_;
......@@ -347,12 +287,21 @@ class Consumer {
* still valid.
*
* @param brokerConsumerStats - if the function returns ResultOk, this object will contain consumer stats
* @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned.
*
* @note This is a blocking call with timeout of thirty seconds.
*/
Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1);
Result getBrokerConsumerStats(BrokerConsumerStats& brokerConsumerStats);
/**
* Asynchronous call to gets Consumer Stats from broker.
* The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires
* then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are
* still valid.
*
* @param callback - callback function to get the brokerConsumerStats,
* if result is ResultOk then the brokerConsumerStats will be populated
*/
void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
private:
typedef boost::shared_ptr<ConsumerImplBase> ConsumerImplBasePtr;
friend class PulsarFriend;
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CPP_CONSUMERTYPE_H
#define PULSAR_CPP_CONSUMERTYPE_H
namespace pulsar {
enum ConsumerType {
/**
* There can be only 1 consumer on the same topic with the same consumerName
*/
ConsumerExclusive,
/**
* Multiple consumers will be able to use the same consumerName and the messages
* will be dispatched according to a round-robin rotation between the connected consumers
*/
ConsumerShared,
/** Only one consumer is active on the subscription; Subscription can have N consumers
* connected one of which will get promoted to master if the current master becomes inactive
*/
ConsumerFailover
};
}
#endif //PULSAR_CPP_CONSUMERTYPE_H
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <pulsar/BrokerConsumerStats.h>
#include <lib/BrokerConsumerStatsImplBase.h>
namespace pulsar {
BrokerConsumerStats::BrokerConsumerStats(boost::shared_ptr<BrokerConsumerStatsImplBase> impl)
: impl_(impl) {
}
BrokerConsumerStats::BrokerConsumerStats() {
}
boost::shared_ptr<BrokerConsumerStatsImplBase> BrokerConsumerStats::getImpl() const {
return impl_;
}
bool BrokerConsumerStats::isValid() const {
return impl_->isValid();
}
std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) {
os << "\nBrokerConsumerStats [" << "validTill_ = " << obj.isValid() << ", msgRateOut_ = "
<< obj.getMsgRateOut() << ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
<< ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver() << ", consumerName_ = "
<< obj.getConsumerName() << ", availablePermits_ = " << obj.getAvailablePermits()
<< ", unackedMessages_ = " << obj.getUnackedMessages()
<< ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs()
<< ", address_ = " << obj.getAddress() << ", connectedSince_ = " << obj.getConnectedSince()
<< ", type_ = " << obj.getType() << ", msgRateExpired_ = " << obj.getMsgRateExpired()
<< ", msgBacklog_ = " << obj.getMsgBacklog() << "]";
return os;
}
double BrokerConsumerStats::getMsgRateOut() const {
if (impl_) {
return impl_->getMsgRateOut();
}
return 0;
}
double BrokerConsumerStats::getMsgThroughputOut() const {
return impl_->getMsgThroughputOut();
}
double BrokerConsumerStats::getMsgRateRedeliver() const {
return impl_->getMsgRateRedeliver();
}
const std::string BrokerConsumerStats::getConsumerName() const {
return impl_->getConsumerName();
}
uint64_t BrokerConsumerStats::getAvailablePermits() const {
return impl_->getAvailablePermits();
}
uint64_t BrokerConsumerStats::getUnackedMessages() const {
return impl_->getUnackedMessages();
}
bool BrokerConsumerStats::isBlockedConsumerOnUnackedMsgs() const {
return impl_->isBlockedConsumerOnUnackedMsgs();
}
const std::string BrokerConsumerStats::getAddress() const {
return impl_->getAddress();
}
const std::string BrokerConsumerStats::getConnectedSince() const {
return impl_->getConnectedSince();
}
const ConsumerType BrokerConsumerStats::getType() const {
return impl_->getType();
}
double BrokerConsumerStats::getMsgRateExpired() const {
return impl_->getMsgRateExpired();
}
uint64_t BrokerConsumerStats::getMsgBacklog() const {
return impl_->getMsgBacklog();
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <lib/BrokerConsumerStatsImpl.h>
#include <boost/date_time/local_time/local_time.hpp>
namespace pulsar {
BrokerConsumerStatsImpl::BrokerConsumerStatsImpl() : validTill_(boost::posix_time::microsec_clock::universal_time()) {};
BrokerConsumerStatsImpl::BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut,
double msgRateRedeliver, std::string consumerName,
uint64_t availablePermits,
uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs,
std::string address,
std::string connectedSince, const std::string& type,
double msgRateExpired, uint64_t msgBacklog) :
msgRateOut_(msgRateOut),
msgThroughputOut_(msgThroughputOut),
msgRateRedeliver_(msgRateRedeliver),
consumerName_(consumerName),
availablePermits_(availablePermits),
unackedMessages_(unackedMessages),
blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs),
address_(address),
connectedSince_(connectedSince),
type_(convertStringToConsumerType(type)),
msgRateExpired_(msgRateExpired),
msgBacklog_(msgBacklog) {}
bool BrokerConsumerStatsImpl::isValid() const {
return boost::posix_time::microsec_clock::universal_time() <= validTill_;
}
std::ostream& operator<<(std::ostream &os, const BrokerConsumerStatsImpl& obj) {
os << "\nBrokerConsumerStatsImpl ["
<< "validTill_ = " << obj.isValid()
<< ", msgRateOut_ = " << obj.getMsgRateOut()
<< ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
<< ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver()
<< ", consumerName_ = " << obj.getConsumerName()
<< ", availablePermits_ = " << obj.getAvailablePermits()
<< ", unackedMessages_ = " << obj.getUnackedMessages()
<< ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs()
<< ", address_ = " << obj.getAddress()
<< ", connectedSince_ = " << obj.getConnectedSince()
<< ", type_ = " << obj.getType()
<< ", msgRateExpired_ = " << obj.getMsgRateExpired()
<< ", msgBacklog_ = " << obj.getMsgBacklog()
<< "]";
return os;
}
double BrokerConsumerStatsImpl::getMsgRateOut() const {
return msgRateOut_;
}
double BrokerConsumerStatsImpl::getMsgThroughputOut() const {
return msgThroughputOut_;
}
double BrokerConsumerStatsImpl::getMsgRateRedeliver() const {
return msgRateRedeliver_;
}
const std::string BrokerConsumerStatsImpl::getConsumerName() const {
return consumerName_;
}
uint64_t BrokerConsumerStatsImpl::getAvailablePermits() const {
return availablePermits_;
}
uint64_t BrokerConsumerStatsImpl::getUnackedMessages() const {
return unackedMessages_;
}
bool BrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() const {
return blockedConsumerOnUnackedMsgs_;
}
const std::string BrokerConsumerStatsImpl::getAddress() const {
return address_;
}
const std::string BrokerConsumerStatsImpl::getConnectedSince() const {
return connectedSince_;
}
const ConsumerType BrokerConsumerStatsImpl::getType() const {
return type_;
}
double BrokerConsumerStatsImpl::getMsgRateExpired() const {
return msgRateExpired_;
}
uint64_t BrokerConsumerStatsImpl::getMsgBacklog() const {
return msgBacklog_;
}
void BrokerConsumerStatsImpl::setCacheTime(uint64_t cacehTimeInMs) {
validTill_ = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(cacehTimeInMs);
}
ConsumerType BrokerConsumerStatsImpl::convertStringToConsumerType(const std::string& str) {
if (str == "ConsumerFailover" || str == "Failover") {
return ConsumerFailover;
} else if (str == "ConsumerShared" || str == "Shared") {
return ConsumerShared;
} else {
return ConsumerExclusive;
}
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H
#define PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H
#include <string.h>
#include <iostream>
#include <pulsar/Result.h>
#include <boost/function.hpp>
#include <boost/date_time/microsec_time_clock.hpp>
#include <pulsar/BrokerConsumerStats.h>
#include <lib/BrokerConsumerStatsImplBase.h>
namespace pulsar {
class BrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
private:
/** validTill_ - Stats will be valid till this time.*/
boost::posix_time::ptime validTill_;
/** Total rate of messages delivered to the consumer. msg/s */
double msgRateOut_;
/** Total throughput delivered to the consumer. bytes/s */
double msgThroughputOut_;
/** Total rate of messages redelivered by this consumer. msg/s */
double msgRateRedeliver_;
/** Name of the consumer */
std::string consumerName_;
/** Number of available message permits for the consumer */
uint64_t availablePermits_;
/** Number of unacknowledged messages for the consumer */
uint64_t unackedMessages_;
/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */
bool blockedConsumerOnUnackedMsgs_;
/** Address of this consumer */
std::string address_;
/** Timestamp of connection */
std::string connectedSince_;
/** Whether this subscription is Exclusive or Shared or Failover */
ConsumerType type_;
/** Total rate of messages expired on this subscription. msg/s */
double msgRateExpired_;
/** Number of messages in the subscription backlog */
uint64_t msgBacklog_;
public:
BrokerConsumerStatsImpl();
BrokerConsumerStatsImpl(double msgRateOut, double msgThroughputOut, double msgRateRedeliver,
std::string consumerName, uint64_t availablePermits,
uint64_t unackedMessages, bool blockedConsumerOnUnackedMsgs,
std::string address, std::string connectedSince, const std::string& type,
double msgRateExpired, uint64_t msgBacklog);
/** Returns true if the Stats are still valid **/
virtual bool isValid() const;
/** Returns the rate of messages delivered to the consumer. msg/s */
virtual double getMsgRateOut() const;
/** Returns the throughput delivered to the consumer. bytes/s */
virtual double getMsgThroughputOut() const;
/** Returns the rate of messages redelivered by this consumer. msg/s */
virtual double getMsgRateRedeliver() const;
/** Returns the Name of the consumer */
virtual const std::string getConsumerName() const;
/** Returns the Number of available message permits for the consumer */
virtual uint64_t getAvailablePermits() const;
/** Returns the Number of unacknowledged messages for the consumer */
virtual uint64_t getUnackedMessages() const;
/** Returns true if the consumer is blocked due to unacked messages. */
virtual bool isBlockedConsumerOnUnackedMsgs() const;
/** Returns the Address of this consumer */
virtual const std::string getAddress() const;
/** Returns the Timestamp of connection */
virtual const std::string getConnectedSince() const;
/** Returns Whether this subscription is Exclusive or Shared or Failover */
virtual const ConsumerType getType() const;
/** Returns the rate of messages expired on this subscription. msg/s */
virtual double getMsgRateExpired() const;
/** Returns the Number of messages in the subscription backlog */
virtual uint64_t getMsgBacklog() const;
void setCacheTime(uint64_t cacehTimeInMs);
friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStatsImpl &obj);
static ConsumerType convertStringToConsumerType(const std::string& str);
};
}
#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CPP_BROKERCONSUMERSTATSIMPLBASE_H
#define PULSAR_CPP_BROKERCONSUMERSTATSIMPLBASE_H
#include <pulsar/BrokerConsumerStats.h>
namespace pulsar {
class BrokerConsumerStatsImplBase {
public:
/** Returns true if the Stats are still valid **/
virtual bool isValid() const = 0;
/** Returns the rate of messages delivered to the consumer. msg/s */
virtual double getMsgRateOut() const = 0;
/** Returns the throughput delivered to the consumer. bytes/s */
virtual double getMsgThroughputOut() const = 0;
/** Returns the rate of messages redelivered by this consumer. msg/s */
virtual double getMsgRateRedeliver() const = 0;
/** Returns the Name of the consumer */
virtual const std::string getConsumerName() const = 0;
/** Returns the Number of available message permits for the consumer */
virtual uint64_t getAvailablePermits() const = 0;
/** Returns the Number of unacknowledged messages for the consumer */
virtual uint64_t getUnackedMessages() const = 0;
/** Returns true if the consumer is blocked due to unacked messages. */
virtual bool isBlockedConsumerOnUnackedMsgs() const = 0;
/** Returns the Address of this consumer */
virtual const std::string getAddress() const = 0;
/** Returns the Timestamp of connection */
virtual const std::string getConnectedSince() const = 0;
/** Returns Whether this subscription is Exclusive or Shared or Failover */
virtual const ConsumerType getType() const = 0;
/** Returns the rate of messages expired on this subscription. msg/s */
virtual double getMsgRateExpired() const = 0;
/** Returns the Number of messages in the subscription backlog */
virtual uint64_t getMsgBacklog() const = 0;
};
typedef boost::shared_ptr<BrokerConsumerStatsImplBase> BrokerConsumerStatsImplBasePtr;
}
#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPLBASE_H
......@@ -124,7 +124,6 @@ havePendingPingRequest_(false),
keepAliveTimer_(),
maxPendingLookupRequest_(clientConfiguration.getConcurrentLookupRequest()),
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
consumerStatsTTLMs_(30 * 1000),
numOfPendingLookupRequest_(0),
isTlsAllowInsecureConnection_(false) {
if (clientConfiguration.isUseTls()) {
......@@ -205,7 +204,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
}
void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests) {
std::vector<Promise<Result, BrokerConsumerStats> > consumerStatsPromises;
std::vector<Promise<Result, BrokerConsumerStatsImpl> > consumerStatsPromises;
Lock lock(mutex_);
for (int i = 0; i < consumerStatsRequests.size(); i++) {
......@@ -715,7 +714,7 @@ void ClientConnection::handleIncomingCommand() {
PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.find(
consumerStatsResponse.request_id());
if (it != pendingConsumerStatsMap_.end()) {
Promise<Result, BrokerConsumerStats> consumerStatsPromise = it->second;
Promise<Result, BrokerConsumerStatsImpl> consumerStatsPromise = it->second;
pendingConsumerStatsMap_.erase(it);
lock.unlock();
......@@ -730,10 +729,7 @@ void ClientConnection::handleIncomingCommand() {
cnxString_
<< "ConsumerStatsResponse command - Received consumer stats response from server. req_id: "
<< consumerStatsResponse.request_id() << " Stats: ");
boost::posix_time::ptime validTill = now() + milliseconds(consumerStatsTTLMs_);
BrokerConsumerStats brokerStats =
BrokerConsumerStats(validTill,
consumerStatsResponse.msgrateout(),
BrokerConsumerStatsImpl brokerStats(consumerStatsResponse.msgrateout(),
consumerStatsResponse.msgthroughputout(),
consumerStatsResponse.msgrateredeliver(),
consumerStatsResponse.consumername(),
......@@ -923,11 +919,11 @@ void ClientConnection::handleIncomingCommand() {
}
}
Future<Result, BrokerConsumerStats>
Future<Result, BrokerConsumerStatsImpl>
ClientConnection::newConsumerStats(const std::string topicName, const std::string subscriptionName,
uint64_t consumerId, uint64_t requestId) {
Lock lock(mutex_);
Promise<Result, BrokerConsumerStats> promise;
Promise<Result, BrokerConsumerStatsImpl> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString_ << " Client is not connected to the broker");
......
......@@ -40,6 +40,7 @@
#include "UtilAllocator.h"
#include <pulsar/Client.h>
#include <set>
#include <lib/BrokerConsumerStatsImpl.h>
using namespace pulsar;
......@@ -132,11 +133,9 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
Commands::ChecksumType getChecksumType() const;
Future<Result, BrokerConsumerStats> newConsumerStats(const std::string topicName, const std::string subscriptionName,
Future<Result, BrokerConsumerStatsImpl> newConsumerStats(const std::string topicName, const std::string subscriptionName,
uint64_t consumerId, uint64_t requestId) ;
private:
long consumerStatsTTLMs_ ;
struct PendingRequestData {
Promise<Result, std::string> promise;
DeadlineTimerPtr timer;
......@@ -254,7 +253,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap;
ConsumersMap consumers_;
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStats> > PendingConsumerStatsMap;
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl> > PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;
......
......@@ -18,55 +18,13 @@
#include <pulsar/MessageBuilder.h>
#include "ConsumerImpl.h"
#include "Utils.h"
#include <lib/BrokerConsumerStatsImpl.h>
#include <lib/Latch.h>
namespace pulsar {
const std::string EMPTY_STRING;
BrokerConsumerStats::BrokerConsumerStats():validTill_(now()) {};
BrokerConsumerStats::BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut,
double msgRateRedeliver, std::string consumerName, int availablePermits,
int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address,
std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog):
validTill_(validTill),
msgRateOut_(msgRateOut),
msgThroughputOut_(msgThroughputOut),
msgRateRedeliver_(msgRateRedeliver),
consumerName_(consumerName),
availablePermits_(availablePermits),
unackedMessages_(unackedMessages),
blockedConsumerOnUnackedMsgs_(blockedConsumerOnUnackedMsgs),
address_(address),
connectedSince_(connectedSince),
type_(type),
msgRateExpired_(msgRateExpired),
msgBacklog_(msgBacklog)
{}
bool BrokerConsumerStats::isValid() const {
return now() <= validTill_;
}
std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj) {
os << "\nBrokerConsumerStats ["
<< "validTill_ = " << obj.validTill_
<< ", msgRateOut_ = " << obj.msgRateOut_
<< ", msgThroughputOut_ = " << obj.msgThroughputOut_
<< ", msgRateRedeliver_ = " << obj.msgRateRedeliver_
<< ", consumerName_ = " << obj.consumerName_
<< ", availablePermits_ = " << obj.availablePermits_
<< ", unackedMessages_ = " << obj.unackedMessages_
<< ", blockedConsumerOnUnackedMsgs_ = " << obj.blockedConsumerOnUnackedMsgs_
<< ", address_ = " << obj.address_
<< ", connectedSince_ = " << obj.connectedSince_
<< ", type_ = " << obj.type_
<< ", msgRateExpired_ = " << obj.msgRateExpired_
<< ", msgBacklog_ = " << obj.msgBacklog_
<< "]";
return os;
}
struct ConsumerConfiguration::Impl {
long unAckedMessagesTimeoutMs;
ConsumerType consumerType;
......@@ -74,11 +32,13 @@ struct ConsumerConfiguration::Impl {
bool hasMessageListener;
int receiverQueueSize;
std::string consumerName;
long brokerConsumerStatsCacheTimeInMs;
Impl()
: unAckedMessagesTimeoutMs(0),
consumerType(ConsumerExclusive),
messageListener(),
hasMessageListener(false),
brokerConsumerStatsCacheTimeInMs(30 * 1000), // 30 seconds
receiverQueueSize(1000) {
}
};
......@@ -99,6 +59,14 @@ ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfigurat
return *this;
}
long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const {
return impl_->brokerConsumerStatsCacheTimeInMs;
}
void ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs) {
impl_->brokerConsumerStatsCacheTimeInMs = cacheTimeInMs;
}
ConsumerConfiguration& ConsumerConfiguration::setConsumerType(ConsumerType consumerType) {
impl_->consumerType = consumerType;
return *this;
......@@ -304,10 +272,19 @@ void Consumer::redeliverUnacknowledgedMessages() {
}
}
Result Consumer::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) {
Result Consumer::getBrokerConsumerStats(BrokerConsumerStats& brokerConsumerStats) {
if (!impl_) {
return ResultConsumerNotInitialized;
}
return impl_->getConsumerStats(BrokerConsumerStats, partitionIndex);
Promise<Result, BrokerConsumerStats> promise;
getBrokerConsumerStatsAsync(WaitForCallbackValue<BrokerConsumerStats>(promise));
return promise.getFuture().get(brokerConsumerStats);
}
void Consumer::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
if (!impl_) {
return callback(ResultConsumerNotInitialized, BrokerConsumerStats());
}
return impl_->getBrokerConsumerStatsAsync(callback);
}
}
......@@ -687,22 +687,21 @@ int ConsumerImpl::getNumOfPrefetchedMessages() const {
return incomingMessages_.size();
}
Result ConsumerImpl::getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex) {
if (partitionIndex != -1) {
LOG_WARN(getName() << "Ignoring the partitionIndex since the topic is not partitioned")
}
if (!isOpen()) {
void ConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
Lock lock(mutex_);
if (state_ != Ready) {
LOG_ERROR(getName() << "Client connection is not open, please try again later.")
return ResultConsumerNotInitialized;
lock.unlock();
return callback(ResultConsumerNotInitialized, BrokerConsumerStats());
}
if (brokerConsumerStats_.isValid()) {
LOG_DEBUG(getName() << "Serving data from cache");
brokerConsumerStats = brokerConsumerStats_;
return ResultOk;
BrokerConsumerStatsImpl brokerConsumerStats = brokerConsumerStats_;
lock.unlock();
return callback(ResultOk, BrokerConsumerStats(boost::make_shared<BrokerConsumerStatsImpl>(brokerConsumerStats_)));
}
lock.unlock();
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
......@@ -712,19 +711,30 @@ Result ConsumerImpl::getConsumerStats(BrokerConsumerStats& brokerConsumerStats,
LOG_DEBUG(getName() <<
" Sending ConsumerStats Command for Consumer - " << getConsumerId() << ", requestId - "<<requestId);
BrokerConsumerStats consumerStats;
Result res = cnx->newConsumerStats(topic_, subscription_, consumerId_, requestId).get(consumerStats);
if (res == ResultOk) {
brokerConsumerStats = brokerConsumerStats_ = consumerStats;
}
return res;
cnx->newConsumerStats(topic_, subscription_, consumerId_, requestId).addListener(
boost::bind(&ConsumerImpl::brokerConsumerStatsListener, shared_from_this(), _1, _2, callback));
return;
} else {
LOG_ERROR(getName() << " Operation not supported since server protobuf version " << cnx->getServerProtocolVersion() << " is older than proto::v7");
return ResultOperationNotSupported;
return callback(ResultUnsupportedVersionError, BrokerConsumerStats());
}
}
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
return ResultNotConnected;
return callback(ResultNotConnected, BrokerConsumerStats());
}
void ConsumerImpl::brokerConsumerStatsListener(Result res, BrokerConsumerStatsImpl brokerConsumerStats
, BrokerConsumerStatsCallback callback) {
if (res == ResultOk) {
Lock lock(mutex_);
brokerConsumerStats.setCacheTime(config_.getBrokerConsumerStatsCacheTimeInMs());
brokerConsumerStats_ = brokerConsumerStats;
}
if (!callback.empty()) {
callback(res, BrokerConsumerStats(boost::make_shared<BrokerConsumerStatsImpl>(brokerConsumerStats)));
}
}
} /* namespace pulsar */
......@@ -36,6 +36,7 @@
#include <map>
#include "BatchAcknowledgementTracker.h"
#include <limits>
#include <lib/BrokerConsumerStatsImpl.h>
using namespace pulsar;
......@@ -91,8 +92,8 @@ enum ConsumerTopicType {
virtual Result pauseMessageListener();
virtual Result resumeMessageListener();
virtual void redeliverUnacknowledgedMessages();
virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1);
protected:
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
protected:
void connectionOpened(const ClientConnectionPtr& cnx);
void connectionFailed(Result result);
void handleCreateConsumer(const ClientConnectionPtr& cnx, Result result);
......@@ -114,6 +115,7 @@ private:
void increaseAvailablePermits(const ClientConnectionPtr& currentCnx);
void drainIncomingMessageQueue(size_t count);
unsigned int receiveIndividualMessagesFromBatch(Message &batchedMessage);
void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback);
boost::mutex mutexForReceiveWithZeroQueueSize;
const ConsumerConfiguration config_;
......@@ -134,7 +136,7 @@ private:
CompressionCodecProvider compressionCodecProvider_;
UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStats brokerConsumerStats_;
BrokerConsumerStatsImpl brokerConsumerStats_;
};
} /* namespace pulsar */
......
......@@ -47,7 +47,7 @@ public:
virtual void redeliverUnacknowledgedMessages() = 0;
virtual const std::string& getName() const = 0;
virtual int getNumOfPrefetchedMessages() const = 0;
virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1) = 0;
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
};
}
#endif //PULSAR_CONSUMER_IMPL_BASE_HEADER
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <lib/PartitionedBrokerConsumerStatsImpl.h>
#include <boost/date_time/local_time/local_time.hpp>
#include <algorithm>
#include <numeric>
namespace pulsar {
const std::string PartitionedBrokerConsumerStatsImpl::DELIMITER = ";";
PartitionedBrokerConsumerStatsImpl::PartitionedBrokerConsumerStatsImpl(size_t size) {
statsList_.resize(size);
}
bool PartitionedBrokerConsumerStatsImpl::isValid() const {
bool isValid = true;
for (int i = 0; i<statsList_.size(); i++) {
isValid &= statsList_[i].isValid();
}
return isValid;
}
std::ostream& operator<<(std::ostream &os, const PartitionedBrokerConsumerStatsImpl& obj) {
os << "\nPartitionedBrokerConsumerStatsImpl ["
<< "validTill_ = " << obj.isValid()
<< ", msgRateOut_ = " << obj.getMsgRateOut()
<< ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
<< ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver()
<< ", consumerName_ = " << obj.getConsumerName()
<< ", availablePermits_ = " << obj.getAvailablePermits()
<< ", unackedMessages_ = " << obj.getUnackedMessages()
<< ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs()
<< ", address_ = " << obj.getAddress()
<< ", connectedSince_ = " << obj.getConnectedSince()
<< ", type_ = " << obj.getType()
<< ", msgRateExpired_ = " << obj.getMsgRateExpired()
<< ", msgBacklog_ = " << obj.getMsgBacklog()
<< "]";
return os;
}
double PartitionedBrokerConsumerStatsImpl::getMsgRateOut() const {
double sum = 0;
for (int i = 0; i<statsList_.size(); i++) {
sum += statsList_[i].getMsgRateOut();
}
return sum;
}
double PartitionedBrokerConsumerStatsImpl::getMsgThroughputOut() const {
double sum = 0;
for (int i = 0; i<statsList_.size(); i++) {
sum += statsList_[i].getMsgThroughputOut();
}
return sum;
}
double PartitionedBrokerConsumerStatsImpl::getMsgRateRedeliver() const {
double sum = 0;
for (int i = 0; i<statsList_.size(); i++) {
sum += statsList_[i].getMsgRateRedeliver();
}
return sum;
}
const std::string PartitionedBrokerConsumerStatsImpl::getConsumerName() const {
std::string str;
for (int i = 0; i<statsList_.size(); i++) {
str += statsList_[i].getConsumerName() + DELIMITER;
}
return str;
}
uint64_t PartitionedBrokerConsumerStatsImpl::getAvailablePermits() const {
uint64_t sum = 0;
for (int i = 0; i<statsList_.size(); i++) {
sum += statsList_[i].getAvailablePermits();
}
return sum;
}
uint64_t PartitionedBrokerConsumerStatsImpl::getUnackedMessages() const {
uint64_t sum = 0;
for (int i = 0; i<statsList_.size(); i++) {
sum += statsList_[i].getUnackedMessages();
}
return sum;
}
bool PartitionedBrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() const {
if (statsList_.size() == 0) {
return false;
}
bool isValid = true;
for (int i = 0; i<statsList_.size(); i++) {
isValid &= statsList_[i].isValid();
}
return isValid;
}
const std::string PartitionedBrokerConsumerStatsImpl::getAddress() const {
std::string str;
for (int i = 0; i<statsList_.size(); i++) {
str += statsList_[i].getAddress() + DELIMITER;
}
return str;
}
const std::string PartitionedBrokerConsumerStatsImpl::getConnectedSince() const {
std::string str;
for (int i = 0; i<statsList_.size(); i++) {
str += statsList_[i].getConnectedSince() + DELIMITER;
}
return str;
}
const ConsumerType PartitionedBrokerConsumerStatsImpl::getType() const {
if (! statsList_.size()) {
return ConsumerExclusive;
}
return statsList_[0].getType();
}
double PartitionedBrokerConsumerStatsImpl::getMsgRateExpired() const {
double sum = 0;
for (int i = 0; i<statsList_.size(); i++) {
sum += statsList_[i].getMsgRateExpired();
}
return sum;
}
uint64_t PartitionedBrokerConsumerStatsImpl::getMsgBacklog() const {
uint64_t sum = 0;
for (int i = 0; i<statsList_.size(); i++) {
sum += statsList_[i].getMsgBacklog();
}
return sum;
}
BrokerConsumerStats PartitionedBrokerConsumerStatsImpl::getBrokerConsumerStats(int index) {
return statsList_[index];
}
void PartitionedBrokerConsumerStatsImpl::add(BrokerConsumerStats stats, int index) {
statsList_[index] = stats;
}
void PartitionedBrokerConsumerStatsImpl::clear() {
statsList_.clear();
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CPP_PARTITIONEDBROKERCONSUMERSTATSIMPL_H
#define PULSAR_CPP_PARTITIONEDBROKERCONSUMERSTATSIMPL_H
#include <string.h>
#include <iostream>
#include <vector>
#include <pulsar/Result.h>
#include <boost/function.hpp>
#include <boost/date_time/microsec_time_clock.hpp>
#include <lib/BrokerConsumerStatsImpl.h>
#include <boost/shared_ptr.hpp>
#include <boost/make_shared.hpp>
namespace pulsar {
class PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
private:
std::vector<BrokerConsumerStats> statsList_;
static const std::string DELIMITER;
public:
PartitionedBrokerConsumerStatsImpl(size_t size);
/** Returns true if the Stats are still valid **/
virtual bool isValid() const;
/** Returns the rate of messages delivered to the consumer. msg/s */
virtual double getMsgRateOut() const;
/** Returns the throughput delivered to the consumer. bytes/s */
virtual double getMsgThroughputOut() const;
/** Returns the rate of messages redelivered by this consumer. msg/s */
virtual double getMsgRateRedeliver() const;
/** Returns the Name of the consumer */
virtual const std::string getConsumerName() const;
/** Returns the Number of available message permits for the consumer */
virtual uint64_t getAvailablePermits() const;
/** Returns the Number of unacknowledged messages for the consumer */
virtual uint64_t getUnackedMessages() const;
/** Returns true if the consumer is blocked due to unacked messages. */
virtual bool isBlockedConsumerOnUnackedMsgs() const;
/** Returns the Address of this consumer */
virtual const std::string getAddress() const;
/** Returns the Timestamp of connection */
virtual const std::string getConnectedSince() const;
/** Returns Whether this subscription is Exclusive or Shared or Failover */
virtual const ConsumerType getType() const;
/** Returns the rate of messages expired on this subscription. msg/s */
virtual double getMsgRateExpired() const;
/** Returns the Number of messages in the subscription backlog */
virtual uint64_t getMsgBacklog() const;
/** Returns the BrokerConsumerStatsImpl at of ith partition */
BrokerConsumerStats getBrokerConsumerStats(int index);
void add(BrokerConsumerStats stats, int index);
void clear();
friend std::ostream& operator<<(std::ostream &os, const PartitionedBrokerConsumerStatsImpl &obj);
};
typedef boost::shared_ptr<PartitionedBrokerConsumerStatsImpl> PartitionedBrokerConsumerStatsPtr;
}
#endif //PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H
......@@ -15,11 +15,6 @@
*/
#include "PartitionedConsumerImpl.h"
#include "LogUtils.h"
#include <boost/bind.hpp>
#include "pulsar/Result.h"
#include "MessageImpl.h"
#include "Utils.h"
DECLARE_LOG_OBJECT()
......@@ -176,6 +171,7 @@ namespace pulsar {
// all the partitioned-consumer belonging to one partitioned topic should have same name
config.setConsumerName(conf_.getConsumerName());
config.setConsumerType(conf_.getConsumerType());
config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
config.setMessageListener(boost::bind(&PartitionedConsumerImpl::messageReceived, shared_from_this(), _1, _2));
// create consumer on each partition
for (unsigned int i = 0; i < numPartitions_; i++ ) {
......@@ -380,12 +376,37 @@ namespace pulsar {
return messages_.size();
}
Result PartitionedConsumerImpl::getConsumerStats(BrokerConsumerStats& BrokerConsumerStats, int partitionIndex) {
if (partitionIndex >= numPartitions_ && partitionIndex < 0 && consumers_.size() <= partitionIndex)
{
LOG_ERROR(getName() << " PartitionIndex must be positive and less than number of partitiones")
return ResultInvalidConfiguration;
void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
Lock lock(mutex_);
if (state_ != Ready) {
lock.unlock();
return callback(ResultConsumerNotInitialized, BrokerConsumerStats());
}
PartitionedBrokerConsumerStatsPtr statsPtr = boost::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions_);
LatchPtr latchPtr = boost::make_shared<Latch>(numPartitions_);
ConsumerList consumerList = consumers_;
lock.unlock();
for (int i = 0; i<consumerList.size(); i++) {
consumerList[i]->getBrokerConsumerStatsAsync(boost::bind(&PartitionedConsumerImpl::handleGetConsumerStats,
shared_from_this(), _1, _2, latchPtr,
statsPtr, i, callback));
}
}
void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
LatchPtr latchPtr, PartitionedBrokerConsumerStatsPtr statsPtr,
size_t index, BrokerConsumerStatsCallback callback) {
Lock lock(mutex_);
if (res == ResultOk) {
latchPtr->countdown();
statsPtr->add(brokerConsumerStats, index);
} else {
lock.unlock();
return callback(res, BrokerConsumerStats());
}
if (latchPtr->getCount() == 0) {
lock.unlock();
callback(ResultOk, BrokerConsumerStats(statsPtr));
}
return consumers_[partitionIndex]->getConsumerStats(BrokerConsumerStats);
}
}
......@@ -25,6 +25,9 @@
#include "boost/enable_shared_from_this.hpp"
#include "ConsumerImplBase.h"
#include "lib/UnAckedMessageTrackerDisabled.h"
#include <lib/Latch.h>
#include <lib/PartitionedBrokerConsumerStatsImpl.h>
namespace pulsar {
class PartitionedConsumerImpl;
class PartitionedConsumerImpl: public ConsumerImplBase, public boost::enable_shared_from_this<PartitionedConsumerImpl> {
......@@ -60,7 +63,10 @@ namespace pulsar {
virtual void redeliverUnacknowledgedMessages();
virtual const std::string& getName() const;
virtual int getNumOfPrefetchedMessages() const ;
virtual Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex);
virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
void handleGetConsumerStats(Result , BrokerConsumerStats, LatchPtr,
PartitionedBrokerConsumerStatsPtr, size_t, BrokerConsumerStatsCallback);
private:
const ClientImplPtr client_;
const std::string subscriptionName_;
......
......@@ -281,7 +281,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
// reserving a spot and going forward - not blocking
if (!conf_.getBlockIfQueueFull() && !pendingMessagesQueue_.tryReserve(1)) {
LOG_DEBUG(getName() << " - Producer Queue is full");
// If queue is full sending the batch immediately, no point waiting till batchMessageimeout
// If queue is full sending the batch immediately, no point waiting till batchMessagetimeout
if (batchMessageContainer) {
LOG_DEBUG(getName() << " - sending batch message immediately");
batchMessageContainer->sendMessage();
......
......@@ -18,21 +18,18 @@
#include <pulsar/Client.h>
#include <boost/lexical_cast.hpp>
#include <lib/LogUtils.h>
#include <pulsar/MessageBuilder.h>
#include "DestinationName.h"
#include <lib/DestinationName.h>
#include <lib/Commands.h>
#include <sstream>
#include "boost/date_time/posix_time/posix_time.hpp"
#include "CustomRoutingPolicy.h"
#include <boost/thread.hpp>
#include "lib/Future.h"
#include "lib/Utils.h"
#include <ctime>
#include "LogUtils.h"
#include "PulsarFriend.h"
#include <unistd.h>
#include "ConsumerTest.h"
#include "HttpHelper.h"
#include <lib/Latch.h>
#include <lib/PartitionedConsumerImpl.h>
DECLARE_LOG_OBJECT();
using namespace pulsar;
......@@ -40,23 +37,39 @@ using namespace pulsar;
static std::string lookupUrl = "http://localhost:8765";
static std::string adminUrl = "http://localhost:8765/";
void partitionedCallbackFunction(Result result, BrokerConsumerStats brokerConsumerStats, long expectedBacklog, Latch& latch, int index) {
ASSERT_EQ(result, ResultOk);
PartitionedBrokerConsumerStatsImpl* statsPtr = (PartitionedBrokerConsumerStatsImpl*)(brokerConsumerStats.getImpl().get());
LOG_DEBUG(statsPtr);
ASSERT_EQ(expectedBacklog, statsPtr->getBrokerConsumerStats(index).getMsgBacklog());
latch.countdown();
}
void simpleCallbackFunction(Result result, BrokerConsumerStats& brokerConsumerStats, Result expectedResult,
uint64_t expectedBacklog, ConsumerType expectedConsumerType) {
LOG_DEBUG(brokerConsumerStats);
ASSERT_EQ(result, expectedResult);
ASSERT_EQ(brokerConsumerStats.getMsgBacklog(), expectedBacklog);
ASSERT_EQ(brokerConsumerStats.getType(), expectedConsumerType);
}
TEST(ConsumerStatsTest, testBacklogInfo) {
long epochTime=time(NULL);
std::string testName="testBacklogInfo-" + boost::lexical_cast<std::string>(epochTime);
Client client(lookupUrl);
std::string topicName = "persistent://property/cluster/namespace/" + testName;
std::string subName = "subscription-name";
ConsumerConfiguration conf;
conf.setBrokerConsumerStatsCacheTimeInMs(3 * 1000);
Consumer consumer;
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
client.subscribeAsync(topicName, subName, conf, WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
Result result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.unsubscribe();
client.subscribe(topicName, subName, consumer);
client.subscribe(topicName, subName, conf, consumer);
// Producing messages
Producer producer;
......@@ -74,12 +87,8 @@ TEST(ConsumerStatsTest, testBacklogInfo) {
producer.send(msg);
}
BrokerConsumerStats consumerStats;
Result res = consumer.getConsumerStats(consumerStats);
ASSERT_EQ(res, ResultOk);
LOG_DEBUG(consumerStats);
ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages);
LOG_DEBUG("Calling consumer.getBrokerConsumerStats");
consumer.getBrokerConsumerStatsAsync(boost::bind(simpleCallbackFunction, _1, _2, ResultOk, numOfMessages, ConsumerExclusive));
for (int i = numOfMessages; i<(numOfMessages*2); i++) {
std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
......@@ -87,12 +96,13 @@ TEST(ConsumerStatsTest, testBacklogInfo) {
producer.send(msg);
}
usleep(35 * 1000 * 1000);
res = consumer.getConsumerStats(consumerStats);
usleep(3.5 * 1000 * 1000);
BrokerConsumerStats consumerStats;
Result res = consumer.getBrokerConsumerStats(consumerStats);
ASSERT_EQ(res, ResultOk);
LOG_DEBUG(consumerStats);
ASSERT_EQ(consumerStats.msgBacklog_, 2 * numOfMessages);
ASSERT_EQ(consumerStats.getMsgBacklog(), 2 * numOfMessages);
ASSERT_EQ(consumerStats.getType(), ConsumerExclusive);
consumer.unsubscribe();
}
......@@ -106,14 +116,14 @@ TEST(ConsumerStatsTest, testFailure) {
Promise<Result, Consumer> consumerPromise;
BrokerConsumerStats consumerStats;
client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats));
ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
Result result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.unsubscribe();
ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats));
ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
client.subscribe(topicName, subName, consumer);
// Producing messages
......@@ -132,13 +142,13 @@ TEST(ConsumerStatsTest, testFailure) {
producer.send(msg);
}
ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats));
ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
LOG_DEBUG(consumerStats);
ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages);
ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages);
consumer.unsubscribe();
ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats));
ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
}
TEST(ConsumerStatsTest, testCachingMechanism) {
......@@ -147,19 +157,21 @@ TEST(ConsumerStatsTest, testCachingMechanism) {
Client client(lookupUrl);
std::string topicName = "persistent://property/cluster/namespace/" + testName;
std::string subName = "subscription-name";
ConsumerConfiguration conf;
conf.setBrokerConsumerStatsCacheTimeInMs(3.5 * 1000);
Consumer consumer;
Promise<Result, Consumer> consumerPromise;
BrokerConsumerStats consumerStats;
client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats));
client.subscribeAsync(topicName, subName, conf, WaitForCallbackValue<Consumer>(consumerPromise));
ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
Result result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.unsubscribe();
ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats));
client.subscribe(topicName, subName, consumer);
ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
client.subscribe(topicName, subName, conf, consumer);
// Producing messages
Producer producer;
......@@ -177,10 +189,10 @@ TEST(ConsumerStatsTest, testCachingMechanism) {
producer.send(msg);
}
ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats));
ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
LOG_DEBUG(consumerStats);
ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages);
ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages);
for (int i = numOfMessages; i<(numOfMessages*2); i++) {
std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
......@@ -190,26 +202,105 @@ TEST(ConsumerStatsTest, testCachingMechanism) {
LOG_DEBUG("Expecting cached results");
ASSERT_TRUE(consumerStats.isValid());
ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats));
ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
LOG_DEBUG(consumerStats);
ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages);
ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages);
LOG_DEBUG("Still Expecting cached results");
usleep(10 * 1000 * 1000);
usleep(1 * 1000 * 1000);
ASSERT_TRUE(consumerStats.isValid());
ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats));
ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
LOG_DEBUG(consumerStats);
ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages);
ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages);
LOG_DEBUG("Now expecting new results");
usleep(25 * 1000 * 1000);
usleep(3 * 1000 * 1000);
ASSERT_FALSE(consumerStats.isValid());
ASSERT_EQ(ResultOk, consumer.getConsumerStats(consumerStats));
ASSERT_EQ(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
LOG_DEBUG(consumerStats);
ASSERT_EQ(consumerStats.msgBacklog_, numOfMessages * 2);
ASSERT_EQ(consumerStats.getMsgBacklog(), numOfMessages * 2);
consumer.unsubscribe();
ASSERT_NE(ResultOk, consumer.getConsumerStats(consumerStats));
}
\ No newline at end of file
ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
}
TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
long epochTime=time(NULL);
std::string testName="testAsyncCallOnPartitionedTopic-" + boost::lexical_cast<std::string>(epochTime);
Client client(lookupUrl);
std::string topicName = "persistent://property/cluster/namespace/" + testName;
std::string subName = "subscription-name";
// call admin api to create partitioned topics
std::string url = adminUrl + "admin/persistent/property/cluster/namespace/" + testName + "/partitions";
int res = makePutRequest(url, "7");
LOG_INFO("res = "<<res);
ASSERT_FALSE(res != 204 && res != 409);
ConsumerConfiguration conf;
conf.setBrokerConsumerStatsCacheTimeInMs(3.5 * 1000);
Consumer consumer;
Promise<Result, Consumer> consumerPromise;
BrokerConsumerStats consumerStats;
client.subscribeAsync(topicName, subName, conf, WaitForCallbackValue<Consumer>(consumerPromise));
ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
Result result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.unsubscribe();
ASSERT_NE(ResultOk, consumer.getBrokerConsumerStats(consumerStats));
client.subscribe(topicName, subName, conf, consumer);
// Producing messages
Producer producer;
int numOfMessages = 7 * 5; // 5 message per partition
Promise<Result, Producer> producerPromise;
ProducerConfiguration config;
config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
client.createProducerAsync(topicName, config, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
std::string prefix = testName + "-";
for (int i = 0; i<numOfMessages; i++) {
std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
Message msg = MessageBuilder().build();
producer.send(msg);
}
// Expecting return from 4 callbacks
Latch latch(4);
consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 5, latch, 0));
// Now we have 10 messages per partition
for (int i = numOfMessages; i<(numOfMessages*2); i++) {
std::string messageContent = prefix + boost::lexical_cast<std::string>(i);
Message msg = MessageBuilder().build();
producer.send(msg);
}
// Expecting cached result
consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 5, latch, 0));
usleep(4.5 * 1000 * 1000);
// Expecting fresh results
consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 10, latch, 2));
Message msg;
while (consumer.receive(msg)) {
// Do nothing
}
// Expecting the backlog to be the same since we didn't acknowledge the messages
consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 10, latch, 3));
// Wait for ten seconds only
ASSERT_TRUE(latch.wait(milliseconds(10 * 1000)));
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册