提交 bea8ff22 编写于 作者: J jai1 提交者: GitHub

HTTP lookups for c++ client lib (#317)

上级 16d8177c
......@@ -32,6 +32,11 @@ find_library(LOG4CXX_LIBRARY_PATH log4cxx)
find_library(CURL_LIBRARY_PATH curl)
find_path(LOG4CXX_INCLUDE_PATH log4cxx/logger.h)
find_path(GTEST_INCLUDE_PATH gtest/gtest.h)
find_library(LIB_JSON jsoncpp)
if (NOT LIB_JSON)
find_library(LIB_JSON json_cpp)
endif (NOT LIB_JSON)
set(ADDITIONAL_LIBRARIES $ENV{PULSAR_ADDITIONAL_LIBRARIES})
link_directories( $ENV{PULSAR_ADDITIONAL_LIBRARY_PATH} )
......@@ -56,6 +61,7 @@ set(COMMON_LIBS
${LOG4CXX_LIBRARY_PATH}
${CURL_LIBRARY_PATH}
${ADDITIONAL_LIBRARIES}
${LIB_JSON}
)
link_directories(${CMAKE_BINARY_DIR}/lib)
......
......@@ -12,7 +12,7 @@ https://github.com/yahoo/pulsar/tree/master/pulsar-client-cpp/examples
* [Log4CXX](https://logging.apache.org/log4cxx)
* LibCurl
* [GTest](https://github.com/google/googletest)
* JsonCpp
## Platforms
Pulsar C++ Client Library has been tested on:
......
......@@ -16,7 +16,6 @@
#include "DestinationName.h"
#include "BinaryProtoLookupService.h"
#include "LogUtils.h"
#include "SharedBuffer.h"
#include <boost/shared_ptr.hpp>
......
......@@ -18,22 +18,16 @@
#define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_
#include <iostream>
#include <boost/shared_ptr.hpp>
#include <pulsar/Authentication.h>
#include <pulsar/Result.h>
#include "ConnectionPool.h"
#include "DestinationName.h"
#include "Future.h"
#include "LookupDataResult.h"
#include "Backoff.h"
#include <lib/LookupService.h>
#pragma GCC visibility push(default)
namespace pulsar {
class LookupDataResult;
class BinaryProtoLookupService {
class BinaryProtoLookupService : public LookupService {
public:
/*
* constructor
......@@ -70,7 +64,7 @@ class BinaryProtoLookupService {
uint64_t newRequestId();
};
typedef boost::shared_ptr<BinaryProtoLookupService> BinaryProtoLookupServicePtr;
}
#pragma GCC visibility pop
......
......@@ -27,6 +27,7 @@
#include <sstream>
#include <openssl/sha.h>
#include "boost/date_time/posix_time/posix_time.hpp"
#include <lib/HTTPLookupService.h>
DECLARE_LOG_OBJECT()
......@@ -63,11 +64,20 @@ namespace pulsar {
listenerExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())),
partitionListenerExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(clientConfiguration.getMessageListenerThreads())),
pool_(clientConfiguration, ioExecutorProvider_, clientConfiguration.getAuthenticationPtr(), poolConnections),
lookup_(pool_, serviceUrl),
producerIdGenerator_(0),
consumerIdGenerator_(0),
requestIdGenerator_(0) {
LogUtils::init(clientConfiguration.getLogConfFilePath());
if (serviceUrl_.compare(0, 4, "http") == 0) {
LOG_DEBUG("Using HTTP Lookup");
lookupServicePtr_ = boost::make_shared<HTTPLookupService>(boost::cref(serviceUrl_),
boost::cref(clientConfiguration_),
boost::cref(
clientConfiguration.getAuthenticationPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
lookupServicePtr_ = boost::make_shared<BinaryProtoLookupService>(boost::ref(pool_), boost::ref(serviceUrl));
}
}
ClientImpl::~ClientImpl() {
......@@ -105,7 +115,7 @@ namespace pulsar {
return;
}
}
lookup_.getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleCreateProducer,
lookupServicePtr_->getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleCreateProducer,
shared_from_this(), _1, _2, dn, conf, callback));
}
......@@ -157,7 +167,7 @@ namespace pulsar {
}
}
lookup_.getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleSubscribe,
lookupServicePtr_->getPartitionMetadataAsync(dn).addListener(boost::bind(&ClientImpl::handleSubscribe,
shared_from_this(), _1, _2, dn, consumerName, conf, callback));
}
......@@ -211,7 +221,7 @@ namespace pulsar {
Future<Result, ClientConnectionWeakPtr> ClientImpl::getConnection(const std::string& topic) {
Promise<Result, ClientConnectionWeakPtr> promise;
lookup_.lookupAsync(topic).addListener(boost::bind(&ClientImpl::handleLookup, this, _1, _2, promise));
lookupServicePtr_->lookupAsync(topic).addListener(boost::bind(&ClientImpl::handleLookup, this, _1, _2, promise));
return promise.getFuture();
}
......
......@@ -108,7 +108,7 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr listenerExecutorProvider_;
ExecutorServiceProviderPtr partitionListenerExecutorProvider_;
BinaryProtoLookupService lookup_;
LookupServicePtr lookupServicePtr_;
ConnectionPool pool_;
uint64_t producerIdGenerator_;
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <lib/HTTPLookupService.h>
DECLARE_LOG_OBJECT()
namespace pulsar {
const static std::string V2_PATH = "/lookup/v2/destination/";
const static std::string PARTITION_PATH = "/admin/persistent/";
const static int MAX_HTTP_REDIRECTS = 20;
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;
HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer;
HTTPLookupService::HTTPLookupService(const std::string &lookupUrl,
const ClientConfiguration &clientConfiguration,
const AuthenticationPtr &authData)
: executorProvider_(boost::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
authenticationPtr_(authData),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()) {
if (lookupUrl[lookupUrl.length() - 1] == '/') {
// Remove trailing '/'
adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1);
} else {
adminUrl_ = lookupUrl;
}
}
Future<Result, LookupDataResultPtr> HTTPLookupService::lookupAsync(const std::string &destinationName) {
LookupPromise promise;
boost::shared_ptr<DestinationName> dn = DestinationName::get(destinationName);
if (!dn) {
LOG_ERROR("Unable to parse destination - " << destinationName);
promise.setFailed(ResultInvalidTopicName);
return promise.getFuture();
}
std::stringstream completeUrlStream;
completeUrlStream << adminUrl_ << V2_PATH << "persistent/" << dn->getProperty() << '/' << dn->getCluster()
<< '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName();
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), Lookup));
return promise.getFuture();
}
Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync(const DestinationNamePtr &dn) {
LookupPromise promise;
std::stringstream completeUrlStream;
completeUrlStream << adminUrl_ << PARTITION_PATH << dn->getProperty() << '/' << dn->getCluster()
<< '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName() << '/'
<< PARTITION_METHOD_NAME;
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), PartitionMetaData));
return promise.getFuture();
}
static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) {
((std::string*)responseDataPtr)->append((char*)contents, size * nmemb);
return size * nmemb;
}
void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string completeUrl,
RequestType requestType) {
CURL *handle;
CURLcode res;
std::string responseData;
handle = curl_easy_init();
if(!handle) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
promise.setFailed(ResultLookupError);
// No curl_easy_cleanup required since handle not initialized
return;
}
// set URL
curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());
// Write callback
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);
// New connection is made for each call
curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);
// Skipping signal handling - results in timeouts not honored during the DNS lookup
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);
// Timer
curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);
// Redirects
curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(handle, CURLOPT_MAXREDIRS, MAX_HTTP_REDIRECTS);
// Fail if HTTP return code >=400
curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L);
// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("All Authentication methods should have AuthenticationData and return true on getAuthData for url " << completeUrl);
promise.setFailed(authResult);
curl_easy_cleanup(handle);
return;
}
struct curl_slist *list = NULL;
if (authDataContent->hasDataForHttp()) {
list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str());
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
// Make get call to server
res = curl_easy_perform(handle);
// Free header list
curl_slist_free_all(list);
switch(res) {
case CURLE_OK:
LOG_DEBUG("Response received successfully for url " << completeUrl);
promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) : parseLookupData(responseData));
break;
case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultConnectError);
break;
case CURLE_READ_ERROR:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultReadError);
break;
case CURLE_OPERATION_TIMEDOUT:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultTimeout);
break;
default:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultLookupError);
break;
}
curl_easy_cleanup(handle);
}
LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) {
Json::Value root;
Json::Reader reader;
if (!reader.parse(json, root, false)) {
LOG_ERROR("Failed to parse json of Partition Metadata: " << reader.getFormatedErrorMessages()
<< "\nInput Json = " << json);
return LookupDataResultPtr();
}
LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setPartitions(root.get("partitions", 0).asInt());
return lookupDataResultPtr;
}
LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) {
Json::Value root;
Json::Reader reader;
if (!reader.parse(json, root, false)) {
LOG_ERROR("Failed to parse json : " << reader.getFormatedErrorMessages()
<< "\nInput Json = " << json);
return LookupDataResultPtr();
}
const std::string defaultNotFoundString = "Url Not found";
const std::string brokerUrl = root.get("brokerUrl", defaultNotFoundString).asString();
if (brokerUrl == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrl not present" << json);
return LookupDataResultPtr();
}
const std::string brokerUrlSsl = root.get("brokerUrlSsl", defaultNotFoundString).asString();
if (brokerUrlSsl == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrlSsl not present" << json);
return LookupDataResultPtr();
}
LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setBrokerUrl(brokerUrl);
lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl);
return lookupDataResultPtr;
}
}
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CPP_HTTPLOOKUPSERVICE_H
#define PULSAR_CPP_HTTPLOOKUPSERVICE_H
#include <lib/LookupService.h>
#include <lib/ClientImpl.h>
#include <lib/Url.h>
#include <json/value.h>
#include <json/reader.h>
#include <boost/bind.hpp>
#include <curl/curl.h>
namespace pulsar {
class HTTPLookupService : public LookupService, public boost::enable_shared_from_this<HTTPLookupService> {
class CurlInitializer {
public:
CurlInitializer() {
// Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html
curl_global_init (CURL_GLOBAL_ALL);
}
~CurlInitializer() {
curl_global_cleanup();
}
};
static CurlInitializer curlInitializer;
enum RequestType {Lookup, PartitionMetaData};
typedef Promise<Result, LookupDataResultPtr> LookupPromise;
ExecutorServiceProviderPtr executorProvider_;
std::string adminUrl_;
AuthenticationPtr authenticationPtr_;
int lookupTimeoutInSeconds_;
static LookupDataResultPtr parsePartitionData(const std::string&);
static LookupDataResultPtr parseLookupData(const std::string&);
void sendHTTPRequest(LookupPromise, const std::string, RequestType);
public:
HTTPLookupService(const std::string&, const ClientConfiguration&, const AuthenticationPtr&);
Future<Result, LookupDataResultPtr> lookupAsync(const std::string&);
Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const DestinationNamePtr&);
};
}
#endif //PULSAR_CPP_HTTPLOOKUPSERVICE_H
......@@ -17,6 +17,8 @@
#ifndef _PULSAR_LOOKUP_DATA_RESULT_HEADER_
#define _PULSAR_LOOKUP_DATA_RESULT_HEADER_
#include <string>
#include <lib/Future.h>
#include <pulsar/Result.h>
namespace pulsar {
class LookupDataResult;
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CPP_LOOKUPSERVICE_H
#define PULSAR_CPP_LOOKUPSERVICE_H
#include <lib/LookupDataResult.h>
#include <pulsar/Result.h>
#include <lib/Future.h>
#include <lib/DestinationName.h>
#include <lib/LogUtils.h>
namespace pulsar {
class LookupService {
public:
/*
* @param destinationName - topic name
*
* Looks up the owner broker for the given destination name
*/
virtual Future<Result, LookupDataResultPtr> lookupAsync(const std::string& destinationName) = 0;
/*
* @param dn - pointer to destination (topic) name
*
* Gets Partition metadata
*/
virtual Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const DestinationNamePtr& dn) = 0;
};
typedef boost::shared_ptr<LookupService> LookupServicePtr;
}
#endif //PULSAR_CPP_LOOKUPSERVICE_H
......@@ -17,8 +17,7 @@
#include "Url.h"
#include <boost/regex.hpp>
#include <stdlib.h>
#include <map>
#include <iostream>
namespace pulsar {
......@@ -53,6 +52,10 @@ bool Url::parse(const std::string& urlStr, Url& url) {
url.protocol_ = std::string(groups[1].first, groups[1].second);
url.host_ = std::string(groups[2].first, groups[2].second);
std::string portStr(groups[3].first, groups[3].second);
url.pathWithoutFile_ = std::string(groups[4].first, groups[4].second);
url.file_ = std::string(groups[5].first, groups[5].second);
url.parameter_ = std::string(groups[6].first, groups[6].second);
url.path_ = url.pathWithoutFile_ + url.file_;
if (!portStr.empty()) {
url.port_ = atoi(groups[3].first);
......@@ -81,4 +84,26 @@ const int Url::port() const {
return port_;
}
const std::string& Url::path() const {
return path_;
}
const std::string& Url::pathWithoutFile() const {
return pathWithoutFile_;
}
const std::string& Url::file() const {
return file_;
}
const std::string& Url::parameter() const {
return parameter_;
}
std::ostream & operator<<(std::ostream &os, const Url& obj) {
os << "Url [Host = " << obj.host() << ", Protocol = " << obj.protocol()
<< ", Port = " << obj.port() << "]";
return os;
}
} // pulsar
......@@ -33,11 +33,19 @@ public:
const std::string& protocol() const;
const std::string& host() const;
const int port() const;
const std::string& path() const;
const std::string& pathWithoutFile() const;
const std::string& file() const;
const std::string& parameter() const;
friend std::ostream& operator<<(std::ostream &os, const Url& obj);
private:
std::string protocol_;
std::string host_;
int port_;
std::string path_;
std::string pathWithoutFile_;
std::string file_;
std::string parameter_;
};
} // pulsar
......
......@@ -37,7 +37,7 @@ DECLARE_LOG_OBJECT();
using namespace pulsar;
static std::string lookupUrl = "pulsar://localhost:8885";
static std::string lookupUrl = "http://localhost:8765";
static std::string adminUrl = "http://localhost:8765/";
......
......@@ -62,4 +62,20 @@ TEST(UrlTest, testUrl) {
ASSERT_EQ("example.com", url.host());
ASSERT_EQ("pulsar", url.protocol());
ASSERT_EQ(6650, url.port());
ASSERT_TRUE(Url::parse("http://env-broker3.messaging.cluster.company.com:4080/lookup/v2/destination/persistent/cmscpp/gq1/TESTNS.4/TOPIC_1490664894335_1?authoritative=false", url));
ASSERT_EQ("http", url.protocol());
ASSERT_EQ(4080, url.port());
ASSERT_EQ("/lookup/v2/destination/persistent/cmscpp/gq1/TESTNS.4/TOPIC_1490664894335_1", url.path());
ASSERT_EQ("/lookup/v2/destination/persistent/cmscpp/gq1/TESTNS.4/", url.pathWithoutFile());
ASSERT_EQ("TOPIC_1490664894335_1", url.file());
ASSERT_EQ("?authoritative=false", url.parameter());
ASSERT_TRUE(Url::parse("http://abc.com:8090/ads/ad/asd/TOPIC_1490664894335_1?authoritative=false,temp=true", url));
ASSERT_EQ("http", url.protocol());
ASSERT_EQ(8090, url.port());
ASSERT_EQ("/ads/ad/asd/TOPIC_1490664894335_1", url.path());
ASSERT_EQ("/ads/ad/asd/", url.pathWithoutFile());
ASSERT_EQ("TOPIC_1490664894335_1", url.file());
ASSERT_EQ("?authoritative=false,temp=true", url.parameter());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册