diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java index d33763847f94e779218e3346af24ce116bb830c1..5499c90066007db210b55d098acc35bb0c7972ed 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.store; import java.nio.ByteBuffer; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.CommitLog.PutMessageContext; /** * Write messages callback interface @@ -30,7 +31,7 @@ public interface AppendMessageCallback { * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBrokerInner msg); + final int maxBlank, final MessageExtBrokerInner msg, PutMessageContext putMessageContext); /** * After batched message serialization, write MapedByteBuffer @@ -39,5 +40,5 @@ public interface AppendMessageCallback { * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBatch messageExtBatch); + final int maxBlank, final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext); } diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java index d6d1aa6a31c829cb26285feafa1aee8486320015..de3c03b307f9e2ec8a32512c78cd9b93dd51db16 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.store; +import java.util.function.Supplier; + /** * When write a message to the commit log, returns results */ @@ -28,6 +30,7 @@ public class AppendMessageResult { private int wroteBytes; // Message ID private String msgId; + private Supplier msgIdSupplier; // Message storage timestamp private long storeTimestamp; // Consume queue's offset(step by one) @@ -51,6 +54,17 @@ public class AppendMessageResult { this.pagecacheRT = pagecacheRT; } + public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, Supplier msgIdSupplier, + long storeTimestamp, long logicsOffset, long pagecacheRT) { + this.status = status; + this.wroteOffset = wroteOffset; + this.wroteBytes = wroteBytes; + this.msgIdSupplier = msgIdSupplier; + this.storeTimestamp = storeTimestamp; + this.logicsOffset = logicsOffset; + this.pagecacheRT = pagecacheRT; + } + public long getPagecacheRT() { return pagecacheRT; } @@ -88,6 +102,9 @@ public class AppendMessageResult { } public String getMsgId() { + if (msgId == null && msgIdSupplier != null) { + msgId = msgIdSupplier.get(); + } return msgId; } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 57fa3637856380a2bb03767d165ad811880bea68..5e92654aceae912bc9a013f876a7400d0119bae9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -16,17 +16,18 @@ */ package org.apache.rocketmq.store; +import java.net.Inet4Address; import java.net.Inet6Address; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; @@ -62,7 +63,7 @@ public class CommitLog { private final FlushCommitLogService commitLogService; private final AppendMessageCallback appendMessageCallback; - private final ThreadLocal batchEncoderThreadLocal; + private final ThreadLocal putMessageThreadLocal; protected HashMap topicQueueTable = new HashMap(1024); protected volatile long confirmOffset = -1L; @@ -84,10 +85,10 @@ public class CommitLog { this.commitLogService = new CommitRealTimeService(); this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); - batchEncoderThreadLocal = new ThreadLocal() { + putMessageThreadLocal = new ThreadLocal() { @Override - protected MessageExtBatchEncoder initialValue() { - return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + protected PutMessageThreadLocal initialValue() { + return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); @@ -555,6 +556,14 @@ public class CommitLog { return beginTimeInLock; } + private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) { + keyBuilder.setLength(0); + keyBuilder.append(messageExt.getTopic()); + keyBuilder.append('-'); + keyBuilder.append(messageExt.getQueueId()); + return keyBuilder.toString(); + } + public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); @@ -591,12 +600,30 @@ public class CommitLog { } } + InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); + if (bornSocketAddress.getAddress() instanceof Inet6Address) { + msg.setBornHostV6Flag(); + } + + InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); + if (storeSocketAddress.getAddress() instanceof Inet6Address) { + msg.setStoreHostAddressV6Flag(); + } + + PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); + PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); + if (encodeResult != null) { + return CompletableFuture.completedFuture(encodeResult); + } + msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer); + PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg)); + long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; - MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; @@ -613,7 +640,7 @@ public class CommitLog { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } - result = mappedFile.appendMessage(msg, this.appendMessageCallback); + result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); switch (result.getStatus()) { case PUT_OK: break; @@ -627,7 +654,7 @@ public class CommitLog { beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } - result = mappedFile.appendMessage(msg, this.appendMessageCallback); + result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: @@ -693,14 +720,26 @@ public class CommitLog { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); } + InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); + if (bornSocketAddress.getAddress() instanceof Inet6Address) { + messageExtBatch.setBornHostV6Flag(); + } + + InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); + if (storeSocketAddress.getAddress() instanceof Inet6Address) { + messageExtBatch.setStoreHostAddressV6Flag(); + } + long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); //fine-grained lock instead of the coarse-grained - MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get(); + PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get(); + MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder(); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch)); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); putMessageLock.lock(); try { @@ -720,7 +759,7 @@ public class CommitLog { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); switch (result.getStatus()) { case PUT_OK: break; @@ -734,7 +773,7 @@ public class CommitLog { beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: @@ -784,129 +823,6 @@ public class CommitLog { } - public PutMessageResult putMessage(final MessageExtBrokerInner msg) { - // Set the storage time - msg.setStoreTimestamp(System.currentTimeMillis()); - // Set the message body BODY CRC (consider the most appropriate setting - // on the client) - msg.setBodyCRC(UtilAll.crc32(msg.getBody())); - // Back to Results - AppendMessageResult result = null; - - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - - String topic = msg.getTopic(); - int queueId = msg.getQueueId(); - - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE - || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { - // Delay Delivery - if (msg.getDelayTimeLevel() > 0) { - if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { - msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); - } - - topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; - queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); - - // Backup real topic, queueId - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); - msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); - - msg.setTopic(topic); - msg.setQueueId(queueId); - } - } - - InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); - if (bornSocketAddress.getAddress() instanceof Inet6Address) { - msg.setBornHostV6Flag(); - } - - InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); - if (storeSocketAddress.getAddress() instanceof Inet6Address) { - msg.setStoreHostAddressV6Flag(); - } - - long elapsedTimeInLock = 0; - - MappedFile unlockMappedFile = null; - MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - try { - long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); - this.beginTimeInLock = beginLockTimestamp; - - // Here settings are stored timestamp, in order to ensure an orderly - // global - msg.setStoreTimestamp(beginLockTimestamp); - - if (null == mappedFile || mappedFile.isFull()) { - mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise - } - if (null == mappedFile) { - log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); - } - - result = mappedFile.appendMessage(msg, this.appendMessageCallback); - switch (result.getStatus()) { - case PUT_OK: - break; - case END_OF_FILE: - unlockMappedFile = mappedFile; - // Create a new file, re-write the message - mappedFile = this.mappedFileQueue.getLastMappedFile(0); - if (null == mappedFile) { - // XXX: warn and notify me - log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); - } - result = mappedFile.appendMessage(msg, this.appendMessageCallback); - break; - case MESSAGE_SIZE_EXCEEDED: - case PROPERTIES_SIZE_EXCEEDED: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); - case UNKNOWN_ERROR: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); - default: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); - } - - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; - beginTimeInLock = 0; - } finally { - putMessageLock.unlock(); - } - - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result); - } - - if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { - this.defaultMessageStore.unlockMappedFile(unlockMappedFile); - } - - PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); - - // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); - storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); - - handleDiskFlush(result, putMessageResult, msg); - handleHA(result, putMessageResult, msg); - - return putMessageResult; - } - public CompletableFuture submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -951,179 +867,6 @@ public class CommitLog { return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } - - public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { - // Synchronization flush - if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { - final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; - if (messageExt.isWaitStoreMsgOK()) { - GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); - service.putRequest(request); - CompletableFuture flushOkFuture = request.future(); - PutMessageStatus flushStatus = null; - try { - flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), - TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - //flushOK=false; - } - if (flushStatus != PutMessageStatus.PUT_OK) { - log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() - + " client address: " + messageExt.getBornHostString()); - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); - } - } else { - service.wakeup(); - } - } - // Asynchronous flush - else { - if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { - flushCommitLogService.wakeup(); - } else { - commitLogService.wakeup(); - } - } - } - - public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { - if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { - HAService service = this.defaultMessageStore.getHaService(); - if (messageExt.isWaitStoreMsgOK()) { - // Determine whether to wait - if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { - GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); - service.putRequest(request); - service.getWaitNotifyObject().wakeupAll(); - PutMessageStatus replicaStatus = null; - try { - replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), - TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - } - if (replicaStatus != PutMessageStatus.PUT_OK) { - log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " - + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); - } - } - // Slave problem - else { - // Tell the producer, slave not available - putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); - } - } - } - - } - - public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { - messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); - AppendMessageResult result; - - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - - final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); - - if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); - } - if (messageExtBatch.getDelayTimeLevel() > 0) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); - } - - InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); - if (bornSocketAddress.getAddress() instanceof Inet6Address) { - messageExtBatch.setBornHostV6Flag(); - } - - InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); - if (storeSocketAddress.getAddress() instanceof Inet6Address) { - messageExtBatch.setStoreHostAddressV6Flag(); - } - - long elapsedTimeInLock = 0; - MappedFile unlockMappedFile = null; - MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - - //fine-grained lock instead of the coarse-grained - MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get(); - - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); - - putMessageLock.lock(); - try { - long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); - this.beginTimeInLock = beginLockTimestamp; - - // Here settings are stored timestamp, in order to ensure an orderly - // global - messageExtBatch.setStoreTimestamp(beginLockTimestamp); - - if (null == mappedFile || mappedFile.isFull()) { - mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise - } - if (null == mappedFile) { - log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); - } - - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); - switch (result.getStatus()) { - case PUT_OK: - break; - case END_OF_FILE: - unlockMappedFile = mappedFile; - // Create a new file, re-write the message - mappedFile = this.mappedFileQueue.getLastMappedFile(0); - if (null == mappedFile) { - // XXX: warn and notify me - log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); - } - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); - break; - case MESSAGE_SIZE_EXCEEDED: - case PROPERTIES_SIZE_EXCEEDED: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); - case UNKNOWN_ERROR: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); - default: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); - } - - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; - beginTimeInLock = 0; - } finally { - putMessageLock.unlock(); - } - - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result); - } - - if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { - this.defaultMessageStore.unlockMappedFile(unlockMappedFile); - } - - PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); - - // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); - storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); - - handleDiskFlush(result, putMessageResult, messageExtBatch); - - handleHA(result, putMessageResult, messageExtBatch); - - return putMessageResult; - } - /** * According to receive certain message or offset storage time if an error occurs, it returns -1 */ @@ -1509,50 +1252,33 @@ public class CommitLog { private final ByteBuffer msgStoreItemMemory; // The maximum length of the message private final int maxMessageSize; - // Build Message Key - private final StringBuilder keyBuilder = new StringBuilder(); - - private final StringBuilder msgIdBuilder = new StringBuilder(); DefaultAppendMessageCallback(final int size) { this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8); this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8); - this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH); + this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); this.maxMessageSize = size; } - public ByteBuffer getMsgStoreItemMemory() { - return msgStoreItemMemory; - } - public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, - final MessageExtBrokerInner msgInner) { + final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
// PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); - int sysflag = msgInner.getSysFlag(); - - int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; - int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; - ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); - ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); - - this.resetByteBuffer(storeHostHolder, storeHostLength); - String msgId; - if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) { - msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset); - } else { - msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset); - } + Supplier msgIdSupplier = () -> { + int sysflag = msgInner.getSysFlag(); + int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); + MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer); + msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer + msgIdBuffer.putLong(msgIdLen - 8, wroteOffset); + return UtilAll.bytes2string(msgIdBuffer.array()); + }; // Record ConsumeQueue information - keyBuilder.setLength(0); - keyBuilder.append(msgInner.getTopic()); - keyBuilder.append('-'); - keyBuilder.append(msgInner.getQueueId()); - String key = keyBuilder.toString(); + String key = putMessageContext.getTopicQueueTableKey(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; @@ -1574,36 +1300,12 @@ public class CommitLog { break; } - /** - * Serialize message - */ - final byte[] propertiesData = - msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - - final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; - - if (propertiesLength > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long. length={}", propertiesData.length); - return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); - } - - final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); - final int topicLength = topicData.length; - - final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; - - final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength); - - // Exceeds the maximum message - if (msgLen > this.maxMessageSize) { - CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength - + ", maxMessageSize: " + this.maxMessageSize); - return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); - } + ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); + final int msgLen = preEncodeBuffer.getInt(0); // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { - this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); + this.msgStoreItemMemory.clear(); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE @@ -1611,60 +1313,31 @@ public class CommitLog { // 3 The remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); - byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); - return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), - queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, + maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */ + msgIdSupplier, msgInner.getStoreTimestamp(), + queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } - // Initialization of storage space - this.resetByteBuffer(msgStoreItemMemory, msgLen); - // 1 TOTALSIZE - this.msgStoreItemMemory.putInt(msgLen); - // 2 MAGICCODE - this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); - // 3 BODYCRC - this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); - // 4 QUEUEID - this.msgStoreItemMemory.putInt(msgInner.getQueueId()); - // 5 FLAG - this.msgStoreItemMemory.putInt(msgInner.getFlag()); + int pos = 4 + 4 + 4 + 4 + 4; // 6 QUEUEOFFSET - this.msgStoreItemMemory.putLong(queueOffset); + preEncodeBuffer.putLong(pos, queueOffset); + pos += 8; // 7 PHYSICALOFFSET - this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); - // 8 SYSFLAG - this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); - // 9 BORNTIMESTAMP - this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); - // 10 BORNHOST - this.resetByteBuffer(bornHostHolder, bornHostLength); - this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); - // 11 STORETIMESTAMP - this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); - // 12 STOREHOSTADDRESS - this.resetByteBuffer(storeHostHolder, storeHostLength); - this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); - // 13 RECONSUMETIMES - this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); - // 14 Prepared Transaction Offset - this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); - // 15 BODY - this.msgStoreItemMemory.putInt(bodyLength); - if (bodyLength > 0) - this.msgStoreItemMemory.put(msgInner.getBody()); - // 16 TOPIC - this.msgStoreItemMemory.put((byte) topicLength); - this.msgStoreItemMemory.put(topicData); - // 17 PROPERTIES - this.msgStoreItemMemory.putShort((short) propertiesLength); - if (propertiesLength > 0) - this.msgStoreItemMemory.put(propertiesData); + preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position()); + int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; + // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP + pos += 8 + 4 + 8 + ipLen; + // refresh store time stamp in lock + preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer - byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); - - AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, + byteBuffer.put(preEncodeBuffer); + msgInner.setEncodedBuff(null); + AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { @@ -1683,16 +1356,12 @@ public class CommitLog { } public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, - final MessageExtBatch messageExtBatch) { + final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { byteBuffer.mark(); //physical offset long wroteOffset = fileFromOffset + byteBuffer.position(); // Record ConsumeQueue information - keyBuilder.setLength(0); - keyBuilder.append(messageExtBatch.getTopic()); - keyBuilder.append('-'); - keyBuilder.append(messageExtBatch.getQueueId()); - String key = keyBuilder.toString(); + String key = putMessageContext.getTopicQueueTableKey(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; @@ -1701,17 +1370,35 @@ public class CommitLog { long beginQueueOffset = queueOffset; int totalMsgLen = 0; int msgNum = 0; - msgIdBuilder.setLength(0); + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff(); int sysFlag = messageExtBatch.getSysFlag(); + int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; - ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); + Supplier msgIdSupplier = () -> { + int msgIdLen = storeHostLength + 8; + int batchCount = putMessageContext.getBatchSize(); + long[] phyPosArray = putMessageContext.getPhyPos(); + ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); + MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer); + msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer + + StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1); + for (int i = 0; i < phyPosArray.length; i++) { + msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]); + String msgId = UtilAll.bytes2string(msgIdBuffer.array()); + if (i != 0) { + buffer.append(','); + } + buffer.append(msgId); + } + return buffer.toString(); + }; - this.resetByteBuffer(storeHostHolder, storeHostLength); - ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(storeHostHolder); messagesByteBuff.mark(); + int index = 0; while (messagesByteBuff.hasRemaining()) { // 1 TOTALSIZE final int msgPos = messagesByteBuff.position(); @@ -1726,7 +1413,7 @@ public class CommitLog { totalMsgLen += msgLen; // Determines whether there is sufficient free space if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { - this.resetByteBuffer(this.msgStoreItemMemory, 8); + this.msgStoreItemMemory.clear(); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE @@ -1737,27 +1424,20 @@ public class CommitLog { // Here the length of the specially set maxBlank byteBuffer.reset(); //ignore the previous appended messages byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); - return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } //move to add queue offset and commitlog offset - messagesByteBuff.position(msgPos + 20); - messagesByteBuff.putLong(queueOffset); - messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen); - - storeHostBytes.rewind(); - String msgId; - if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) { - msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen); - } else { - msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, storeHostBytes, wroteOffset + totalMsgLen - msgLen); - } - - if (msgIdBuilder.length() > 0) { - msgIdBuilder.append(',').append(msgId); - } else { - msgIdBuilder.append(msgId); - } + int pos = msgPos + 20; + messagesByteBuff.putLong(pos, queueOffset); + pos += 8; + messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen); + // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP + pos += 8 + 4 + 8 + bornHostLength; + // refresh store time stamp in lock + messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); + + putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen; queueOffset++; msgNum++; messagesByteBuff.position(msgPos + msgLen); @@ -1767,7 +1447,7 @@ public class CommitLog { messagesByteBuff.limit(totalMsgLen); byteBuffer.put(messagesByteBuff); messageExtBatch.setEncodedBuff(null); - AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(), + AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier, messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); result.setMsgNum(msgNum); CommitLog.this.topicQueueTable.put(key, queueOffset); @@ -1782,19 +1462,104 @@ public class CommitLog { } - public static class MessageExtBatchEncoder { + public static class MessageExtEncoder { // Store the message content - private final ByteBuffer msgBatchMemory; + private final ByteBuffer encoderBuffer; // The maximum length of the message private final int maxMessageSize; - MessageExtBatchEncoder(final int size) { - this.msgBatchMemory = ByteBuffer.allocateDirect(size); + MessageExtEncoder(final int size) { + this.encoderBuffer = ByteBuffer.allocateDirect(size); this.maxMessageSize = size; } - public ByteBuffer encode(final MessageExtBatch messageExtBatch) { - msgBatchMemory.clear(); //not thread-safe + private void socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + InetAddress address = inetSocketAddress.getAddress(); + if (address instanceof Inet4Address) { + byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4); + } else { + byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 16); + } + byteBuffer.putInt(inetSocketAddress.getPort()); + } + + protected PutMessageResult encode(MessageExtBrokerInner msgInner) { + /** + * Serialize message + */ + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + + final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; + + if (propertiesLength > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long. length={}", propertiesData.length); + return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); + } + + final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + final int topicLength = topicData.length; + + final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; + + final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength); + + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + + ", maxMessageSize: " + this.maxMessageSize); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + // Initialization of storage space + this.resetByteBuffer(encoderBuffer, msgLen); + // 1 TOTALSIZE + this.encoderBuffer.putInt(msgLen); + // 2 MAGICCODE + this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE); + // 3 BODYCRC + this.encoderBuffer.putInt(msgInner.getBodyCRC()); + // 4 QUEUEID + this.encoderBuffer.putInt(msgInner.getQueueId()); + // 5 FLAG + this.encoderBuffer.putInt(msgInner.getFlag()); + // 6 QUEUEOFFSET, need update later + this.encoderBuffer.putLong(0); + // 7 PHYSICALOFFSET, need update later + this.encoderBuffer.putLong(0); + // 8 SYSFLAG + this.encoderBuffer.putInt(msgInner.getSysFlag()); + // 9 BORNTIMESTAMP + this.encoderBuffer.putLong(msgInner.getBornTimestamp()); + // 10 BORNHOST + socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer); + // 11 STORETIMESTAMP + this.encoderBuffer.putLong(msgInner.getStoreTimestamp()); + // 12 STOREHOSTADDRESS + socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer); + // 13 RECONSUMETIMES + this.encoderBuffer.putInt(msgInner.getReconsumeTimes()); + // 14 Prepared Transaction Offset + this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset()); + // 15 BODY + this.encoderBuffer.putInt(bodyLength); + if (bodyLength > 0) + this.encoderBuffer.put(msgInner.getBody()); + // 16 TOPIC + this.encoderBuffer.put((byte) topicLength); + this.encoderBuffer.put(topicData); + // 17 PROPERTIES + this.encoderBuffer.putShort((short) propertiesLength); + if (propertiesLength > 0) + this.encoderBuffer.put(propertiesData); + + encoderBuffer.flip(); + return null; + } + + protected ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { + encoderBuffer.clear(); //not thread-safe int totalMsgLen = 0; ByteBuffer messagesByteBuff = messageExtBatch.wrap(); @@ -1809,7 +1574,9 @@ public class CommitLog { final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8); final short batchPropLen = (short) batchPropData.length; + int batchSize = 0; while (messagesByteBuff.hasRemaining()) { + batchSize++; // 1 TOTALSIZE messagesByteBuff.getInt(); // 2 MAGICCODE @@ -1849,53 +1616,55 @@ public class CommitLog { } // 1 TOTALSIZE - this.msgBatchMemory.putInt(msgLen); + this.encoderBuffer.putInt(msgLen); // 2 MAGICCODE - this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); + this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC - this.msgBatchMemory.putInt(bodyCrc); + this.encoderBuffer.putInt(bodyCrc); // 4 QUEUEID - this.msgBatchMemory.putInt(messageExtBatch.getQueueId()); + this.encoderBuffer.putInt(messageExtBatch.getQueueId()); // 5 FLAG - this.msgBatchMemory.putInt(flag); + this.encoderBuffer.putInt(flag); // 6 QUEUEOFFSET - this.msgBatchMemory.putLong(0); + this.encoderBuffer.putLong(0); // 7 PHYSICALOFFSET - this.msgBatchMemory.putLong(0); + this.encoderBuffer.putLong(0); // 8 SYSFLAG - this.msgBatchMemory.putInt(messageExtBatch.getSysFlag()); + this.encoderBuffer.putInt(messageExtBatch.getSysFlag()); // 9 BORNTIMESTAMP - this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp()); + this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength); - this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder)); + this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP - this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp()); + this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); - this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder)); + this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder)); // 13 RECONSUMETIMES - this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes()); + this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes()); // 14 Prepared Transaction Offset, batch does not support transaction - this.msgBatchMemory.putLong(0); + this.encoderBuffer.putLong(0); // 15 BODY - this.msgBatchMemory.putInt(bodyLen); + this.encoderBuffer.putInt(bodyLen); if (bodyLen > 0) - this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen); + this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen); // 16 TOPIC - this.msgBatchMemory.put((byte) topicLength); - this.msgBatchMemory.put(topicData); + this.encoderBuffer.put((byte) topicLength); + this.encoderBuffer.put(topicData); // 17 PROPERTIES - this.msgBatchMemory.putShort((short) (propertiesLen + batchPropLen)); + this.encoderBuffer.putShort((short) (propertiesLen + batchPropLen)); if (propertiesLen > 0) { - this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen); + this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen); } if (batchPropLen > 0) { - this.msgBatchMemory.put(batchPropData, 0, batchPropLen); + this.encoderBuffer.put(batchPropData, 0, batchPropLen); } } - msgBatchMemory.flip(); - return msgBatchMemory; + putMessageContext.setBatchSize(batchSize); + putMessageContext.setPhyPos(new long[batchSize]); + encoderBuffer.flip(); + return encoderBuffer; } private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { @@ -1904,4 +1673,51 @@ public class CommitLog { } } + + static class PutMessageThreadLocal { + private MessageExtEncoder encoder; + private StringBuilder keyBuilder; + PutMessageThreadLocal(int size) { + encoder = new MessageExtEncoder(size); + keyBuilder = new StringBuilder(); + } + + public MessageExtEncoder getEncoder() { + return encoder; + } + + public StringBuilder getKeyBuilder() { + return keyBuilder; + } + } + + static class PutMessageContext { + private String topicQueueTableKey; + private long[] phyPos; + private int batchSize; + + public PutMessageContext(String topicQueueTableKey) { + this.topicQueueTableKey = topicQueueTableKey; + } + + public String getTopicQueueTableKey() { + return topicQueueTableKey; + } + + public long[] getPhyPos() { + return phyPos; + } + + public void setPhyPos(long[] phyPos) { + this.phyPos = phyPos; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7dd5a32b2fab0d873736479ddde8f9c402e3ec0c..69019c15490f0ad0b2ea6aa5a749899ef524ea70 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -476,58 +477,20 @@ public class DefaultMessageStore implements MessageStore { @Override public PutMessageResult putMessage(MessageExtBrokerInner msg) { - PutMessageStatus checkStoreStatus = this.checkStoreStatus(); - if (checkStoreStatus != PutMessageStatus.PUT_OK) { - return new PutMessageResult(checkStoreStatus, null); - } - - PutMessageStatus msgCheckStatus = this.checkMessage(msg); - if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { - return new PutMessageResult(msgCheckStatus, null); - } - - long beginTime = this.getSystemClock().now(); - PutMessageResult result = this.commitLog.putMessage(msg); - long elapsedTime = this.getSystemClock().now() - beginTime; - if (elapsedTime > 500) { - log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); - } - - this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - - if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + try { + return asyncPutMessage(msg).get(); + } catch (InterruptedException | ExecutionException e) { + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); } - - return result; } @Override public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { - PutMessageStatus checkStoreStatus = this.checkStoreStatus(); - if (checkStoreStatus != PutMessageStatus.PUT_OK) { - return new PutMessageResult(checkStoreStatus, null); - } - - PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); - if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { - return new PutMessageResult(msgCheckStatus, null); - } - - long beginTime = this.getSystemClock().now(); - PutMessageResult result = this.commitLog.putMessages(messageExtBatch); - long elapsedTime = this.getSystemClock().now() - beginTime; - if (elapsedTime > 500) { - log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); - } - - this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - - if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + try { + return asyncPutMessages(messageExtBatch).get(); + } catch (InterruptedException | ExecutionException e) { + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); } - - return result; } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 25f0e393144bdd978a72afbf07c6385fda62a4a0..297271d38bb124bfb02b8abaa2943738723a556a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.CommitLog.PutMessageContext; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.util.LibC; import sun.nio.ch.DirectBuffer; @@ -188,15 +189,18 @@ public class MappedFile extends ReferenceResource { return fileChannel; } - public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { - return appendMessagesInner(msg, cb); + public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb, + PutMessageContext putMessageContext) { + return appendMessagesInner(msg, cb, putMessageContext); } - public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) { - return appendMessagesInner(messageExtBatch, cb); + public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb, + PutMessageContext putMessageContext) { + return appendMessagesInner(messageExtBatch, cb, putMessageContext); } - public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { + public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, + PutMessageContext putMessageContext) { assert messageExt != null; assert cb != null; @@ -207,9 +211,11 @@ public class MappedFile extends ReferenceResource { byteBuffer.position(currentPos); AppendMessageResult result; if (messageExt instanceof MessageExtBrokerInner) { - result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, + (MessageExtBrokerInner) messageExt, putMessageContext); } else if (messageExt instanceof MessageExtBatch) { - result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, + (MessageExtBatch) messageExt, putMessageContext); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java index e5f087b697d36eb59bcd83cbeb8b1cff3db8b49c..df7e6e586bbbc15d7cbf53607f0ff51da264cdc2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.store; +import java.nio.ByteBuffer; + import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.message.MessageExt; @@ -24,6 +26,16 @@ public class MessageExtBrokerInner extends MessageExt { private String propertiesString; private long tagsCode; + private ByteBuffer encodedBuff; + + public ByteBuffer getEncodedBuff() { + return encodedBuff; + } + + public void setEncodedBuff(ByteBuffer encodedBuff) { + this.encodedBuff = encodedBuff; + } + public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) { return 0; } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index ea791bd9ff66dcc3bf32f4b0c4f6e65aec3c12f9..011cbe169cf7da33c095f5d4b977dc8abffdc1e9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -37,7 +37,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -413,237 +412,6 @@ public class DLedgerCommitLog extends CommitLog { } } - @Override - public PutMessageResult putMessage(final MessageExtBrokerInner msg) { - - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - String topic = msg.getTopic(); - setMessageInfo(msg,tranType); - - // Back to Results - AppendMessageResult appendResult; - AppendFuture dledgerFuture; - EncodeResult encodeResult; - - encodeResult = this.messageSerializer.serialize(msg); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); - } - - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - long elapsedTimeInLock; - long queueOffset; - try { - beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset, false); - AppendEntryRequest request = new AppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBody(encodeResult.getData()); - dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); - if (dledgerFuture.getPos() == -1) { - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); - } - long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; - - int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; - ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - - String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); - switch (tranType) { - case MessageSysFlag.TRANSACTION_PREPARED_TYPE: - case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: - break; - case MessageSysFlag.TRANSACTION_NOT_TYPE: - case MessageSysFlag.TRANSACTION_COMMIT_TYPE: - // The next update ConsumeQueue information - DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); - break; - default: - break; - } - } catch (Exception e) { - log.error("Put message error", e); - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); - } finally { - beginTimeInDledgerLock = 0; - putMessageLock.unlock(); - } - - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); - } - - PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; - try { - AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); - switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { - case SUCCESS: - putMessageStatus = PutMessageStatus.PUT_OK; - break; - case INCONSISTENT_LEADER: - case NOT_LEADER: - case LEADER_NOT_READY: - case DISK_FULL: - putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; - break; - case WAIT_QUORUM_ACK_TIMEOUT: - //Do not return flush_slave_timeout to the client, for the ons client will ignore it. - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - case LEADER_PENDING_FULL: - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - } - } catch (Throwable t) { - log.error("Failed to get dledger append result", t); - } - - PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); - if (putMessageStatus == PutMessageStatus.PUT_OK) { - // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); - storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes()); - } - return putMessageResult; - } - - @Override - public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { - final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); - - if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); - } - if (messageExtBatch.getDelayTimeLevel() > 0) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); - } - - // Set the storage time - messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); - - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - - InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); - if (bornSocketAddress.getAddress() instanceof Inet6Address) { - messageExtBatch.setBornHostV6Flag(); - } - - InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); - if (storeSocketAddress.getAddress() instanceof Inet6Address) { - messageExtBatch.setStoreHostAddressV6Flag(); - } - - // Back to Results - AppendMessageResult appendResult; - BatchAppendFuture dledgerFuture; - EncodeResult encodeResult; - - encodeResult = this.messageSerializer.serialize(messageExtBatch); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult - .status)); - } - - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - msgIdBuilder.setLength(0); - long elapsedTimeInLock; - long queueOffset; - int msgNum = 0; - try { - beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset, true); - BatchAppendEntryRequest request = new BatchAppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBatchMsgs(encodeResult.batchData); - AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request); - if (appendFuture.getPos() == -1) { - log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode()); - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); - } - dledgerFuture = (BatchAppendFuture) appendFuture; - - long wroteOffset = 0; - - int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; - ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - - boolean isFirstOffset = true; - long firstWroteOffset = 0; - for (long pos : dledgerFuture.getPositions()) { - wroteOffset = pos + DLedgerEntry.BODY_OFFSET; - if (isFirstOffset) { - firstWroteOffset = wroteOffset; - isFirstOffset = false; - } - String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); - if (msgIdBuilder.length() > 0) { - msgIdBuilder.append(',').append(msgId); - } else { - msgIdBuilder.append(msgId); - } - msgNum++; - } - - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, - msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); - appendResult.setMsgNum(msgNum); - DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum); - } catch (Exception e) { - log.error("Put message error", e); - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus - .UNKNOWN_ERROR)); - } finally { - beginTimeInDledgerLock = 0; - putMessageLock.unlock(); - } - - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", - elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); - } - - PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; - try { - AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); - switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { - case SUCCESS: - putMessageStatus = PutMessageStatus.PUT_OK; - break; - case INCONSISTENT_LEADER: - case NOT_LEADER: - case LEADER_NOT_READY: - case DISK_FULL: - putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; - break; - case WAIT_QUORUM_ACK_TIMEOUT: - //Do not return flush_slave_timeout to the client, for the ons client will ignore it. - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - case LEADER_PENDING_FULL: - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - } - } catch (Throwable t) { - log.error("Failed to get dledger append result", t); - } - - PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); - if (putMessageStatus == PutMessageStatus.PUT_OK) { - // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum); - storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen); - } - return putMessageResult; - } - @Override public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index f46b3befed63fc25f9c0901b60f97c54a68616f8..715c9d334aa5427b1b31601a78bafb6c8eafd953 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -30,6 +30,8 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.CommitLog.MessageExtEncoder; +import org.apache.rocketmq.store.CommitLog.PutMessageContext; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Before; @@ -42,7 +44,7 @@ public class AppendCallbackTest { AppendMessageCallback callback; - CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024); + MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024); @Before public void init() throws Exception { @@ -84,10 +86,12 @@ public class AppendCallbackTest { messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); ByteBuffer buff = ByteBuffer.allocate(1024 * 10); //encounter end of file when append half of the data - AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch); + AppendMessageResult result = + callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext); assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus()); assertEquals(0, result.getWroteOffset()); assertEquals(0, result.getLogicsOffset()); @@ -121,10 +125,12 @@ public class AppendCallbackTest { messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); ByteBuffer buff = ByteBuffer.allocate(1024 * 10); //encounter end of file when append half of the data - AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch); + AppendMessageResult result = + callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext); assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus()); assertEquals(0, result.getWroteOffset()); assertEquals(0, result.getLogicsOffset()); @@ -154,9 +160,11 @@ public class AppendCallbackTest { messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); ByteBuffer buff = ByteBuffer.allocate(1024 * 10); - AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch); + AppendMessageResult allresult = + callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext); assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); assertEquals(0, allresult.getWroteOffset()); @@ -214,9 +222,11 @@ public class AppendCallbackTest { messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); ByteBuffer buff = ByteBuffer.allocate(1024 * 10); - AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch); + AppendMessageResult allresult = + callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext); assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); assertEquals(0, allresult.getWroteOffset());