提交 d62cf072 编写于 作者: J jai1 提交者: Matteo Merli

Partitioned topics fix (#226)

上级 b6f061a7
......@@ -354,13 +354,12 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer)
std::string topicName = "persistent://prop/unit/ns/partition-test";
// call admin api to make it partitioned
std::string url = lookupUrl + "/admin/persistent/prop/unit/ns/partition-test/partitions";
int res = makePutRequest(lookupUrl, "3");
std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-test/partitions";
int res = makePutRequest(url, "3");
if (res != 204 && res != 409) {
LOG_DEBUG("Unable to create partitioned topic.");
return;
}
LOG_INFO("res = "<<res);
ASSERT_FALSE(res != 204 && res != 409);
Producer producer;
Result result = client.createProducer(topicName, producer);
ASSERT_EQ(ResultOk, result);
......@@ -386,7 +385,7 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer)
client.shutdown();
}
TEST(BasicEndToEndTest, testMessageTooBig)
TEST(BasicEndToEndTest, testMessageTooBig)
{
ClientConfiguration config;
Client client(lookupUrl);
......@@ -512,13 +511,11 @@ TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy)
std::string topicName = "persistent://prop/unit/ns/partition-testSinglePartitionRoutingPolicy";
// call admin api to make it partitioned
std::string url = lookupUrl + "/admin/persistent/prop/unit/ns/partition-testSinglePartitionRoutingPolicy/partitions";
int res = makePutRequest(lookupUrl, "5");
std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-testSinglePartitionRoutingPolicy/partitions";
int res = makePutRequest(url, "5");
if (res != 204 && res != 409) {
LOG_DEBUG("Unable to create partitioned topic.");
return;
}
LOG_INFO("res = "<<res);
ASSERT_FALSE(res != 204 && res != 409);
Producer producer;
ProducerConfiguration producerConfiguration;
......@@ -573,13 +570,11 @@ TEST(BasicEndToEndTest, testSinglePartitionRoutingPolicy)
std::string topicName = "persistent://prop/unit/ns/partition-testDuplicateConsumerCreationOnPartitionedTopic";
// call admin api to make it partitioned
std::string url = lookupUrl + "/admin/persistent/prop/unit/ns/testDuplicateConsumerCreationOnPartitionedTopic/partitions";
int res = makePutRequest(lookupUrl, "5");
std::string url = adminUrl + "admin/persistent/prop/unit/ns/testDuplicateConsumerCreationOnPartitionedTopic/partitions";
int res = makePutRequest(url, "5");
if (res != 204 && res != 409) {
LOG_DEBUG("Unable to create partitioned topic.");
return;
}
LOG_INFO("res = "<<res);
ASSERT_FALSE(res != 204 && res != 409);
usleep(2 * 1000 * 1000);
......@@ -626,13 +621,11 @@ TEST(BasicEndToEndTest, testRoundRobinRoutingPolicy)
Client client(lookupUrl);
std::string topicName = "persistent://prop/unit/ns/partition-testRoundRobinRoutingPolicy";
// call admin api to make it partitioned
std::string url = lookupUrl + "/admin/persistent/prop/unit/ns/partition-testRoundRobinRoutingPolicy/partitions";
int res = makePutRequest(lookupUrl, "5");
std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-testRoundRobinRoutingPolicy/partitions";
int res = makePutRequest(url, "5");
if (res != 204 && res != 409) {
LOG_DEBUG("Unable to create partitioned topic.");
return;
}
LOG_INFO("res = "<<res);
ASSERT_FALSE(res != 204 && res != 409);
Producer producer;
ProducerConfiguration tempProducerConfiguration;
......@@ -693,13 +686,11 @@ TEST(BasicEndToEndTest, testMessageListener)
Client client(lookupUrl);
std::string topicName = "persistent://prop/unit/ns/partition-testMessageListener";
// call admin api to make it partitioned
std::string url = lookupUrl + "/admin/persistent/prop/unit/ns/partition-testMessageListener/partitions";
int res = makePutRequest(lookupUrl, "5");
std::string url = adminUrl + "admin/persistent/prop/unit/ns/partition-testMessageListener/partitions";
int res = makePutRequest(url, "5");
if (res != 204 && res != 409) {
LOG_DEBUG("Unable to create partitioned topic.");
return;
}
LOG_INFO("res = "<<res);
ASSERT_FALSE(res != 204 && res != 409);
Producer producer;
ProducerConfiguration producerConfiguration;
......@@ -738,13 +729,13 @@ TEST(BasicEndToEndTest, testMessageListenerPause)
std::string topicName = "persistent://property/cluster/namespace/partition-testMessageListener-pauses";
// call admin api to make it partitioned
std::string url = lookupUrl + "/admin/persistent/prop/unit/ns/partition-testMessageListener-pauses/partitions";
int res = makePutRequest(lookupUrl, "5");
std::string url = adminUrl + "admin/persistent/property/cluster/namespace/partition-testMessageListener-pauses/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = "<<res);
ASSERT_FALSE(res != 204 && res != 409);
if (res != 204 && res != 409) {
LOG_DEBUG("Unable to create partitioned topic.");
return;
}
Producer producer;
ProducerConfiguration producerConfiguration;
producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
......
......@@ -763,14 +763,12 @@ TEST(BatchMessageTest, testPartitionedTopics) {
std::string topicName = "persistent://property/cluster/namespace/test-partitioned-batch-messages-" + boost::lexical_cast<std::string>(epochTime) ;
// call admin api to make it partitioned
std::string url = lookupUrl + "/admin/persistent/prop/unit/ns/test-partitioned-batch-messages-"
std::string url = adminUrl + "admin/persistent/property/cluster/namespace/test-partitioned-batch-messages-"
+ boost::lexical_cast<std::string>(epochTime) + "/partitions";
int res = makePutRequest(lookupUrl, "7");
int res = makePutRequest(url, "7");
if (res != 204 && res != 409) {
LOG_DEBUG("Unable to create partitioned topic.");
return;
}
LOG_INFO("res = "<<res);
ASSERT_FALSE(res != 204 && res != 409);
usleep(2 * 1000 * 1000);
......
......@@ -19,20 +19,27 @@
#include <curl/curl.h>
int makePutRequest(const std::string& url, const std::string& body) {
CURL* curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
int res = curl_easy_perform(curl);
if (res != CURLE_OK) {
return -1;
}
int httpResult = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpResult);
curl_easy_cleanup(curl);
return httpResult;
}
int makePutRequest(const std::string& url, const std::string& body) {
CURL* curl = curl_easy_init();
struct curl_slist *list = NULL;
list = curl_slist_append(list, "Content-Type: application/json");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT");
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
int res = curl_easy_perform(curl);
curl_slist_free_all(list); /* free the list again */
if (res != CURLE_OK) {
return -1;
}
int httpResult = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpResult);
curl_easy_cleanup(curl);
return httpResult;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册