提交 ec850a7f 编写于 作者: M Matteo Merli

In C++ allow messages that compress to <5mb to be sent with batching enabled (#3673)

* In C++ allow messages that compress to <5mb to be sent with batching enabled

* Fixed related test
上级 d3e17f4d
......@@ -102,6 +102,14 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload);
impl_->payload = encryptedPayload;
if (impl_->payload.readableBytes() > Commands::MaxMessageSize) {
// At this point the compressed batch is above the overall MaxMessageSize. There
// can only 1 single message in the batch at this point.
batchMessageCallBack(ResultMessageTooBig, messagesContainerListPtr_, nullptr);
clear();
return;
}
Message msg;
msg.impl_ = impl_;
......
......@@ -351,12 +351,13 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
return;
}
payload = encryptedPayload;
}
if (payloadSize > Commands::MaxMessageSize) {
LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
<< Commands::MaxMessageSize << " bytes");
cb(ResultMessageTooBig, msg);
return;
if (payloadSize > Commands::MaxMessageSize) {
LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
<< Commands::MaxMessageSize << " bytes");
cb(ResultMessageTooBig, msg);
return;
}
}
// Reserve a spot in the messages queue before acquiring the ProducerImpl
......
......@@ -555,7 +555,9 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
Client client(lookupUrl);
std::string topicName = "testMessageTooBig";
Producer producer;
Result result = client.createProducer(topicName, producer);
ProducerConfiguration conf;
conf.setBatchingEnabled(false);
Result result = client.createProducer(topicName, conf, producer);
ASSERT_EQ(ResultOk, result);
int size = Commands::MaxMessageSize + 1;
......@@ -1140,6 +1142,45 @@ TEST(BasicEndToEndTest, testProduceMessageSize) {
delete[] content;
}
TEST(BasicEndToEndTest, testBigMessageSizeBatching) {
ClientConfiguration config;
Client client(lookupUrl);
std::string topicName = "testBigMessageSizeBatching";
std::string subName = "my-sub-name";
ProducerConfiguration conf1;
conf1.setCompressionType(CompressionNone);
conf1.setBatchingEnabled(true);
Producer producer1;
Result result = client.createProducer(topicName, conf1, producer1);
ASSERT_EQ(ResultOk, result);
ProducerConfiguration conf2;
conf2.setCompressionType(CompressionLZ4);
conf2.setBatchingEnabled(true);
Producer producer2;
result = client.createProducer(topicName, conf2, producer2);
ASSERT_EQ(ResultOk, result);
int size = Commands::MaxMessageSize + 1;
char* content = new char[size];
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer1.send(msg);
ASSERT_EQ(ResultMessageTooBig, result);
msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer2.send(msg);
ASSERT_EQ(ResultOk, result);
producer1.close();
producer2.close();
client.close();
delete[] content;
}
TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
Client client(adminUrl);
std::string topicName = "testHandlerReconnectionLogic";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册