提交 309d7531 编写于 作者: R Rajan 提交者: Matteo Merli

Add crc32c-checksum verification on message header-payload (#43)

上级 7892754a
......@@ -17,6 +17,7 @@ package com.yahoo.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
import java.util.Date;
import java.util.Iterator;
......@@ -36,6 +37,7 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
......@@ -137,6 +139,11 @@ public class Consumer {
ByteBuf metadataAndPayload = entry.getDataBuffer();
// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v6.getNumber()) {
readChecksum(metadataAndPayload);
}
// stats
msgOut.recordEvent(metadataAndPayload.readableBytes());
......
......@@ -17,6 +17,8 @@ package com.yahoo.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
......@@ -29,11 +31,11 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.yahoo.pulsar.broker.service.Topic.PublishCallback;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.PublisherStats;
import com.yahoo.pulsar.common.util.XXHashChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
......@@ -110,7 +112,7 @@ public class Producer {
if (!verifyChecksum(headersAndPayload)) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(
Commands.newSendError(producerId, sequenceId, new Exception("Checksum failed on the broker")));
Commands.newSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"));
cnx.completedSendOperation();
});
return;
......@@ -122,42 +124,28 @@ public class Producer {
}
private boolean verifyChecksum(ByteBuf headersAndPayload) {
MessageMetadata metadata = null;
int readerIndex = headersAndPayload.readerIndex();
try {
metadata = Commands.parseMessageMetadata(headersAndPayload);
if (!metadata.hasChecksum()) {
// if we do not have the checksum, we do not verify checksum and return true
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName);
}
return true;
}
if (metadata.hasCompression()) {
// if the message is compressed, we do not verify checksum and return true
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload is compressed, not verifying checksum", topic, producerName);
if (hasChecksum(headersAndPayload)) {
int checksum = readChecksum(headersAndPayload).intValue();
int readerIndex = headersAndPayload.readerIndex();
try {
long computedChecksum = computeChecksum(headersAndPayload);
if (checksum == computedChecksum) {
return true;
} else {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName);
return false;
}
return true;
}
long storedChecksum = metadata.getChecksum();
long computedChecksum = XXHashChecksum.computeChecksum(headersAndPayload);
if (storedChecksum == computedChecksum) {
return true;
} else {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName);
} finally {
headersAndPayload.readerIndex(readerIndex);
}
} catch (Throwable t) {
log.error("[{}] [{}] Failed to verify checksum", topic, producerName, t);
} finally {
headersAndPayload.readerIndex(readerIndex);
if (metadata != null) {
metadata.recycle();
} else {
// ignore if checksum is not available
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Payload does not have checksum to verify", topic, producerName);
}
return true;
}
return false;
}
private void startPublishOperation() {
......
......@@ -1028,8 +1028,7 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
assertTrue(msgInRate > 0);
}
// TODO: Re-enable once header+payload checksum changes are merged
@Test(enabled = false)
@Test
public void testPayloadCorruptionDetection() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic1";
......@@ -1066,10 +1065,10 @@ public class PersistentTopicE2ETest extends BrokerTestBase {
}
// We should only receive msg1
Message msg = consumer.receive(10, TimeUnit.SECONDS);
Message msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "message-1");
while ((msg = consumer.receive(5, TimeUnit.SECONDS)) != null) {
while ((msg = consumer.receive(1, TimeUnit.SECONDS)) != null) {
assertEquals(new String(msg.getData()), "message-1");
}
}
......
......@@ -71,6 +71,7 @@ import com.yahoo.pulsar.broker.service.ServerCnx.State;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.broker.service.utils.ClientChannelHelper;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.Commands.ChecksumType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected;
......@@ -519,7 +520,7 @@ public class ServerCnxTest {
.setProducerName("prod-name").setSequenceId(0).build();
ByteBuf data = Unpooled.buffer(1024);
clientCommand = Commands.newSend(1, 0, 1, messageMetadata, data);
clientCommand = Commands.newSend(1, 0, 1, ChecksumType.None, messageMetadata, data);
channel.writeInbound(Unpooled.copiedBuffer(clientCommand));
clientCommand.release();
......
......@@ -967,6 +967,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
* 2. Consumer has receive size (10) and receive message without acknowledging
* 3. Consumer will stop receiving message after unAckThreshold = 500
* 4. Consumer acks messages and starts consuming remanining messages
* This testcase enables checksum sending while producing message and broker verifies the checksum for the message.
*
* @throws Exception
*/
......@@ -1496,5 +1497,42 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
}
}
@Test
public void testEnabledChecksumClient() throws Exception {
log.info("-- Starting {} test --", methodName);
final int totalMsg = 10;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);
ProducerConfiguration producerConf = new ProducerConfiguration();
final int batchMessageDelayMs = 300;
if (batchMessageDelayMs != 0) {
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS);
producerConf.setBatchingMaxMessages(5);
}
Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf);
for (int i = 0; i < totalMsg; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Message msg = null;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < totalMsg; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
}
// Acknowledge the consumption of all messages at once
consumer.acknowledgeCumulative(msg);
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
}
......@@ -15,37 +15,64 @@
*/
package com.yahoo.pulsar.client.impl;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.yahoo.pulsar.broker.service.BrokerTestBase;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.api.Consumer;
import com.yahoo.pulsar.client.api.Message;
import com.yahoo.pulsar.client.api.MessageBuilder;
import com.yahoo.pulsar.client.api.MessageId;
import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.impl.ProducerImpl.OpSendMsg;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.Commands.ChecksumType;
import com.yahoo.pulsar.common.api.DoubleByteBuf;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ResourceLeakDetector;
public class MessageIdTest extends BrokerTestBase {
private static final Logger log = LoggerFactory.getLogger(MessageIdTest.class);
@BeforeClass
@BeforeMethod
@Override
public void setup() throws Exception {
baseSetup();
}
@AfterClass
@AfterMethod
@Override
protected void cleanup() throws Exception {
internalCleanup();
......@@ -55,7 +82,7 @@ public class MessageIdTest extends BrokerTestBase {
public void producerSendAsync() throws PulsarClientException {
// 1. Basic Config
String key = "producerSendAsync";
final String topicName = "persistent://property/cluster/namespace/topic-" + key;
final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
final String subscriptionName = "my-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int numberOfMessages = 30;
......@@ -108,7 +135,7 @@ public class MessageIdTest extends BrokerTestBase {
public void producerSend() throws PulsarClientException {
// 1. Basic Config
String key = "producerSend";
final String topicName = "persistent://property/cluster/namespace/topic-" + key;
final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
final String subscriptionName = "my-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int numberOfMessages = 30;
......@@ -143,7 +170,7 @@ public class MessageIdTest extends BrokerTestBase {
public void partitionedProducerSendAsync() throws PulsarClientException, PulsarAdminException {
// 1. Basic Config
String key = "partitionedProducerSendAsync";
final String topicName = "persistent://property/cluster/namespace/topic-" + key;
final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
final String subscriptionName = "my-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int numberOfMessages = 30;
......@@ -190,7 +217,7 @@ public class MessageIdTest extends BrokerTestBase {
public void partitionedProducerSend() throws PulsarClientException, PulsarAdminException {
// 1. Basic Config
String key = "partitionedProducerSend";
final String topicName = "persistent://property/cluster/namespace/topic-" + key;
final String topicName = "persistent://prop/cluster/namespace/topic-" + key;
final String subscriptionName = "my-subscription-" + key;
final String messagePredicate = "my-message-" + key + "-";
final int numberOfMessages = 30;
......@@ -223,4 +250,176 @@ public class MessageIdTest extends BrokerTestBase {
// it
// consumer.unsubscribe();;
}
/**
* Verifies: different versions of broker-deployment (one broker understands Checksum and other
* doesn't in that case remove checksum before sending to broker-2)
*
* client first produce message with checksum and then retries to send message due to connection unavailable. But this time, if
* broker doesn't understand checksum: then client should remove checksum from the message before sending to broker.
*
* 1. stop broker
* 2. client compute checksum and add into message
* 3. produce 2 messages and corrupt 1 message
* 4. start broker with lower version (which doesn't support checksum)
* 5. client reconnects to broker and due to incompatibility of version: removes checksum from message
* 6. broker doesn't do checksum validation and persist message
* 7. client receives ack
*
* @throws Exception
*/
@Test
public void testChecksumVersionComptability() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic1";
// 1. producer connect
Producer prod = pulsarClient.createProducer(topicName);
ProducerImpl producer = spy((ProducerImpl) prod);
// return higher version compare to broker : so, it forces client-producer to remove checksum from payload
doReturn(producer.brokerChecksumSupportedVersion() + 1).when(producer).brokerChecksumSupportedVersion();
Consumer consumer = pulsarClient.subscribe(topicName, "my-sub");
// Stop the broker, and publishes messages. Messages are accumulated in the producer queue and they're checksums
// would have already been computed. If we change the message content at that point, it should result in a
// checksum validation error
stopBroker();
// stop timer to auto-reconnect as let spy-Producer connect to broker manually so, spy-producer object can get
// mock-value from brokerChecksumSupportedVersion
((PulsarClientImpl) pulsarClient).timer().stop();
Message msg1 = MessageBuilder.create().setContent("message-1".getBytes()).build();
CompletableFuture<MessageId> future1 = producer.sendAsync(msg1);
Message msg2 = MessageBuilder.create().setContent("message-2".getBytes()).build();
CompletableFuture<MessageId> future2 = producer.sendAsync(msg2);
// corrupt the message
msg2.getData()[msg2.getData().length - 1] = '3'; // new content would be 'message-3'
// Restart the broker to have the messages published
startBroker();
// grab broker connection with mocked producer which has higher version compare to broker
producer.grabCnx();
try {
// it should not fail: as due to unsupported version of broker: client removes checksum and broker should
// ignore the checksum validation
future1.get();
future2.get();
} catch (Exception e) {
fail("Broker shouldn't verify checksum for corrupted message and it shouldn't fail");
}
((ConsumerImpl) consumer).grabCnx();
// We should only receive msg1
Message msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "message-1");
msg = consumer.receive(1, TimeUnit.SECONDS);
assertEquals(new String(msg.getData()), "message-3");
}
/**
* Verifies: if message is corrupted before sending to broker and if broker gives checksum error: then
* 1. Client-Producer recomputes checksum with modified data
* 2. Retry message-send again
* 3. Broker verifies checksum
* 4. client receives send-ack success
*
* @throws Exception
*/
@Test
public void testCorruptMessageRemove() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/retry-topic";
ProducerConfiguration config = new ProducerConfiguration();
config.setSendTimeout(10, TimeUnit.MINUTES);
// 1. producer connect
Producer prod = pulsarClient.createProducer(topicName, config);
ProducerImpl producer = spy((ProducerImpl) prod);
Field producerIdField = ProducerImpl.class.getDeclaredField("producerId");
producerIdField.setAccessible(true);
long producerId = (long) producerIdField.get(producer);
producer.cnx().registerProducer(producerId, producer); // registered spy ProducerImpl
Consumer consumer = pulsarClient.subscribe(topicName, "my-sub");
// 2. Stop the broker, and publishes messages. Messages are accumulated in the producer queue and they're
// checksums
// would have already been computed. If we change the message content at that point, it should result in a
// checksum validation error
// enable checksum at producer
stopBroker();
Message msg = MessageBuilder.create().setContent("message-1".getBytes()).build();
CompletableFuture<MessageId> future = producer.sendAsync(msg);
// 3. corrupt the message
msg.getData()[msg.getData().length - 1] = '2'; // new content would be 'message-3'
// 4. Restart the broker to have the messages published
startBroker();
try {
future.get();
fail("send message should have failed with checksum excetion");
} catch (Exception e) {
if (e.getCause() instanceof PulsarClientException.ChecksumException) {
//ok (callback should get checksum exception as message was modified and corrupt)
} else {
fail("Callback should have only failed with ChecksumException", e);
}
}
// 5. Verify
// (5.1) Verify: producer's recoverChecksumError and updateChecksum invoked
verify(producer, times(1)).recoverChecksumError(any(), anyLong());
verify(producer, times(1)).verifyLocalBufferIsNotCorrupted(any());
/**
* (5.3) verify: ProducerImpl.verifyLocalBufferIsNotCorrupted() => validates if message
* is corrupt
*/
MessageImpl msg2 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build();
ByteBuf payload = msg2.getDataBuffer();
Builder metadataBuilder = ((MessageImpl) msg).getMessageBuilder();
MessageMetadata msgMetadata = metadataBuilder.setProducerName("test").setSequenceId(1).setPublishTime(10L)
.build();
ByteBuf cmd = Commands.newSend(producerId, 1, 1, ChecksumType.Crc32c, msgMetadata, payload);
// (a) create OpSendMsg with message-data : "message-1"
OpSendMsg op = OpSendMsg.create(((MessageImpl) msg), cmd, 1, null);
// a.verify: as message is not corrupt: no need to update checksum
assertTrue(producer.verifyLocalBufferIsNotCorrupted(op));
// (b) corrupt message
msg2.getData()[msg2.getData().length - 1] = '2'; // new content would be 'message-2'
// b. verify: as message is corrupt: update checksum
assertFalse(producer.verifyLocalBufferIsNotCorrupted(op));
assertEquals(producer.getPendingQueueSize(), 0);
// [2] test-recoverChecksumError functionality
stopBroker();
MessageImpl msg1 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build();
future = producer.sendAsync(msg1);
ClientCnx cnx = spy(new ClientCnx((PulsarClientImpl)pulsarClient) {});
String exc = "broker is already stopped";
// when client-try to recover checksum by resending to broker: throw exception as broker is stopped
doThrow(new IllegalStateException(exc)).when(cnx).ctx();
try {
producer.recoverChecksumError(cnx, 1);
fail("it should call : resendMessages() => which should throw above mocked exception");
}catch(IllegalStateException e) {
assertEquals(exc, e.getMessage());
}
producer.close();
consumer.close();
producer = null; // clean reference of mocked producer
}
}
......@@ -17,21 +17,25 @@ package com.yahoo.pulsar.checksum.utils;
import static com.scurrilous.circe.params.CrcParameters.CRC32C;
import java.nio.ByteBuffer;
import com.scurrilous.circe.IncrementalIntHash;
import com.scurrilous.circe.crc.Sse42Crc32C;
import com.scurrilous.circe.crc.StandardCrcProvider;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class Crc32cChecksum {
private static final Logger log = LoggerFactory.getLogger(Crc32cChecksum.class);
private final static IncrementalIntHash CRC32C_HASH;
static {
if (Sse42Crc32C.isSupported()) {
CRC32C_HASH = new Crc32cSse42Provider().getIncrementalInt(CRC32C);
if (log.isDebugEnabled()) {
log.debug("SSE4.2 CRC32C provider initialized");
}
} else {
CRC32C_HASH = new StandardCrcProvider().getIncrementalInt(CRC32C);
}
......@@ -56,6 +60,13 @@ public class Crc32cChecksum {
}
/**
* Computes incremental checksum with input previousChecksum and input payload
*
* @param previousChecksum : previously computed checksum
* @param payload
* @return
*/
public static int resumeChecksum(int previousChecksum, ByteBuf payload) {
if (payload.hasMemoryAddress() && (CRC32C_HASH instanceof Sse42Crc32C)) {
return CRC32C_HASH.resume(previousChecksum, payload.memoryAddress() + payload.readerIndex(),
......
......@@ -163,4 +163,10 @@ public class PulsarClientException extends IOException {
super(msg);
}
}
public static class ChecksumException extends PulsarClientException {
public ChecksumException(String msg) {
super(msg);
}
}
}
\ No newline at end of file
......@@ -28,7 +28,6 @@ import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.proto.PulsarApi;
import com.yahoo.pulsar.common.compression.CompressionCodec;
import com.yahoo.pulsar.common.compression.CompressionCodecProvider;
import com.yahoo.pulsar.common.util.XXHashChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
......@@ -114,12 +113,6 @@ class BatchMessageContainer {
return compressedPayload;
}
void setChecksum() {
checkArgument(!messageMetadata.hasChecksum());
long checksum = XXHashChecksum.computeChecksum(batchedMessageMetadataAndPayload);
messageMetadata.setChecksum(checksum);
}
PulsarApi.MessageMetadata setBatchAndBuild() {
messageMetadata.setNumMessagesInBatch(numMessagesInBatch);
if (log.isDebugEnabled()) {
......
......@@ -28,7 +28,7 @@ import com.yahoo.pulsar.client.api.Authentication;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.PulsarHandler;
import com.yahoo.pulsar.common.api.proto.PulsarApi;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnected;
......@@ -195,7 +195,13 @@ public class ClientCnx extends PulsarHandler {
@Override
protected void handleSendError(CommandSendError sendError) {
log.warn("{} Received send error from server: {}", ctx.channel(), sendError);
ctx.close();
if (ServerError.ChecksumError.equals(sendError.getError())) {
long producerId = sendError.getProducerId();
long sequenceId = sendError.getSequenceId();
producers.get(producerId).recoverChecksumError(this, sequenceId);
} else {
ctx.close();
}
}
@Override
......@@ -204,7 +210,7 @@ public class ClientCnx extends PulsarHandler {
log.warn("{} Received error from server: {}", ctx.channel(), error.getMessage());
long requestId = error.getRequestId();
if (error.getError() == PulsarApi.ServerError.ProducerBlockedQuotaExceededError) {
if (error.getError() == ServerError.ProducerBlockedQuotaExceededError) {
log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic",
ctx.channel());
}
......
......@@ -52,13 +52,15 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.compression.CompressionCodec;
import com.yahoo.pulsar.common.compression.CompressionCodecProvider;
import com.yahoo.pulsar.common.util.XXHashChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
public class ConsumerImpl extends ConsumerBase {
......@@ -584,6 +586,13 @@ public class ConsumerImpl extends ConsumerBase {
MessageMetadata msgMetadata = null;
ByteBuf payload = headersAndPayload;
if (!verifyChecksum(headersAndPayload, messageId)) {
// discard message with checksum error
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
try {
msgMetadata = Commands.parseMessageMetadata(payload);
} catch (Throwable t) {
......@@ -597,11 +606,6 @@ public class ConsumerImpl extends ConsumerBase {
return;
}
if (!verifyChecksum(messageId, msgMetadata, uncompressedPayload, cnx)) {
// Message discarded for checksum error
return;
}
final int numMessages = msgMetadata.getNumMessagesInBatch();
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
......@@ -810,28 +814,21 @@ public class ConsumerImpl extends ConsumerBase {
}
}
private boolean verifyChecksum(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload,
ClientCnx currentCnx) {
if (!msgMetadata.hasChecksum()) {
// No checksum to validate
return true;
}
long storedChecksum = msgMetadata.getChecksum();
long computedChecksum = XXHashChecksum.computeChecksum(payload);
private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId) {
if (storedChecksum == computedChecksum) {
return true;
} else {
log.error(
"[{}][{}] Checksum mismatch for message at {}:{}. Received content:\n{}"
+ "\nReceived checksum: 0x{} -- Computed checksum: 0x{}",
topic, subscription, messageId.getLedgerId(), messageId.getEntryId(),
ByteBufUtil.prettyHexDump(payload), Long.toHexString(storedChecksum),
Long.toHexString(computedChecksum));
discardCorruptedMessage(messageId, currentCnx, ValidationError.ChecksumMismatch);
return false;
if(hasChecksum(headersAndPayload)) {
int checksum = readChecksum(headersAndPayload).intValue();
int computedChecksum = computeChecksum(headersAndPayload);
if (checksum != computedChecksum) {
log.error(
"[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{}, Computed checksum: 0x{}",
topic, subscription, messageId.getLedgerId(), messageId.getEntryId(),
Long.toHexString(checksum), Integer.toHexString(computedChecksum));
return false;
}
}
return true;
}
private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx,
......
......@@ -16,7 +16,10 @@
package com.yahoo.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
......@@ -38,11 +41,13 @@ import com.yahoo.pulsar.client.api.Producer;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.Commands.ChecksumType;
import com.yahoo.pulsar.common.api.DoubleByteBuf;
import com.yahoo.pulsar.common.api.proto.PulsarApi;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.compression.CompressionCodec;
import com.yahoo.pulsar.common.compression.CompressionCodecProvider;
import com.yahoo.pulsar.common.util.XXHashChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
......@@ -50,6 +55,8 @@ import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
public class ProducerImpl extends ProducerBase implements TimerTask {
......@@ -183,10 +190,6 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
MessageMetadata.Builder msgMetadata = msg.getMessageBuilder();
ByteBuf payload = msg.getDataBuffer();
if (!msgMetadata.hasChecksum()) {
msgMetadata.setChecksum(XXHashChecksum.computeChecksum(payload));
}
// If compression is enabled, we are compressing, otherwise it will simply use the same buffer
int uncompressedSize = payload.readableBytes();
ByteBuf compressedPayload = payload;
......@@ -233,7 +236,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
doBatchSendAndAdd(msg, callback, payload);
}
} else {
ByteBuf cmd = Commands.newSend(producerId, sequenceId, 1, msgMetadata.build(), compressedPayload);
ByteBuf cmd = sendMessage(producerId, sequenceId, 1, msgMetadata.build(), compressedPayload);
msgMetadata.recycle();
final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback);
......@@ -266,6 +269,18 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
}
}
private ByteBuf sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata,
ByteBuf compressedPayload) throws IOException {
ChecksumType checksumType;
if (clientCnx.get() == null
|| clientCnx.get().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) {
checksumType = ChecksumType.Crc32c;
} else {
checksumType = ChecksumType.None;
}
return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload);
}
private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Closing out batch to accomodate large message with size {}", topic, producerName,
......@@ -484,7 +499,102 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
}
}
private static final class OpSendMsg {
/**
* Checks message checksum to retry if message was corrupted while sending to broker. Recomputes checksum of the
* message header-payload again.
* <ul>
* <li><b>if matches with existing checksum</b>: it means message was corrupt while sending to broker. So, resend message</li>
* <li><b>if doesn't match with existing checksum</b>: it means message is already corrupt and can't retry again. So, fail
* send-message by failing callback</li>
* </ul>
*
* @param cnx
* @param sequenceId
*/
protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) {
OpSendMsg op = pendingMessages.peek();
if (op == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Got send failure for timed out msg {}", topic, producerName, sequenceId);
}
} else {
long expectedSequenceId = op.sequenceId;
if (sequenceId == expectedSequenceId) {
boolean corrupted = !verifyLocalBufferIsNotCorrupted(op);
if (corrupted) {
// remove message from pendingMessages queue and fail callback
pendingMessages.remove();
semaphore.release(op.numMessagesInBatch);
try {
op.callback.sendComplete(
new PulsarClientException.ChecksumException("Checksum failded on corrupt message"));
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName,
sequenceId, t);
}
ReferenceCountUtil.safeRelease(op.cmd);
op.recycle();
return;
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Message is not corrupted, retry send-message with sequenceId {}", topic,
producerName, sequenceId);
}
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Corrupt message is already timed out {}", topic, producerName, sequenceId);
}
}
}
// as msg is not corrupted : let producer resend pending-messages again including checksum failed message
resendMessages(cnx);
}
/**
* Computes checksum again and verifies it against existing checksum. If checksum doesn't match it means that
* message is corrupt.
*
* @param op
* @return returns true only if message is not modified and computed-checksum is same as previous checksum else
* return false that means that message is corrupted. Returns true if checksum is not present.
*/
protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
DoubleByteBuf msg = getDoubleByteBuf(op.cmd);
if (msg != null) {
ByteBuf headerFrame = msg.getFirst();
msg.markReaderIndex();
headerFrame.markReaderIndex();
try {
// skip bytes up to checksum index
headerFrame.skipBytes(4); // skip [total-size]
int cmdSize = (int) headerFrame.readUnsignedInt();
headerFrame.skipBytes(cmdSize);
// verify if checksum present
if (hasChecksum(headerFrame)) {
int checksum = readChecksum(headerFrame).intValue();
// msg.readerIndex is already at header-payload index, Recompute checksum for headers-payload
int metadataChecksum = computeChecksum(headerFrame);
long computedChecksum = resumeChecksum(metadataChecksum, msg.getSecond());
return checksum == computedChecksum;
} else {
log.warn("[{}] [{}] checksum is not present into message with id {}", topic, producerName,
op.sequenceId);
}
} finally {
headerFrame.resetReaderIndex();
msg.resetReaderIndex();
}
return true;
} else {
log.warn("[{}] Failed while casting {} into DoubleByteBuf", producerName, op.cmd.getClass().getName());
return false;
}
}
protected static final class OpSendMsg {
MessageImpl msg;
List<MessageImpl> msgs;
ByteBuf cmd;
......@@ -681,6 +791,10 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
log.info("[{}] [{}] Re-Sending {} messages to server", topic, producerName, messagesToResend);
for (OpSendMsg op : pendingMessages) {
if (cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion()) {
stripChecksum(op);
}
op.cmd.retain();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", topic, producerName,
......@@ -701,6 +815,60 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
});
}
/**
* Strips checksum from {@link OpSendMsg} command if present else ignore it.
*
* @param op
*/
private void stripChecksum(OpSendMsg op) {
op.cmd.markReaderIndex();
int totalMsgBufSize = op.cmd.readableBytes();
DoubleByteBuf msg = getDoubleByteBuf(op.cmd);
if (msg != null) {
ByteBuf headerFrame = msg.getFirst();
ByteBuf payloadFrame = msg.getSecond();
msg.markReaderIndex();
headerFrame.markReaderIndex();
payloadFrame.markReaderIndex();
try {
headerFrame.skipBytes(4); // skip [total-size]
int cmdSize = (int) headerFrame.readUnsignedInt();
// verify if checksum present
headerFrame.skipBytes(cmdSize);
if (!hasChecksum(headerFrame)) {
return;
}
int headerSize = 4 + 4 + cmdSize; // [total-size] [cmd-length] [cmd-size]
int checksumSize = 4 + 2; // [magic-number] [checksum-size]
int checksumMark = (headerSize + checksumSize); // [header-size] [checksum-size]
int metaPayloadSize = (totalMsgBufSize - checksumMark); // metadataPayload = totalSize - checksumMark
int newTotalFrameSizeLength = 4 + cmdSize + metaPayloadSize; // new total-size without checksum
headerFrame.resetReaderIndex();
int headerFrameSize = headerFrame.readableBytes();
headerFrame.setInt(0, newTotalFrameSizeLength); // rewrite new [total-size]
ByteBuf metadata = headerFrame.slice(checksumMark, headerFrameSize - checksumMark); // sliced only
// metadata
headerFrame.writerIndex(headerSize); // set headerFrame write-index to overwrite metadata over checksum
metadata.readBytes(headerFrame, metadata.readableBytes());
headerFrame.capacity(headerFrameSize - checksumSize); // reduce capacity by removed checksum bytes
headerFrame.resetReaderIndex();
} finally {
op.cmd.resetReaderIndex();
}
} else {
log.warn("[{}] Failed while casting {} into DoubleByteBuf", producerName, op.cmd.getClass().getName());
}
}
public int brokerChecksumSupportedVersion() {
return ProtocolVersion.v6.getNumber();
}
@Override
String getHandlerName() {
return producerName;
......@@ -836,11 +1004,9 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
try {
if (!batchMessageContainer.isEmpty()) {
numMessagesInBatch = batchMessageContainer.numMessagesInBatch;
// checksum is on uncompressed payload for batch
batchMessageContainer.setChecksum();
ByteBuf compressedPayload = batchMessageContainer.getCompressedBatchMetadataAndPayload();
long sequenceId = batchMessageContainer.sequenceId;
ByteBuf cmd = Commands.newSend(producerId, sequenceId, batchMessageContainer.numMessagesInBatch,
ByteBuf cmd = sendMessage(producerId, sequenceId, batchMessageContainer.numMessagesInBatch,
batchMessageContainer.setBatchAndBuild(), compressedPayload);
op = OpSendMsg.create(batchMessageContainer.messages, cmd, sequenceId,
......@@ -881,6 +1047,30 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
}
}
/**
* Casts input cmd to {@link DoubleByteBuf}
*
* Incase if leak-detection level is enabled: pulsar instruments {@link DoubleByteBuf} into LeakAwareByteBuf (type of {@link io.netty.buffer.WrappedByteBuf})
* So, this method casts input cmd to {@link DoubleByteBuf} else retrieves it from LeakAwareByteBuf.
*
* @param cmd
* @return DoubleByteBuf or null in case failed to cast input {@link ByteBuf}
*/
private DoubleByteBuf getDoubleByteBuf(ByteBuf cmd) {
DoubleByteBuf msg = null;
if (cmd instanceof DoubleByteBuf) {
msg = (DoubleByteBuf) cmd;
} else {
try {
msg = (DoubleByteBuf) cmd.unwrap();
} catch (Exception e) {
log.error("[{}] Failed while casting {} into DoubleByteBuf", producerName, cmd.getClass().getName(),
e);
}
}
return msg;
}
public long getDelayInMillis() {
OpSendMsg firstMsg = pendingMessages.peek();
if (firstMsg != null) {
......
......@@ -66,7 +66,6 @@
<groupId>com.yahoo.pulsar</groupId>
<artifactId>pulsar-checksum</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
......@@ -18,10 +18,15 @@ package com.yahoo.pulsar.common.api;
import java.io.IOException;
import com.google.protobuf.ByteString;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
import com.yahoo.pulsar.common.api.proto.PulsarApi;
import com.yahoo.pulsar.common.api.proto.PulsarApi.AuthMethod;
import com.yahoo.pulsar.common.api.proto.PulsarApi.BaseCommand;
import com.yahoo.pulsar.common.api.proto.PulsarApi.BaseCommand.Type;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseConsumer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandConnect;
......@@ -38,16 +43,13 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSend;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSendError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSendReceipt;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSuccess;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.BaseCommand.Type;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import com.yahoo.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
......@@ -60,6 +62,10 @@ import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
public class Commands {
public static final short magicCrc32c = 0x0e01;
private static final int checksumSize = 4;
public static ByteBuf newConnect(String authMethodName, String authData) {
return newConnect(authMethodName, authData, getCurrentProtocolVersion());
}
......@@ -184,20 +190,41 @@ public class Commands {
}
public static ByteBuf newSendError(long producerId, long sequenceId, Throwable t) {
return newSendError(producerId, sequenceId, ServerError.PersistenceError, t.getMessage());
}
public static ByteBuf newSendError(long producerId, long sequenceId, ServerError error, String errorMsg) {
CommandSendError.Builder sendErrorBuilder = CommandSendError.newBuilder();
sendErrorBuilder.setProducerId(producerId);
sendErrorBuilder.setSequenceId(sequenceId);
sendErrorBuilder.setError(ServerError.PersistenceError);
sendErrorBuilder.setMessage(t.getMessage());
sendErrorBuilder.setError(error);
sendErrorBuilder.setMessage(errorMsg);
CommandSendError sendError = sendErrorBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SEND_ERROR).setSendError(sendError));
sendErrorBuilder.recycle();
sendError.recycle();
return res;
}
public static boolean hasChecksum(ByteBuf buffer) {
return buffer.getShort(buffer.readerIndex()) == magicCrc32c;
}
public static Long readChecksum(ByteBuf buffer) {
if(hasChecksum(buffer)) {
buffer.skipBytes(2); //skip magic bytes
return buffer.readUnsignedInt();
} else{
return null;
}
}
public static MessageMetadata parseMessageMetadata(ByteBuf buffer) {
try {
// initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata to parse
// metadata
readChecksum(buffer);
int metadataSize = (int) buffer.readUnsignedInt();
int writerIndex = buffer.writerIndex();
......@@ -230,8 +257,8 @@ public class Commands {
return res;
}
public static ByteBuf newSend(long producerId, long sequenceId, int numMessages, MessageMetadata messageData,
ByteBuf payload) {
public static ByteBuf newSend(long producerId, long sequenceId, int numMessages, ChecksumType checksumType,
MessageMetadata messageData, ByteBuf payload) {
CommandSend.Builder sendBuilder = CommandSend.newBuilder();
sendBuilder.setProducerId(producerId);
sendBuilder.setSequenceId(sequenceId);
......@@ -241,7 +268,7 @@ public class Commands {
CommandSend send = sendBuilder.build();
ByteBuf res = serializeCommandSendWithSize(BaseCommand.newBuilder().setType(Type.SEND).setSend(send),
messageData, payload);
checksumType, messageData, payload);
send.recycle();
sendBuilder.recycle();
return res;
......@@ -406,18 +433,24 @@ public class Commands {
return buf;
}
private static ByteBuf serializeCommandSendWithSize(BaseCommand.Builder cmdBuilder, MessageMetadata msgMetadata,
private static ByteBuf serializeCommandSendWithSize(BaseCommand.Builder cmdBuilder, ChecksumType checksumType, MessageMetadata msgMetadata,
ByteBuf payload) {
// / Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [METADATA_SIZE][METADATA] [PAYLOAD]
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
BaseCommand cmd = cmdBuilder.build();
int cmdSize = cmd.getSerializedSize();
int msgMetadataSize = msgMetadata.getSerializedSize();
int payloadSize = payload.readableBytes();
int totalSize = 4 + cmdSize + 4 + msgMetadataSize + payloadSize;
int headersSize = 4 + 4 + cmdSize + 4 + msgMetadataSize;
int magicAndChecksumLength = ChecksumType.Crc32c.equals(checksumType) ? (2 + 4 /* magic + checksumLength*/) : 0;
boolean includeChecksum = magicAndChecksumLength > 0;
int headerContentSize = 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize; // cmdLength + cmdSize + magicLength +
// checksumSize + msgMetadataLength +
// msgMetadataSize
int totalSize = headerContentSize + payloadSize;
int headersSize = 4 + headerContentSize; // totalSize + headerLength
int checksumReaderIndex = -1;
ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
headers.writeInt(totalSize); // External frame
......@@ -429,18 +462,36 @@ public class Commands {
cmd.writeTo(outStream);
cmd.recycle();
cmdBuilder.recycle();
//Create checksum placeholder
if (includeChecksum) {
headers.writeShort(magicCrc32c);
checksumReaderIndex = headers.writerIndex();
headers.writerIndex(headers.writerIndex() + checksumSize); //skip 4 bytes of checksum
}
// Write metadata
headers.writeInt(msgMetadataSize);
msgMetadata.writeTo(outStream);
outStream.recycle();
} catch (IOException e) {
// This is in-memory serialization, should not fail
throw new RuntimeException(e);
}
return DoubleByteBuf.get(headers, payload);
ByteBuf command = DoubleByteBuf.get(headers, payload);
// write checksum at created checksum-placeholder
if (includeChecksum) {
headers.markReaderIndex();
headers.readerIndex(checksumReaderIndex + checksumSize);
int metadataChecksum = computeChecksum(headers);
int computedChecksum = resumeChecksum(metadataChecksum, payload);
// set computed checksum
headers.setInt(checksumReaderIndex, computedChecksum);
headers.resetReaderIndex();
}
return command;
}
public static long initBatchMessageMetadata(PulsarApi.MessageMetadata.Builder messageMetadata,
......@@ -566,4 +617,8 @@ public class Commands {
}
}
public static enum ChecksumType {
Crc32c,
None;
}
}
......@@ -87,6 +87,14 @@ public final class DoubleByteBuf extends AbstractReferenceCountedByteBuf {
return toLeakAwareBuffer(buf);
}
public ByteBuf getFirst() {
return b1;
}
public ByteBuf getSecond() {
return b2;
}
@Override
public boolean isDirect() {
return b1.isDirect() && b2.isDirect();
......
......@@ -63,6 +63,7 @@ public final class PulsarApi {
ServiceNotReady(6, 6),
ProducerBlockedQuotaExceededError(7, 7),
ProducerBlockedQuotaExceededException(8, 8),
ChecksumError(9, 9),
;
public static final int UnknownError_VALUE = 0;
......@@ -74,6 +75,7 @@ public final class PulsarApi {
public static final int ServiceNotReady_VALUE = 6;
public static final int ProducerBlockedQuotaExceededError_VALUE = 7;
public static final int ProducerBlockedQuotaExceededException_VALUE = 8;
public static final int ChecksumError_VALUE = 9;
public final int getNumber() { return value; }
......@@ -89,6 +91,7 @@ public final class PulsarApi {
case 6: return ServiceNotReady;
case 7: return ProducerBlockedQuotaExceededError;
case 8: return ProducerBlockedQuotaExceededException;
case 9: return ChecksumError;
default: return null;
}
}
......@@ -166,6 +169,7 @@ public final class PulsarApi {
v3(3, 3),
v4(4, 4),
v5(5, 5),
v6(6, 6),
;
public static final int v0_VALUE = 0;
......@@ -174,6 +178,7 @@ public final class PulsarApi {
public static final int v3_VALUE = 3;
public static final int v4_VALUE = 4;
public static final int v5_VALUE = 5;
public static final int v6_VALUE = 6;
public final int getNumber() { return value; }
......@@ -186,6 +191,7 @@ public final class PulsarApi {
case 3: return v3;
case 4: return v4;
case 5: return v5;
case 6: return v6;
default: return null;
}
}
......@@ -1227,10 +1233,6 @@ public final class PulsarApi {
boolean hasUncompressedSize();
int getUncompressedSize();
// optional sfixed64 checksum = 10;
boolean hasChecksum();
long getChecksum();
// optional int32 num_messages_in_batch = 11 [default = 1];
boolean hasNumMessagesInBatch();
int getNumMessagesInBatch();
......@@ -1441,21 +1443,11 @@ public final class PulsarApi {
return uncompressedSize_;
}
// optional sfixed64 checksum = 10;
public static final int CHECKSUM_FIELD_NUMBER = 10;
private long checksum_;
public boolean hasChecksum() {
return ((bitField0_ & 0x00000080) == 0x00000080);
}
public long getChecksum() {
return checksum_;
}
// optional int32 num_messages_in_batch = 11 [default = 1];
public static final int NUM_MESSAGES_IN_BATCH_FIELD_NUMBER = 11;
private int numMessagesInBatch_;
public boolean hasNumMessagesInBatch() {
return ((bitField0_ & 0x00000100) == 0x00000100);
return ((bitField0_ & 0x00000080) == 0x00000080);
}
public int getNumMessagesInBatch() {
return numMessagesInBatch_;
......@@ -1471,7 +1463,6 @@ public final class PulsarApi {
replicateTo_ = com.google.protobuf.LazyStringArrayList.EMPTY;
compression_ = com.yahoo.pulsar.common.api.proto.PulsarApi.CompressionType.NONE;
uncompressedSize_ = 0;
checksum_ = 0L;
numMessagesInBatch_ = 1;
}
private byte memoizedIsInitialized = -1;
......@@ -1537,9 +1528,6 @@ public final class PulsarApi {
output.writeUInt32(9, uncompressedSize_);
}
if (((bitField0_ & 0x00000080) == 0x00000080)) {
output.writeSFixed64(10, checksum_);
}
if (((bitField0_ & 0x00000100) == 0x00000100)) {
output.writeInt32(11, numMessagesInBatch_);
}
}
......@@ -1592,10 +1580,6 @@ public final class PulsarApi {
.computeUInt32Size(9, uncompressedSize_);
}
if (((bitField0_ & 0x00000080) == 0x00000080)) {
size += com.google.protobuf.CodedOutputStream
.computeSFixed64Size(10, checksum_);
}
if (((bitField0_ & 0x00000100) == 0x00000100)) {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(11, numMessagesInBatch_);
}
......@@ -1730,10 +1714,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00000080);
uncompressedSize_ = 0;
bitField0_ = (bitField0_ & ~0x00000100);
checksum_ = 0L;
bitField0_ = (bitField0_ & ~0x00000200);
numMessagesInBatch_ = 1;
bitField0_ = (bitField0_ & ~0x00000400);
bitField0_ = (bitField0_ & ~0x00000200);
return this;
}
......@@ -1809,10 +1791,6 @@ public final class PulsarApi {
if (((from_bitField0_ & 0x00000200) == 0x00000200)) {
to_bitField0_ |= 0x00000080;
}
result.checksum_ = checksum_;
if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
to_bitField0_ |= 0x00000100;
}
result.numMessagesInBatch_ = numMessagesInBatch_;
result.bitField0_ = to_bitField0_;
return result;
......@@ -1861,9 +1839,6 @@ public final class PulsarApi {
if (other.hasUncompressedSize()) {
setUncompressedSize(other.getUncompressedSize());
}
if (other.hasChecksum()) {
setChecksum(other.getChecksum());
}
if (other.hasNumMessagesInBatch()) {
setNumMessagesInBatch(other.getNumMessagesInBatch());
}
......@@ -1964,13 +1939,8 @@ public final class PulsarApi {
uncompressedSize_ = input.readUInt32();
break;
}
case 81: {
bitField0_ |= 0x00000200;
checksum_ = input.readSFixed64();
break;
}
case 88: {
bitField0_ |= 0x00000400;
bitField0_ |= 0x00000200;
numMessagesInBatch_ = input.readInt32();
break;
}
......@@ -2320,43 +2290,22 @@ public final class PulsarApi {
return this;
}
// optional sfixed64 checksum = 10;
private long checksum_ ;
public boolean hasChecksum() {
return ((bitField0_ & 0x00000200) == 0x00000200);
}
public long getChecksum() {
return checksum_;
}
public Builder setChecksum(long value) {
bitField0_ |= 0x00000200;
checksum_ = value;
return this;
}
public Builder clearChecksum() {
bitField0_ = (bitField0_ & ~0x00000200);
checksum_ = 0L;
return this;
}
// optional int32 num_messages_in_batch = 11 [default = 1];
private int numMessagesInBatch_ = 1;
public boolean hasNumMessagesInBatch() {
return ((bitField0_ & 0x00000400) == 0x00000400);
return ((bitField0_ & 0x00000200) == 0x00000200);
}
public int getNumMessagesInBatch() {
return numMessagesInBatch_;
}
public Builder setNumMessagesInBatch(int value) {
bitField0_ |= 0x00000400;
bitField0_ |= 0x00000200;
numMessagesInBatch_ = value;
return this;
}
public Builder clearNumMessagesInBatch() {
bitField0_ = (bitField0_ & ~0x00000400);
bitField0_ = (bitField0_ & ~0x00000200);
numMessagesInBatch_ = 1;
return this;
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.common.util;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import net.jpountz.xxhash.XXHash64;
import net.jpountz.xxhash.XXHashFactory;
public class XXHashChecksum {
private static final XXHash64 checksum = XXHashFactory.fastestInstance().hash64();
public static long computeChecksum(ByteBuf payload) {
if (payload.hasArray()) {
return checksum.hash(payload.array(), payload.arrayOffset() + payload.readerIndex(),
payload.readableBytes(), 0L);
} else {
ByteBuffer payloadNio = payload.nioBuffer(payload.readerIndex(), payload.readableBytes());
return checksum.hash(payloadNio, 0, payload.readableBytes(), 0L);
}
}
}
......@@ -50,8 +50,9 @@ message MessageMetadata {
repeated string replicate_to = 7;
optional CompressionType compression = 8 [default = NONE];
optional uint32 uncompressed_size = 9 [default = 0];
// XXHash64 checksum of the original message payload
optional sfixed64 checksum = 10;
// Removed below checksum field from Metadata as
// it should be part of send-command which keeps checksum of header + payload
//optional sfixed64 checksum = 10;
// differentiate single and batch message metadata
optional int32 num_messages_in_batch = 11 [default = 1];
}
......@@ -75,6 +76,7 @@ enum ServerError {
ServiceNotReady = 6; // Any error that requires client retry operation with a fresh lookup
ProducerBlockedQuotaExceededError = 7; // Unable to create producer because backlog quota exceeded
ProducerBlockedQuotaExceededException = 8; // Exception while creating producer because quota exceeded
ChecksumError = 9; // Error while verifying message checksum
}
enum AuthMethod {
......@@ -92,6 +94,7 @@ enum ProtocolVersion {
v3 = 3; // Added compression with LZ4 and ZLib
v4 = 4; // Added batch message support
v5 = 5; // Added disconnect client w/o closing connection
v6 = 6; // Added checksum computation for metadata + payload
}
message CommandConnect {
......
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.common.compression;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.testng.annotations.Test;
import com.yahoo.pulsar.checksum.utils.Crc32cChecksum;
import com.yahoo.pulsar.common.api.Commands;
import com.yahoo.pulsar.common.api.Commands.ChecksumType;
import com.yahoo.pulsar.common.api.DoubleByteBuf;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
public class CommandsTest {
@Test
public void testChecksumSendCommand() throws Exception {
// test checksum in send command
String producerName = "prod-name";
int sequenceId = 0;
ByteBuf data = Unpooled.buffer(1024);
MessageMetadata messageMetadata = MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis())
.setProducerName(producerName).setSequenceId(sequenceId).build();
int expectedChecksum = computeChecksum(messageMetadata, data);
ByteBuf clientCommand = Commands.newSend(1, 0, 1, ChecksumType.Crc32c, messageMetadata, data);
clientCommand.retain();
ByteBuffer inputBytes = clientCommand.nioBuffer();
ByteBuf receivedBuf = Unpooled.wrappedBuffer(inputBytes);
receivedBuf.skipBytes(4); //skip [total-size]
int cmdSize = (int) receivedBuf.readUnsignedInt();
receivedBuf.readerIndex(8 + cmdSize);
int startMessagePos = receivedBuf.readerIndex();
/*** 1. verify checksum and metadataParsing ***/
boolean hasChecksum = Commands.hasChecksum(receivedBuf);
int checksum = Commands.readChecksum(receivedBuf).intValue();
// verify checksum is present
assertTrue(hasChecksum);
// verify checksum value
assertEquals(expectedChecksum, checksum);
MessageMetadata metadata = Commands.parseMessageMetadata(receivedBuf);
// verify metadata parsing
assertEquals(metadata.getProducerName(), producerName);
/** 2. parseMessageMetadata should skip checksum if present **/
receivedBuf.readerIndex(startMessagePos);
metadata = Commands.parseMessageMetadata(receivedBuf);
// verify metadata parsing
assertEquals(metadata.getProducerName(), producerName);
}
private int computeChecksum(MessageMetadata msgMetadata, ByteBuf compressedPayload) throws IOException {
int metadataSize = msgMetadata.getSerializedSize();
int metadataFrameSize = 4 + metadataSize;
ByteBuf metaPayloadFrame = PooledByteBufAllocator.DEFAULT.buffer(metadataFrameSize, metadataFrameSize);
ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(metaPayloadFrame);
metaPayloadFrame.writeInt(metadataSize);
msgMetadata.writeTo(outStream);
ByteBuf payload = compressedPayload.copy();
ByteBuf metaPayloadBuf = DoubleByteBuf.get(metaPayloadFrame, payload);
int computedChecksum = Crc32cChecksum.computeChecksum(metaPayloadBuf);
outStream.recycle();
metaPayloadBuf.release();
return computedChecksum;
}
}
......@@ -148,7 +148,6 @@ public class Crc32cChecksumTest {
incrementalChecksum = Crc32cChecksum.resumeChecksum(checksum, payload);
assertEquals(expectedChecksum, incrementalChecksum);
payload.release();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册