提交 745f7d02 编写于 作者: B Boyang Jerry Peng 提交者: Matteo Merli

fix: non-batched messages cause sql query to fail (#3684)

上级 3a3966cb
......@@ -86,7 +86,7 @@ public class MessageParser {
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
processor.process(
RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload, ledgerId, entryId, 0));
RawMessageImpl.get(refCntMsgMetadata, null, uncompressedPayload.retain(), ledgerId, entryId, 0));
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(refCntMsgMetadata, uncompressedPayload, ledgerId, entryId, processor);
......
......@@ -67,7 +67,16 @@ public class TestBasicPresto extends PulsarTestSuite {
}
@Test
public void testSimpleSQLQuery() throws Exception {
public void testSimpleSQLQueryBatched() throws Exception {
testSimpleSQLQuery(true);
}
@Test
public void testSimpleSQLQueryNonBatched() throws Exception {
testSimpleSQLQuery(false);
}
public void testSimpleSQLQuery(boolean isBatched) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
......@@ -79,9 +88,9 @@ public class TestBasicPresto extends PulsarTestSuite {
@Cleanup
Producer<Stock> producer = pulsarClient.newProducer(JSONSchema.of(Stock.class))
.topic(stocksTopic)
.enableBatching(isBatched)
.create();
for (int i = 0 ; i < NUM_OF_STOCKS; ++i) {
final Stock stock = new Stock(i,"STOCK_" + i , 100.0 + i * 10);
producer.send(stock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册