提交 e7f08a83 编写于 作者: R Rajan 提交者: GitHub

Fix: Reset message-header index while verify checksum-strip at Reconnection (#89) (#90)

上级 1868d2ff
......@@ -321,6 +321,70 @@ public class MessageIdTest extends BrokerTestBase {
assertEquals(new String(msg.getData()), "message-3");
}
@Test
public void testChecksumReconnection() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic1";
// 1. producer connect
Producer prod = pulsarClient.createProducer(topicName);
ProducerImpl producer = spy((ProducerImpl) prod);
// mock: broker-doesn't support checksum (remote_version < brokerChecksumSupportedVersion) so, it forces
// client-producer to perform checksum-strip from msg at reconnection
doReturn(producer.brokerChecksumSupportedVersion() + 1).when(producer).brokerChecksumSupportedVersion();
Consumer consumer = pulsarClient.subscribe(topicName, "my-sub");
stopBroker();
// stop timer to auto-reconnect as let spy-Producer connect to broker
// manually so, spy-producer object can get
// mock-value from brokerChecksumSupportedVersion
((PulsarClientImpl) pulsarClient).timer().stop();
// set clientCnx mock to get non-checksum supported version
ClientCnx mockClientCnx = spy(new ClientCnx((PulsarClientImpl) pulsarClient));
doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
producer.clientCnx.set(mockClientCnx);
Message msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build();
CompletableFuture<MessageId> future1 = producer.sendAsync(msg1);
Message msg2 = MessageBuilder.create().setContent("message-2".getBytes()).build();
CompletableFuture<MessageId> future2 = producer.sendAsync(msg2);
// corrupt the message
msg2.getData()[msg2.getData().length - 1] = '3'; // new content would be
// 'message-3'
// unset mock
producer.clientCnx.set(null);
// Restart the broker to have the messages published
startBroker();
// grab broker connection with mocked producer which has higher version
// compare to broker
producer.grabCnx();
try {
// it should not fail: as due to unsupported version of broker:
// client removes checksum and broker should
// ignore the checksum validation
future1.get(1, TimeUnit.SECONDS);
future2.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail");
}
((ConsumerImpl) consumer).grabCnx();
// We should only receive msg1
Message msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "message-1");
msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "message-3");
}
/**
* Verifies: if message is corrupted before sending to broker and if broker gives checksum error: then
......
......@@ -791,9 +791,10 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
log.info("[{}] [{}] Re-Sending {} messages to server", topic, producerName, messagesToResend);
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
for (OpSendMsg op : pendingMessages) {
if (cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion()) {
if (stripChecksum) {
stripChecksum(op);
}
op.cmd.retain();
......@@ -827,10 +828,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
DoubleByteBuf msg = getDoubleByteBuf(op.cmd);
if (msg != null) {
ByteBuf headerFrame = msg.getFirst();
ByteBuf payloadFrame = msg.getSecond();
msg.markReaderIndex();
headerFrame.markReaderIndex();
payloadFrame.markReaderIndex();
try {
headerFrame.skipBytes(4); // skip [total-size]
int cmdSize = (int) headerFrame.readUnsignedInt();
......@@ -839,6 +838,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
headerFrame.skipBytes(cmdSize);
if (!hasChecksum(headerFrame)) {
headerFrame.resetReaderIndex();
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册