diff --git a/.travis.yml b/.travis.yml index df6a7356bffa1e505dd70b950bd6af0954d1d5ff..4a70bccff63ac8d0098fe04a5ad95737f7bdb38b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,8 +16,8 @@ matrix: # On Linux we install latest OpenJDK 1.8 from Ubuntu repositories - name: Linux x86_64 arch: amd64 - - name: Linux aarch64 - arch: arm64 +# - name: Linux aarch64 +# arch: arm64 cache: directories: diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index ced7c2014f4a5d24b5832971142e686ebc404df0..29ffc4bb76b94c2e7cd1f93d960cee667ed8d94c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -257,8 +257,8 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc try { final SendMessageRequestHeader requestHeader = parseRequestHeader(request); - String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()); if (null != requestHeader) { + String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getTopic()); context.setNamespace(namespace); context.setProducerGroup(requestHeader.getProducerGroup()); context.setTopic(requestHeader.getTopic()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 97ca51d966d38b18bfdfb430c707a093b2e41a42..c481d14d22662e32165912dd6e2e0145d236d0d6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1445,7 +1445,7 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements java.io.File commitLogDir = new java.io.File(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); if (commitLogDir.exists()) { - runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, Free : %s.", MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), false), MixAll.humanReadableByteCount(commitLogDir.getFreeSpace(), false))); + runtimeInfo.put("commitLogDirCapacity", String.format("Total : %s, Free : %s.", MixAll.humanReadableByteCount(commitLogDir.getTotalSpace(), false), MixAll.humanReadableByteCount(commitLogDir.getUsableSpace(), false))); } return runtimeInfo; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index ccec5069c75028fafb3abb51b45e1241e87136d0..8b3caa7e126a85c626e2ca3b46d9f9f283ae6b24 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -612,6 +612,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } } + @Override public SocketAddress getStoreHost() { return storeHost; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 976dcba17de6bde92e827ae19be61c1e7f78d637..d30534ff8498847b28a1face04035ee2a8bdda02 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -882,24 +882,6 @@ public class MQClientInstance { this.unregisterClient(null, group); } - private void unregisterClientWithLock(final String producerGroup, final String consumerGroup) { - try { - if (this.lockHeartbeat.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - this.unregisterClient(producerGroup, consumerGroup); - } catch (Exception e) { - log.error("unregisterClient exception", e); - } finally { - this.lockHeartbeat.unlock(); - } - } else { - log.warn("lock heartBeat, but failed. [{}]", this.clientId); - } - } catch (InterruptedException e) { - log.warn("unregisterClientWithLock exception", e); - } - } - private void unregisterClient(final String producerGroup, final String consumerGroup) { Iterator>> it = this.brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { @@ -927,7 +909,7 @@ public class MQClientInstance { } } - public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { + public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) { if (null == group || null == producer) { return false; } @@ -941,9 +923,9 @@ public class MQClientInstance { return true; } - public void unregisterProducer(final String group) { + public synchronized void unregisterProducer(final String group) { this.producerTable.remove(group); - this.unregisterClientWithLock(group, null); + this.unregisterClient(group, null); } public boolean registerAdminExt(final String group, final MQAdminExtInner admin) { diff --git a/distribution/bin/runbroker.cmd b/distribution/bin/runbroker.cmd index 753a11e9a48eb592301ed599509233624e2c7ba8..6cbf1c031ae666d0f981764f42604c3cc7ebe775 100644 --- a/distribution/bin/runbroker.cmd +++ b/distribution/bin/runbroker.cmd @@ -28,7 +28,7 @@ set CLASSPATH=.;%BASE_DIR%conf;%CLASSPATH% rem =========================================================================================== rem JVM Configuration rem =========================================================================================== -set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g" +set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g" set "JAVA_OPT=%JAVA_OPT% -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8" set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:%USERPROFILE%\mq_gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" set "JAVA_OPT=%JAVA_OPT% -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" diff --git a/distribution/bin/runbroker.sh b/distribution/bin/runbroker.sh index 1d1000ee4ce43e3da8869f7261fbd82515492871..d1a148b36c635c4b2460c3c633e5d67699c381ea 100644 --- a/distribution/bin/runbroker.sh +++ b/distribution/bin/runbroker.sh @@ -64,7 +64,7 @@ choose_gc_log_directory() choose_gc_log_directory -JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" +JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g" JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0" JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy" JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" diff --git a/distribution/bin/runserver.cmd b/distribution/bin/runserver.cmd index 76865f7a160fc6d720019dbad916438fcd993660..ddd9b5fc70adef2917e4989340ed9cc0d6ec6f64 100644 --- a/distribution/bin/runserver.cmd +++ b/distribution/bin/runserver.cmd @@ -28,7 +28,7 @@ set CLASSPATH=.;%BASE_DIR%conf;%CLASSPATH% set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" set "JAVA_OPT=%JAVA_OPT% -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC" -set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:"%USERPROFILE%\rmq_srv_gc.log" -XX:+PrintGCDetails" +set "JAVA_OPT=%JAVA_OPT% -verbose:gc -Xloggc:"%USERPROFILE%\rmq_srv_gc.log" -XX:+PrintGCDetails -XX:+PrintGCDateStamps" set "JAVA_OPT=%JAVA_OPT% -XX:-OmitStackTraceInFastThrow" set "JAVA_OPT=%JAVA_OPT% -XX:-UseLargePages" set "JAVA_OPT=%JAVA_OPT% -Djava.ext.dirs=%BASE_DIR%lib;%JAVA_HOME%\jre\lib\ext" diff --git a/distribution/bin/runserver.sh b/distribution/bin/runserver.sh index 59f322085dfc89e155c479b2cb6b0e2e671d9a4a..cfa577345b0f83c99a543df82e188f4d12c5ff56 100644 --- a/distribution/bin/runserver.sh +++ b/distribution/bin/runserver.sh @@ -68,18 +68,18 @@ choose_gc_options() # '1' means releases befor Java 9 JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p') if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then + JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC" - JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails" + JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps" JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m" else + JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0" JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log:time,tags:filecount=5,filesize=30M" fi } choose_gc_log_directory - -JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" choose_gc_options JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java index 5a09fd6138801de2fe6a1544d3f9f689b6881250..d4c3749a0f3ea833c39970c49be2d2e3e67d2b9c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettySystemConfig.java @@ -37,7 +37,6 @@ public class NettySystemConfig { public static final String COM_ROCKETMQ_REMOTING_CLIENT_CLOSE_SOCKET_IF_TIMEOUT = "com.rocketmq.remoting.client.closeSocketIfTimeout"; - public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = // Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false")); public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = // diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java index d33763847f94e779218e3346af24ce116bb830c1..5499c90066007db210b55d098acc35bb0c7972ed 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.store; import java.nio.ByteBuffer; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.CommitLog.PutMessageContext; /** * Write messages callback interface @@ -30,7 +31,7 @@ public interface AppendMessageCallback { * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBrokerInner msg); + final int maxBlank, final MessageExtBrokerInner msg, PutMessageContext putMessageContext); /** * After batched message serialization, write MapedByteBuffer @@ -39,5 +40,5 @@ public interface AppendMessageCallback { * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, - final int maxBlank, final MessageExtBatch messageExtBatch); + final int maxBlank, final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext); } diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java index d6d1aa6a31c829cb26285feafa1aee8486320015..de3c03b307f9e2ec8a32512c78cd9b93dd51db16 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.store; +import java.util.function.Supplier; + /** * When write a message to the commit log, returns results */ @@ -28,6 +30,7 @@ public class AppendMessageResult { private int wroteBytes; // Message ID private String msgId; + private Supplier msgIdSupplier; // Message storage timestamp private long storeTimestamp; // Consume queue's offset(step by one) @@ -51,6 +54,17 @@ public class AppendMessageResult { this.pagecacheRT = pagecacheRT; } + public AppendMessageResult(AppendMessageStatus status, long wroteOffset, int wroteBytes, Supplier msgIdSupplier, + long storeTimestamp, long logicsOffset, long pagecacheRT) { + this.status = status; + this.wroteOffset = wroteOffset; + this.wroteBytes = wroteBytes; + this.msgIdSupplier = msgIdSupplier; + this.storeTimestamp = storeTimestamp; + this.logicsOffset = logicsOffset; + this.pagecacheRT = pagecacheRT; + } + public long getPagecacheRT() { return pagecacheRT; } @@ -88,6 +102,9 @@ public class AppendMessageResult { } public String getMsgId() { + if (msgId == null && msgIdSupplier != null) { + msgId = msgIdSupplier.get(); + } return msgId; } diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 57fa3637856380a2bb03767d165ad811880bea68..5e92654aceae912bc9a013f876a7400d0119bae9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -16,17 +16,18 @@ */ package org.apache.rocketmq.store; +import java.net.Inet4Address; import java.net.Inet6Address; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.UtilAll; @@ -62,7 +63,7 @@ public class CommitLog { private final FlushCommitLogService commitLogService; private final AppendMessageCallback appendMessageCallback; - private final ThreadLocal batchEncoderThreadLocal; + private final ThreadLocal putMessageThreadLocal; protected HashMap topicQueueTable = new HashMap(1024); protected volatile long confirmOffset = -1L; @@ -84,10 +85,10 @@ public class CommitLog { this.commitLogService = new CommitRealTimeService(); this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); - batchEncoderThreadLocal = new ThreadLocal() { + putMessageThreadLocal = new ThreadLocal() { @Override - protected MessageExtBatchEncoder initialValue() { - return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); + protected PutMessageThreadLocal initialValue() { + return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize()); } }; this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock(); @@ -555,6 +556,14 @@ public class CommitLog { return beginTimeInLock; } + private String generateKey(StringBuilder keyBuilder, MessageExt messageExt) { + keyBuilder.setLength(0); + keyBuilder.append(messageExt.getTopic()); + keyBuilder.append('-'); + keyBuilder.append(messageExt.getQueueId()); + return keyBuilder.toString(); + } + public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) { // Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); @@ -591,12 +600,30 @@ public class CommitLog { } } + InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); + if (bornSocketAddress.getAddress() instanceof Inet6Address) { + msg.setBornHostV6Flag(); + } + + InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); + if (storeSocketAddress.getAddress() instanceof Inet6Address) { + msg.setStoreHostAddressV6Flag(); + } + + PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get(); + PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg); + if (encodeResult != null) { + return CompletableFuture.completedFuture(encodeResult); + } + msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer); + PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg)); + long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; - MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; @@ -613,7 +640,7 @@ public class CommitLog { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } - result = mappedFile.appendMessage(msg, this.appendMessageCallback); + result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); switch (result.getStatus()) { case PUT_OK: break; @@ -627,7 +654,7 @@ public class CommitLog { beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } - result = mappedFile.appendMessage(msg, this.appendMessageCallback); + result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: @@ -693,14 +720,26 @@ public class CommitLog { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null)); } + InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); + if (bornSocketAddress.getAddress() instanceof Inet6Address) { + messageExtBatch.setBornHostV6Flag(); + } + + InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); + if (storeSocketAddress.getAddress() instanceof Inet6Address) { + messageExtBatch.setStoreHostAddressV6Flag(); + } + long elapsedTimeInLock = 0; MappedFile unlockMappedFile = null; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); //fine-grained lock instead of the coarse-grained - MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get(); + PutMessageThreadLocal pmThreadLocal = this.putMessageThreadLocal.get(); + MessageExtEncoder batchEncoder = pmThreadLocal.getEncoder(); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(generateKey(pmThreadLocal.getKeyBuilder(), messageExtBatch)); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); putMessageLock.lock(); try { @@ -720,7 +759,7 @@ public class CommitLog { return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null)); } - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); switch (result.getStatus()) { case PUT_OK: break; @@ -734,7 +773,7 @@ public class CommitLog { beginTimeInLock = 0; return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result)); } - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); + result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: @@ -784,129 +823,6 @@ public class CommitLog { } - public PutMessageResult putMessage(final MessageExtBrokerInner msg) { - // Set the storage time - msg.setStoreTimestamp(System.currentTimeMillis()); - // Set the message body BODY CRC (consider the most appropriate setting - // on the client) - msg.setBodyCRC(UtilAll.crc32(msg.getBody())); - // Back to Results - AppendMessageResult result = null; - - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - - String topic = msg.getTopic(); - int queueId = msg.getQueueId(); - - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE - || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { - // Delay Delivery - if (msg.getDelayTimeLevel() > 0) { - if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { - msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); - } - - topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC; - queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); - - // Backup real topic, queueId - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); - MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); - msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); - - msg.setTopic(topic); - msg.setQueueId(queueId); - } - } - - InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost(); - if (bornSocketAddress.getAddress() instanceof Inet6Address) { - msg.setBornHostV6Flag(); - } - - InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost(); - if (storeSocketAddress.getAddress() instanceof Inet6Address) { - msg.setStoreHostAddressV6Flag(); - } - - long elapsedTimeInLock = 0; - - MappedFile unlockMappedFile = null; - MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - try { - long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); - this.beginTimeInLock = beginLockTimestamp; - - // Here settings are stored timestamp, in order to ensure an orderly - // global - msg.setStoreTimestamp(beginLockTimestamp); - - if (null == mappedFile || mappedFile.isFull()) { - mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise - } - if (null == mappedFile) { - log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); - } - - result = mappedFile.appendMessage(msg, this.appendMessageCallback); - switch (result.getStatus()) { - case PUT_OK: - break; - case END_OF_FILE: - unlockMappedFile = mappedFile; - // Create a new file, re-write the message - mappedFile = this.mappedFileQueue.getLastMappedFile(0); - if (null == mappedFile) { - // XXX: warn and notify me - log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); - } - result = mappedFile.appendMessage(msg, this.appendMessageCallback); - break; - case MESSAGE_SIZE_EXCEEDED: - case PROPERTIES_SIZE_EXCEEDED: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); - case UNKNOWN_ERROR: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); - default: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); - } - - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; - beginTimeInLock = 0; - } finally { - putMessageLock.unlock(); - } - - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result); - } - - if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { - this.defaultMessageStore.unlockMappedFile(unlockMappedFile); - } - - PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); - - // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); - storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); - - handleDiskFlush(result, putMessageResult, msg); - handleHA(result, putMessageResult, msg); - - return putMessageResult; - } - public CompletableFuture submitFlushRequest(AppendMessageResult result, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { @@ -951,179 +867,6 @@ public class CommitLog { return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK); } - - public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { - // Synchronization flush - if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { - final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; - if (messageExt.isWaitStoreMsgOK()) { - GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); - service.putRequest(request); - CompletableFuture flushOkFuture = request.future(); - PutMessageStatus flushStatus = null; - try { - flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), - TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - //flushOK=false; - } - if (flushStatus != PutMessageStatus.PUT_OK) { - log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() - + " client address: " + messageExt.getBornHostString()); - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); - } - } else { - service.wakeup(); - } - } - // Asynchronous flush - else { - if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { - flushCommitLogService.wakeup(); - } else { - commitLogService.wakeup(); - } - } - } - - public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { - if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) { - HAService service = this.defaultMessageStore.getHaService(); - if (messageExt.isWaitStoreMsgOK()) { - // Determine whether to wait - if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) { - GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); - service.putRequest(request); - service.getWaitNotifyObject().wakeupAll(); - PutMessageStatus replicaStatus = null; - try { - replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(), - TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - } - if (replicaStatus != PutMessageStatus.PUT_OK) { - log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: " - + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString()); - putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT); - } - } - // Slave problem - else { - // Tell the producer, slave not available - putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE); - } - } - } - - } - - public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { - messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); - AppendMessageResult result; - - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - - final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); - - if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); - } - if (messageExtBatch.getDelayTimeLevel() > 0) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); - } - - InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); - if (bornSocketAddress.getAddress() instanceof Inet6Address) { - messageExtBatch.setBornHostV6Flag(); - } - - InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); - if (storeSocketAddress.getAddress() instanceof Inet6Address) { - messageExtBatch.setStoreHostAddressV6Flag(); - } - - long elapsedTimeInLock = 0; - MappedFile unlockMappedFile = null; - MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); - - //fine-grained lock instead of the coarse-grained - MessageExtBatchEncoder batchEncoder = batchEncoderThreadLocal.get(); - - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); - - putMessageLock.lock(); - try { - long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); - this.beginTimeInLock = beginLockTimestamp; - - // Here settings are stored timestamp, in order to ensure an orderly - // global - messageExtBatch.setStoreTimestamp(beginLockTimestamp); - - if (null == mappedFile || mappedFile.isFull()) { - mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise - } - if (null == mappedFile) { - log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); - } - - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); - switch (result.getStatus()) { - case PUT_OK: - break; - case END_OF_FILE: - unlockMappedFile = mappedFile; - // Create a new file, re-write the message - mappedFile = this.mappedFileQueue.getLastMappedFile(0); - if (null == mappedFile) { - // XXX: warn and notify me - log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString()); - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); - } - result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback); - break; - case MESSAGE_SIZE_EXCEEDED: - case PROPERTIES_SIZE_EXCEEDED: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); - case UNKNOWN_ERROR: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); - default: - beginTimeInLock = 0; - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); - } - - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; - beginTimeInLock = 0; - } finally { - putMessageLock.unlock(); - } - - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result); - } - - if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { - this.defaultMessageStore.unlockMappedFile(unlockMappedFile); - } - - PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); - - // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(result.getMsgNum()); - storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(result.getWroteBytes()); - - handleDiskFlush(result, putMessageResult, messageExtBatch); - - handleHA(result, putMessageResult, messageExtBatch); - - return putMessageResult; - } - /** * According to receive certain message or offset storage time if an error occurs, it returns -1 */ @@ -1509,50 +1252,33 @@ public class CommitLog { private final ByteBuffer msgStoreItemMemory; // The maximum length of the message private final int maxMessageSize; - // Build Message Key - private final StringBuilder keyBuilder = new StringBuilder(); - - private final StringBuilder msgIdBuilder = new StringBuilder(); DefaultAppendMessageCallback(final int size) { this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8); this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8); - this.msgStoreItemMemory = ByteBuffer.allocate(size + END_FILE_MIN_BLANK_LENGTH); + this.msgStoreItemMemory = ByteBuffer.allocate(END_FILE_MIN_BLANK_LENGTH); this.maxMessageSize = size; } - public ByteBuffer getMsgStoreItemMemory() { - return msgStoreItemMemory; - } - public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, - final MessageExtBrokerInner msgInner) { + final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) { // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
// PHY OFFSET long wroteOffset = fileFromOffset + byteBuffer.position(); - int sysflag = msgInner.getSysFlag(); - - int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; - int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; - ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength); - ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); - - this.resetByteBuffer(storeHostHolder, storeHostLength); - String msgId; - if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) { - msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset); - } else { - msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset); - } + Supplier msgIdSupplier = () -> { + int sysflag = msgInner.getSysFlag(); + int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; + ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); + MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer); + msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer + msgIdBuffer.putLong(msgIdLen - 8, wroteOffset); + return UtilAll.bytes2string(msgIdBuffer.array()); + }; // Record ConsumeQueue information - keyBuilder.setLength(0); - keyBuilder.append(msgInner.getTopic()); - keyBuilder.append('-'); - keyBuilder.append(msgInner.getQueueId()); - String key = keyBuilder.toString(); + String key = putMessageContext.getTopicQueueTableKey(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; @@ -1574,36 +1300,12 @@ public class CommitLog { break; } - /** - * Serialize message - */ - final byte[] propertiesData = - msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - - final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; - - if (propertiesLength > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long. length={}", propertiesData.length); - return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED); - } - - final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); - final int topicLength = topicData.length; - - final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; - - final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength); - - // Exceeds the maximum message - if (msgLen > this.maxMessageSize) { - CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength - + ", maxMessageSize: " + this.maxMessageSize); - return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED); - } + ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff(); + final int msgLen = preEncodeBuffer.getInt(0); // Determines whether there is sufficient free space if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { - this.resetByteBuffer(this.msgStoreItemMemory, maxBlank); + this.msgStoreItemMemory.clear(); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE @@ -1611,60 +1313,31 @@ public class CommitLog { // 3 The remaining space may be any value // Here the length of the specially set maxBlank final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); - byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank); - return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(), - queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); + byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, + maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */ + msgIdSupplier, msgInner.getStoreTimestamp(), + queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } - // Initialization of storage space - this.resetByteBuffer(msgStoreItemMemory, msgLen); - // 1 TOTALSIZE - this.msgStoreItemMemory.putInt(msgLen); - // 2 MAGICCODE - this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); - // 3 BODYCRC - this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); - // 4 QUEUEID - this.msgStoreItemMemory.putInt(msgInner.getQueueId()); - // 5 FLAG - this.msgStoreItemMemory.putInt(msgInner.getFlag()); + int pos = 4 + 4 + 4 + 4 + 4; // 6 QUEUEOFFSET - this.msgStoreItemMemory.putLong(queueOffset); + preEncodeBuffer.putLong(pos, queueOffset); + pos += 8; // 7 PHYSICALOFFSET - this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); - // 8 SYSFLAG - this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); - // 9 BORNTIMESTAMP - this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); - // 10 BORNHOST - this.resetByteBuffer(bornHostHolder, bornHostLength); - this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); - // 11 STORETIMESTAMP - this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); - // 12 STOREHOSTADDRESS - this.resetByteBuffer(storeHostHolder, storeHostLength); - this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); - // 13 RECONSUMETIMES - this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); - // 14 Prepared Transaction Offset - this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); - // 15 BODY - this.msgStoreItemMemory.putInt(bodyLength); - if (bodyLength > 0) - this.msgStoreItemMemory.put(msgInner.getBody()); - // 16 TOPIC - this.msgStoreItemMemory.put((byte) topicLength); - this.msgStoreItemMemory.put(topicData); - // 17 PROPERTIES - this.msgStoreItemMemory.putShort((short) propertiesLength); - if (propertiesLength > 0) - this.msgStoreItemMemory.put(propertiesData); + preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position()); + int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; + // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP + pos += 8 + 4 + 8 + ipLen; + // refresh store time stamp in lock + preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp()); + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer - byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); - - AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, + byteBuffer.put(preEncodeBuffer); + msgInner.setEncodedBuff(null); + AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier, msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); switch (tranType) { @@ -1683,16 +1356,12 @@ public class CommitLog { } public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, - final MessageExtBatch messageExtBatch) { + final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { byteBuffer.mark(); //physical offset long wroteOffset = fileFromOffset + byteBuffer.position(); // Record ConsumeQueue information - keyBuilder.setLength(0); - keyBuilder.append(messageExtBatch.getTopic()); - keyBuilder.append('-'); - keyBuilder.append(messageExtBatch.getQueueId()); - String key = keyBuilder.toString(); + String key = putMessageContext.getTopicQueueTableKey(); Long queueOffset = CommitLog.this.topicQueueTable.get(key); if (null == queueOffset) { queueOffset = 0L; @@ -1701,17 +1370,35 @@ public class CommitLog { long beginQueueOffset = queueOffset; int totalMsgLen = 0; int msgNum = 0; - msgIdBuilder.setLength(0); + final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); ByteBuffer messagesByteBuff = messageExtBatch.getEncodedBuff(); int sysFlag = messageExtBatch.getSysFlag(); + int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; int storeHostLength = (sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4; - ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength); + Supplier msgIdSupplier = () -> { + int msgIdLen = storeHostLength + 8; + int batchCount = putMessageContext.getBatchSize(); + long[] phyPosArray = putMessageContext.getPhyPos(); + ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen); + MessageExt.socketAddress2ByteBuffer(messageExtBatch.getStoreHost(), msgIdBuffer); + msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer + + StringBuilder buffer = new StringBuilder(batchCount * msgIdLen * 2 + batchCount - 1); + for (int i = 0; i < phyPosArray.length; i++) { + msgIdBuffer.putLong(msgIdLen - 8, phyPosArray[i]); + String msgId = UtilAll.bytes2string(msgIdBuffer.array()); + if (i != 0) { + buffer.append(','); + } + buffer.append(msgId); + } + return buffer.toString(); + }; - this.resetByteBuffer(storeHostHolder, storeHostLength); - ByteBuffer storeHostBytes = messageExtBatch.getStoreHostBytes(storeHostHolder); messagesByteBuff.mark(); + int index = 0; while (messagesByteBuff.hasRemaining()) { // 1 TOTALSIZE final int msgPos = messagesByteBuff.position(); @@ -1726,7 +1413,7 @@ public class CommitLog { totalMsgLen += msgLen; // Determines whether there is sufficient free space if ((totalMsgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { - this.resetByteBuffer(this.msgStoreItemMemory, 8); + this.msgStoreItemMemory.clear(); // 1 TOTALSIZE this.msgStoreItemMemory.putInt(maxBlank); // 2 MAGICCODE @@ -1737,27 +1424,20 @@ public class CommitLog { // Here the length of the specially set maxBlank byteBuffer.reset(); //ignore the previous appended messages byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8); - return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdBuilder.toString(), messageExtBatch.getStoreTimestamp(), + return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgIdSupplier, messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); } //move to add queue offset and commitlog offset - messagesByteBuff.position(msgPos + 20); - messagesByteBuff.putLong(queueOffset); - messagesByteBuff.putLong(wroteOffset + totalMsgLen - msgLen); - - storeHostBytes.rewind(); - String msgId; - if ((sysFlag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) { - msgId = MessageDecoder.createMessageId(this.msgIdMemory, storeHostBytes, wroteOffset + totalMsgLen - msgLen); - } else { - msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, storeHostBytes, wroteOffset + totalMsgLen - msgLen); - } - - if (msgIdBuilder.length() > 0) { - msgIdBuilder.append(',').append(msgId); - } else { - msgIdBuilder.append(msgId); - } + int pos = msgPos + 20; + messagesByteBuff.putLong(pos, queueOffset); + pos += 8; + messagesByteBuff.putLong(pos, wroteOffset + totalMsgLen - msgLen); + // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP + pos += 8 + 4 + 8 + bornHostLength; + // refresh store time stamp in lock + messagesByteBuff.putLong(pos, messageExtBatch.getStoreTimestamp()); + + putMessageContext.getPhyPos()[index++] = wroteOffset + totalMsgLen - msgLen; queueOffset++; msgNum++; messagesByteBuff.position(msgPos + msgLen); @@ -1767,7 +1447,7 @@ public class CommitLog { messagesByteBuff.limit(totalMsgLen); byteBuffer.put(messagesByteBuff); messageExtBatch.setEncodedBuff(null); - AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdBuilder.toString(), + AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, totalMsgLen, msgIdSupplier, messageExtBatch.getStoreTimestamp(), beginQueueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills); result.setMsgNum(msgNum); CommitLog.this.topicQueueTable.put(key, queueOffset); @@ -1782,19 +1462,104 @@ public class CommitLog { } - public static class MessageExtBatchEncoder { + public static class MessageExtEncoder { // Store the message content - private final ByteBuffer msgBatchMemory; + private final ByteBuffer encoderBuffer; // The maximum length of the message private final int maxMessageSize; - MessageExtBatchEncoder(final int size) { - this.msgBatchMemory = ByteBuffer.allocateDirect(size); + MessageExtEncoder(final int size) { + this.encoderBuffer = ByteBuffer.allocateDirect(size); this.maxMessageSize = size; } - public ByteBuffer encode(final MessageExtBatch messageExtBatch) { - msgBatchMemory.clear(); //not thread-safe + private void socketAddress2ByteBuffer(final SocketAddress socketAddress, final ByteBuffer byteBuffer) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress; + InetAddress address = inetSocketAddress.getAddress(); + if (address instanceof Inet4Address) { + byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 4); + } else { + byteBuffer.put(inetSocketAddress.getAddress().getAddress(), 0, 16); + } + byteBuffer.putInt(inetSocketAddress.getPort()); + } + + protected PutMessageResult encode(MessageExtBrokerInner msgInner) { + /** + * Serialize message + */ + final byte[] propertiesData = + msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); + + final int propertiesLength = propertiesData == null ? 0 : propertiesData.length; + + if (propertiesLength > Short.MAX_VALUE) { + log.warn("putMessage message properties length too long. length={}", propertiesData.length); + return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null); + } + + final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8); + final int topicLength = topicData.length; + + final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length; + + final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength); + + // Exceeds the maximum message + if (msgLen > this.maxMessageSize) { + CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength + + ", maxMessageSize: " + this.maxMessageSize); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); + } + + // Initialization of storage space + this.resetByteBuffer(encoderBuffer, msgLen); + // 1 TOTALSIZE + this.encoderBuffer.putInt(msgLen); + // 2 MAGICCODE + this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE); + // 3 BODYCRC + this.encoderBuffer.putInt(msgInner.getBodyCRC()); + // 4 QUEUEID + this.encoderBuffer.putInt(msgInner.getQueueId()); + // 5 FLAG + this.encoderBuffer.putInt(msgInner.getFlag()); + // 6 QUEUEOFFSET, need update later + this.encoderBuffer.putLong(0); + // 7 PHYSICALOFFSET, need update later + this.encoderBuffer.putLong(0); + // 8 SYSFLAG + this.encoderBuffer.putInt(msgInner.getSysFlag()); + // 9 BORNTIMESTAMP + this.encoderBuffer.putLong(msgInner.getBornTimestamp()); + // 10 BORNHOST + socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer); + // 11 STORETIMESTAMP + this.encoderBuffer.putLong(msgInner.getStoreTimestamp()); + // 12 STOREHOSTADDRESS + socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer); + // 13 RECONSUMETIMES + this.encoderBuffer.putInt(msgInner.getReconsumeTimes()); + // 14 Prepared Transaction Offset + this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset()); + // 15 BODY + this.encoderBuffer.putInt(bodyLength); + if (bodyLength > 0) + this.encoderBuffer.put(msgInner.getBody()); + // 16 TOPIC + this.encoderBuffer.put((byte) topicLength); + this.encoderBuffer.put(topicData); + // 17 PROPERTIES + this.encoderBuffer.putShort((short) propertiesLength); + if (propertiesLength > 0) + this.encoderBuffer.put(propertiesData); + + encoderBuffer.flip(); + return null; + } + + protected ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) { + encoderBuffer.clear(); //not thread-safe int totalMsgLen = 0; ByteBuffer messagesByteBuff = messageExtBatch.wrap(); @@ -1809,7 +1574,9 @@ public class CommitLog { final byte[] batchPropData = batchPropStr.getBytes(MessageDecoder.CHARSET_UTF8); final short batchPropLen = (short) batchPropData.length; + int batchSize = 0; while (messagesByteBuff.hasRemaining()) { + batchSize++; // 1 TOTALSIZE messagesByteBuff.getInt(); // 2 MAGICCODE @@ -1849,53 +1616,55 @@ public class CommitLog { } // 1 TOTALSIZE - this.msgBatchMemory.putInt(msgLen); + this.encoderBuffer.putInt(msgLen); // 2 MAGICCODE - this.msgBatchMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); + this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC - this.msgBatchMemory.putInt(bodyCrc); + this.encoderBuffer.putInt(bodyCrc); // 4 QUEUEID - this.msgBatchMemory.putInt(messageExtBatch.getQueueId()); + this.encoderBuffer.putInt(messageExtBatch.getQueueId()); // 5 FLAG - this.msgBatchMemory.putInt(flag); + this.encoderBuffer.putInt(flag); // 6 QUEUEOFFSET - this.msgBatchMemory.putLong(0); + this.encoderBuffer.putLong(0); // 7 PHYSICALOFFSET - this.msgBatchMemory.putLong(0); + this.encoderBuffer.putLong(0); // 8 SYSFLAG - this.msgBatchMemory.putInt(messageExtBatch.getSysFlag()); + this.encoderBuffer.putInt(messageExtBatch.getSysFlag()); // 9 BORNTIMESTAMP - this.msgBatchMemory.putLong(messageExtBatch.getBornTimestamp()); + this.encoderBuffer.putLong(messageExtBatch.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength); - this.msgBatchMemory.put(messageExtBatch.getBornHostBytes(bornHostHolder)); + this.encoderBuffer.put(messageExtBatch.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP - this.msgBatchMemory.putLong(messageExtBatch.getStoreTimestamp()); + this.encoderBuffer.putLong(messageExtBatch.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); - this.msgBatchMemory.put(messageExtBatch.getStoreHostBytes(storeHostHolder)); + this.encoderBuffer.put(messageExtBatch.getStoreHostBytes(storeHostHolder)); // 13 RECONSUMETIMES - this.msgBatchMemory.putInt(messageExtBatch.getReconsumeTimes()); + this.encoderBuffer.putInt(messageExtBatch.getReconsumeTimes()); // 14 Prepared Transaction Offset, batch does not support transaction - this.msgBatchMemory.putLong(0); + this.encoderBuffer.putLong(0); // 15 BODY - this.msgBatchMemory.putInt(bodyLen); + this.encoderBuffer.putInt(bodyLen); if (bodyLen > 0) - this.msgBatchMemory.put(messagesByteBuff.array(), bodyPos, bodyLen); + this.encoderBuffer.put(messagesByteBuff.array(), bodyPos, bodyLen); // 16 TOPIC - this.msgBatchMemory.put((byte) topicLength); - this.msgBatchMemory.put(topicData); + this.encoderBuffer.put((byte) topicLength); + this.encoderBuffer.put(topicData); // 17 PROPERTIES - this.msgBatchMemory.putShort((short) (propertiesLen + batchPropLen)); + this.encoderBuffer.putShort((short) (propertiesLen + batchPropLen)); if (propertiesLen > 0) { - this.msgBatchMemory.put(messagesByteBuff.array(), propertiesPos, propertiesLen); + this.encoderBuffer.put(messagesByteBuff.array(), propertiesPos, propertiesLen); } if (batchPropLen > 0) { - this.msgBatchMemory.put(batchPropData, 0, batchPropLen); + this.encoderBuffer.put(batchPropData, 0, batchPropLen); } } - msgBatchMemory.flip(); - return msgBatchMemory; + putMessageContext.setBatchSize(batchSize); + putMessageContext.setPhyPos(new long[batchSize]); + encoderBuffer.flip(); + return encoderBuffer; } private void resetByteBuffer(final ByteBuffer byteBuffer, final int limit) { @@ -1904,4 +1673,51 @@ public class CommitLog { } } + + static class PutMessageThreadLocal { + private MessageExtEncoder encoder; + private StringBuilder keyBuilder; + PutMessageThreadLocal(int size) { + encoder = new MessageExtEncoder(size); + keyBuilder = new StringBuilder(); + } + + public MessageExtEncoder getEncoder() { + return encoder; + } + + public StringBuilder getKeyBuilder() { + return keyBuilder; + } + } + + static class PutMessageContext { + private String topicQueueTableKey; + private long[] phyPos; + private int batchSize; + + public PutMessageContext(String topicQueueTableKey) { + this.topicQueueTableKey = topicQueueTableKey; + } + + public String getTopicQueueTableKey() { + return topicQueueTableKey; + } + + public long[] getPhyPos() { + return phyPos; + } + + public void setPhyPos(long[] phyPos) { + this.phyPos = phyPos; + } + + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7dd5a32b2fab0d873736479ddde8f9c402e3ec0c..69019c15490f0ad0b2ea6aa5a749899ef524ea70 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -34,6 +34,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -476,58 +477,20 @@ public class DefaultMessageStore implements MessageStore { @Override public PutMessageResult putMessage(MessageExtBrokerInner msg) { - PutMessageStatus checkStoreStatus = this.checkStoreStatus(); - if (checkStoreStatus != PutMessageStatus.PUT_OK) { - return new PutMessageResult(checkStoreStatus, null); - } - - PutMessageStatus msgCheckStatus = this.checkMessage(msg); - if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { - return new PutMessageResult(msgCheckStatus, null); - } - - long beginTime = this.getSystemClock().now(); - PutMessageResult result = this.commitLog.putMessage(msg); - long elapsedTime = this.getSystemClock().now() - beginTime; - if (elapsedTime > 500) { - log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length); - } - - this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - - if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + try { + return asyncPutMessage(msg).get(); + } catch (InterruptedException | ExecutionException e) { + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); } - - return result; } @Override public PutMessageResult putMessages(MessageExtBatch messageExtBatch) { - PutMessageStatus checkStoreStatus = this.checkStoreStatus(); - if (checkStoreStatus != PutMessageStatus.PUT_OK) { - return new PutMessageResult(checkStoreStatus, null); - } - - PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch); - if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) { - return new PutMessageResult(msgCheckStatus, null); - } - - long beginTime = this.getSystemClock().now(); - PutMessageResult result = this.commitLog.putMessages(messageExtBatch); - long elapsedTime = this.getSystemClock().now() - beginTime; - if (elapsedTime > 500) { - log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length); - } - - this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime); - - if (null == result || !result.isOk()) { - this.storeStatsService.getPutMessageFailedTimes().incrementAndGet(); + try { + return asyncPutMessages(messageExtBatch).get(); + } catch (InterruptedException | ExecutionException e) { + return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null); } - - return result; } @Override diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 25f0e393144bdd978a72afbf07c6385fda62a4a0..297271d38bb124bfb02b8abaa2943738723a556a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -37,6 +37,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.CommitLog.PutMessageContext; import org.apache.rocketmq.store.config.FlushDiskType; import org.apache.rocketmq.store.util.LibC; import sun.nio.ch.DirectBuffer; @@ -188,15 +189,18 @@ public class MappedFile extends ReferenceResource { return fileChannel; } - public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) { - return appendMessagesInner(msg, cb); + public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb, + PutMessageContext putMessageContext) { + return appendMessagesInner(msg, cb, putMessageContext); } - public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb) { - return appendMessagesInner(messageExtBatch, cb); + public AppendMessageResult appendMessages(final MessageExtBatch messageExtBatch, final AppendMessageCallback cb, + PutMessageContext putMessageContext) { + return appendMessagesInner(messageExtBatch, cb, putMessageContext); } - public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) { + public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb, + PutMessageContext putMessageContext) { assert messageExt != null; assert cb != null; @@ -207,9 +211,11 @@ public class MappedFile extends ReferenceResource { byteBuffer.position(currentPos); AppendMessageResult result; if (messageExt instanceof MessageExtBrokerInner) { - result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, + (MessageExtBrokerInner) messageExt, putMessageContext); } else if (messageExt instanceof MessageExtBatch) { - result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt); + result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, + (MessageExtBatch) messageExt, putMessageContext); } else { return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR); } diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java index e5f087b697d36eb59bcd83cbeb8b1cff3db8b49c..df7e6e586bbbc15d7cbf53607f0ff51da264cdc2 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java +++ b/store/src/main/java/org/apache/rocketmq/store/MessageExtBrokerInner.java @@ -16,6 +16,8 @@ */ package org.apache.rocketmq.store; +import java.nio.ByteBuffer; + import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.message.MessageExt; @@ -24,6 +26,16 @@ public class MessageExtBrokerInner extends MessageExt { private String propertiesString; private long tagsCode; + private ByteBuffer encodedBuff; + + public ByteBuffer getEncodedBuff() { + return encodedBuff; + } + + public void setEncodedBuff(ByteBuffer encodedBuff) { + this.encodedBuff = encodedBuff; + } + public static long tagsString2tagsCode(final TopicFilterType filter, final String tags) { if (null == tags || tags.length() == 0) { return 0; } diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java index ea791bd9ff66dcc3bf32f4b0c4f6e65aec3c12f9..011cbe169cf7da33c095f5d4b977dc8abffdc1e9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java @@ -37,7 +37,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; @@ -413,237 +412,6 @@ public class DLedgerCommitLog extends CommitLog { } } - @Override - public PutMessageResult putMessage(final MessageExtBrokerInner msg) { - - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); - String topic = msg.getTopic(); - setMessageInfo(msg,tranType); - - // Back to Results - AppendMessageResult appendResult; - AppendFuture dledgerFuture; - EncodeResult encodeResult; - - encodeResult = this.messageSerializer.serialize(msg); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult.status)); - } - - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - long elapsedTimeInLock; - long queueOffset; - try { - beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset, false); - AppendEntryRequest request = new AppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBody(encodeResult.getData()); - dledgerFuture = (AppendFuture) dLedgerServer.handleAppend(request); - if (dledgerFuture.getPos() == -1) { - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); - } - long wroteOffset = dledgerFuture.getPos() + DLedgerEntry.BODY_OFFSET; - - int msgIdLength = (msg.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; - ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - - String msgId = MessageDecoder.createMessageId(buffer, msg.getStoreHostBytes(), wroteOffset); - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(), queueOffset, elapsedTimeInLock); - switch (tranType) { - case MessageSysFlag.TRANSACTION_PREPARED_TYPE: - case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: - break; - case MessageSysFlag.TRANSACTION_NOT_TYPE: - case MessageSysFlag.TRANSACTION_COMMIT_TYPE: - // The next update ConsumeQueue information - DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + 1); - break; - default: - break; - } - } catch (Exception e) { - log.error("Put message error", e); - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); - } finally { - beginTimeInDledgerLock = 0; - putMessageLock.unlock(); - } - - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, appendResult); - } - - PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; - try { - AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); - switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { - case SUCCESS: - putMessageStatus = PutMessageStatus.PUT_OK; - break; - case INCONSISTENT_LEADER: - case NOT_LEADER: - case LEADER_NOT_READY: - case DISK_FULL: - putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; - break; - case WAIT_QUORUM_ACK_TIMEOUT: - //Do not return flush_slave_timeout to the client, for the ons client will ignore it. - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - case LEADER_PENDING_FULL: - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - } - } catch (Throwable t) { - log.error("Failed to get dledger append result", t); - } - - PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); - if (putMessageStatus == PutMessageStatus.PUT_OK) { - // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); - storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(appendResult.getWroteBytes()); - } - return putMessageResult; - } - - @Override - public PutMessageResult putMessages(final MessageExtBatch messageExtBatch) { - final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag()); - - if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); - } - if (messageExtBatch.getDelayTimeLevel() > 0) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null); - } - - // Set the storage time - messageExtBatch.setStoreTimestamp(System.currentTimeMillis()); - - StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); - - InetSocketAddress bornSocketAddress = (InetSocketAddress) messageExtBatch.getBornHost(); - if (bornSocketAddress.getAddress() instanceof Inet6Address) { - messageExtBatch.setBornHostV6Flag(); - } - - InetSocketAddress storeSocketAddress = (InetSocketAddress) messageExtBatch.getStoreHost(); - if (storeSocketAddress.getAddress() instanceof Inet6Address) { - messageExtBatch.setStoreHostAddressV6Flag(); - } - - // Back to Results - AppendMessageResult appendResult; - BatchAppendFuture dledgerFuture; - EncodeResult encodeResult; - - encodeResult = this.messageSerializer.serialize(messageExtBatch); - if (encodeResult.status != AppendMessageStatus.PUT_OK) { - return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(encodeResult - .status)); - } - - putMessageLock.lock(); //spin or ReentrantLock ,depending on store config - msgIdBuilder.setLength(0); - long elapsedTimeInLock; - long queueOffset; - int msgNum = 0; - try { - beginTimeInDledgerLock = this.defaultMessageStore.getSystemClock().now(); - queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey, tranType); - encodeResult.setQueueOffsetKey(queueOffset, true); - BatchAppendEntryRequest request = new BatchAppendEntryRequest(); - request.setGroup(dLedgerConfig.getGroup()); - request.setRemoteId(dLedgerServer.getMemberState().getSelfId()); - request.setBatchMsgs(encodeResult.batchData); - AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request); - if (appendFuture.getPos() == -1) { - log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode()); - return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)); - } - dledgerFuture = (BatchAppendFuture) appendFuture; - - long wroteOffset = 0; - - int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8; - ByteBuffer buffer = ByteBuffer.allocate(msgIdLength); - - boolean isFirstOffset = true; - long firstWroteOffset = 0; - for (long pos : dledgerFuture.getPositions()) { - wroteOffset = pos + DLedgerEntry.BODY_OFFSET; - if (isFirstOffset) { - firstWroteOffset = wroteOffset; - isFirstOffset = false; - } - String msgId = MessageDecoder.createMessageId(buffer, messageExtBatch.getStoreHostBytes(), wroteOffset); - if (msgIdBuilder.length() > 0) { - msgIdBuilder.append(',').append(msgId); - } else { - msgIdBuilder.append(msgId); - } - msgNum++; - } - - elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock; - appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK, firstWroteOffset, encodeResult.totalMsgLen, - msgIdBuilder.toString(), System.currentTimeMillis(), queueOffset, elapsedTimeInLock); - appendResult.setMsgNum(msgNum); - DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey, queueOffset + msgNum); - } catch (Exception e) { - log.error("Put message error", e); - return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new AppendMessageResult(AppendMessageStatus - .UNKNOWN_ERROR)); - } finally { - beginTimeInDledgerLock = 0; - putMessageLock.unlock(); - } - - if (elapsedTimeInLock > 500) { - log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", - elapsedTimeInLock, messageExtBatch.getBody().length, appendResult); - } - - PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR; - try { - AppendEntryResponse appendEntryResponse = dledgerFuture.get(3, TimeUnit.SECONDS); - switch (DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) { - case SUCCESS: - putMessageStatus = PutMessageStatus.PUT_OK; - break; - case INCONSISTENT_LEADER: - case NOT_LEADER: - case LEADER_NOT_READY: - case DISK_FULL: - putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE; - break; - case WAIT_QUORUM_ACK_TIMEOUT: - //Do not return flush_slave_timeout to the client, for the ons client will ignore it. - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - case LEADER_PENDING_FULL: - putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY; - break; - } - } catch (Throwable t) { - log.error("Failed to get dledger append result", t); - } - - PutMessageResult putMessageResult = new PutMessageResult(putMessageStatus, appendResult); - if (putMessageStatus == PutMessageStatus.PUT_OK) { - // Statistics - storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).addAndGet(msgNum); - storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).addAndGet(encodeResult.totalMsgLen); - } - return putMessageResult; - } - @Override public CompletableFuture asyncPutMessage(MessageExtBrokerInner msg) { diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index f46b3befed63fc25f9c0901b60f97c54a68616f8..715c9d334aa5427b1b31601a78bafb6c8eafd953 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -30,6 +30,8 @@ import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBatch; +import org.apache.rocketmq.store.CommitLog.MessageExtEncoder; +import org.apache.rocketmq.store.CommitLog.PutMessageContext; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Before; @@ -42,7 +44,7 @@ public class AppendCallbackTest { AppendMessageCallback callback; - CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024); + MessageExtEncoder batchEncoder = new MessageExtEncoder(10 * 1024 * 1024); @Before public void init() throws Exception { @@ -84,10 +86,12 @@ public class AppendCallbackTest { messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); ByteBuffer buff = ByteBuffer.allocate(1024 * 10); //encounter end of file when append half of the data - AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch); + AppendMessageResult result = + callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext); assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus()); assertEquals(0, result.getWroteOffset()); assertEquals(0, result.getLogicsOffset()); @@ -121,10 +125,12 @@ public class AppendCallbackTest { messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); ByteBuffer buff = ByteBuffer.allocate(1024 * 10); //encounter end of file when append half of the data - AppendMessageResult result = callback.doAppend(0, buff, 1000, messageExtBatch); + AppendMessageResult result = + callback.doAppend(0, buff, 1000, messageExtBatch, putMessageContext); assertEquals(AppendMessageStatus.END_OF_FILE, result.getStatus()); assertEquals(0, result.getWroteOffset()); assertEquals(0, result.getLogicsOffset()); @@ -154,9 +160,11 @@ public class AppendCallbackTest { messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); ByteBuffer buff = ByteBuffer.allocate(1024 * 10); - AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch); + AppendMessageResult allresult = + callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext); assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); assertEquals(0, allresult.getWroteOffset()); @@ -214,9 +222,11 @@ public class AppendCallbackTest { messageExtBatch.setStoreHost(new InetSocketAddress("::1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); - messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); + PutMessageContext putMessageContext = new PutMessageContext(topic + "-" + queue); + messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch, putMessageContext)); ByteBuffer buff = ByteBuffer.allocate(1024 * 10); - AppendMessageResult allresult = callback.doAppend(0, buff, 1024 * 10, messageExtBatch); + AppendMessageResult allresult = + callback.doAppend(0, buff, 1024 * 10, messageExtBatch, putMessageContext); assertEquals(AppendMessageStatus.PUT_OK, allresult.getStatus()); assertEquals(0, allresult.getWroteOffset()); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java index b625cd94c18c89bf048d98c45da926d17905e634..465a2db586496cd3b7080eb339e996aa800f0767 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/acl/UpdateAccessConfigSubCommand.java @@ -164,9 +164,9 @@ public class UpdateAccessConfigSubCommand implements SubCommand { String clusterName = commandLine.getOptionValue('c').trim(); defaultMQAdminExt.start(); - Set masterSet = - CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); - for (String addr : masterSet) { + Set brokerAddrSet = + CommandUtil.fetchMasterAndSlaveAddrByClusterName(defaultMQAdminExt, clusterName); + for (String addr : brokerAddrSet) { defaultMQAdminExt.createAndUpdatePlainAccessConfig(addr, accessConfig); System.out.printf("create or update plain access config to %s success.%n", addr); }