diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
index 078d34b10b02a84837ad6bb0b34dd884833cab9c..a22b0b25a7a1cf5699185c0bb9ca5b1a83e783ea 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionManager.java
@@ -251,12 +251,6 @@ public class PlainPermissionManager {
}
public boolean updateGlobalWhiteAddrsConfig(List globalWhiteAddrsList) {
-
- if (globalWhiteAddrsList == null) {
- log.error("Parameter value globalWhiteAddrsList is null,Please check your parameter");
- return false;
- }
-
Map aclAccessConfigMap = AclUtils.getYamlDataObject(fileHome + File.separator + fileName,
Map.class);
if (aclAccessConfigMap == null || aclAccessConfigMap.isEmpty()) {
@@ -266,9 +260,10 @@ public class PlainPermissionManager {
if (globalWhiteRemoteAddrList != null) {
globalWhiteRemoteAddrList.clear();
- globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList);
-
- // Update globalWhiteRemoteAddr element in memeory map firstly
+ if (globalWhiteAddrsList != null) {
+ globalWhiteRemoteAddrList.addAll(globalWhiteAddrsList);
+ }
+ // Update globalWhiteRemoteAddr element in memory map firstly
aclAccessConfigMap.put(AclConstants.CONFIG_GLOBAL_WHITE_ADDRS, globalWhiteRemoteAddrList);
if (AclUtils.writeDataObject(fileHome + File.separator + fileName, updateAclConfigFileVersion(aclAccessConfigMap))) {
return true;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
index 968bcfb564f9fe67e48d84371a47089a4572783a..373a0a48b672002401603f03158e0f98224fe03d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransfer.java
@@ -53,6 +53,11 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
return transferred;
}
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
@Override
public long count() {
return byteBufferHeader.limit() + this.getMessageResult.getBufferTotalSize();
@@ -76,6 +81,28 @@ public class ManyMessageTransfer extends AbstractReferenceCounted implements Fil
return 0;
}
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
+
public void close() {
this.deallocate();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
index b795d2d253bd3ade2d7a102ecdbbe596321d8d15..558f091763e75bd0a3d0fb35b852530756b298d2 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/OneMessageTransfer.java
@@ -47,6 +47,11 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
return transferred;
}
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
@Override
public long count() {
return this.byteBufferHeader.limit() + this.selectMappedBufferResult.getSize();
@@ -65,6 +70,28 @@ public class OneMessageTransfer extends AbstractReferenceCounted implements File
return 0;
}
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
+
public void close() {
this.deallocate();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
index e8f30996677a72cd2159baec3375ad42ad9a6c1a..db47b9e50e346652052065b06f51412eec25cf1d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pagecache/QueryMessageTransfer.java
@@ -53,6 +53,11 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
return transferred;
}
+ @Override
+ public long transferred() {
+ return transferred;
+ }
+
@Override
public long count() {
return byteBufferHeader.limit() + this.queryMessageResult.getBufferTotalSize();
@@ -76,6 +81,28 @@ public class QueryMessageTransfer extends AbstractReferenceCounted implements Fi
return 0;
}
+ @Override
+ public FileRegion retain() {
+ super.retain();
+ return this;
+ }
+
+ @Override
+ public FileRegion retain(int increment) {
+ super.retain(increment);
+ return this;
+ }
+
+ @Override
+ public FileRegion touch() {
+ return this;
+ }
+
+ @Override
+ public FileRegion touch(Object hint) {
+ return this;
+ }
+
public void close() {
this.deallocate();
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 1db019bec810accdf844597dc2783910f05c1e1c..b95bab62490b51041d2afdc357362d8141db590d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -180,8 +180,8 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
}
@Override
- public boolean appendToCommitLog(long startOffset, byte[] data) {
- return next.appendToCommitLog(startOffset, data);
+ public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
+ return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
}
@Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 724cf54c8137c57d1dd8a5d434e20e582674d7a7..ccec5069c75028fafb3abb51b45e1241e87136d0 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -178,7 +178,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
- maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
+ Integer times = requestHeader.getMaxReconsumeTimes();
+ if (times != null) {
+ maxReconsumeTimes = times;
+ }
}
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 3ab214b8db6437f21ab13b0fb615bbaec89409e7..beeeb2f5e5352d1db89b19c540c410c587951b6c 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -178,7 +178,7 @@ public class ClientConfig {
}
public String getNamesrvAddr() {
- if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.validateInstanceEndpoint(namesrvAddr.trim())) {
+ if (StringUtils.isNotEmpty(namesrvAddr) && NameServerAddressUtils.NAMESRV_ENDPOINT_PATTERN.matcher(namesrvAddr.trim()).matches()) {
return NameServerAddressUtils.getNameSrvAddrFromNamesrvEndpoint(namesrvAddr);
}
return namesrvAddr;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 9011117a79fc247800c3c9aa06d58c5c97bb01a8..caf166de47d5e78d88321bdb57516219ff3efa24 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -236,11 +236,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private boolean unitMode = false;
/**
- * Max re-consume times. -1 means 16 times.
- *
+ * Max re-consume times.
+ * In concurrently mode, -1 means 16;
+ * In orderly mode, -1 means Integer.MAX_VALUE.
*
- * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
- * queue waiting.
+ * If messages are re-consumed more than {@link #maxReconsumeTimes} before success.
*/
private int maxReconsumeTimes = -1;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index b37f8a635983573c28d42a6708e0ed1ffee91b92..537dbee832d59df982120950a770caf5d697ba46 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -110,32 +110,12 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
@Override
public void incCorePoolSize() {
- // long corePoolSize = this.consumeExecutor.getCorePoolSize();
- // if (corePoolSize < this.defaultMQPushConsumer.getConsumeThreadMax())
- // {
- // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
- // + 1);
- // }
- // log.info("incCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}",
- // corePoolSize,
- // this.consumeExecutor.getCorePoolSize(),
- // this.consumerGroup);
+
}
@Override
public void decCorePoolSize() {
- // long corePoolSize = this.consumeExecutor.getCorePoolSize();
- // if (corePoolSize > this.defaultMQPushConsumer.getConsumeThreadMin())
- // {
- // this.consumeExecutor.setCorePoolSize(this.consumeExecutor.getCorePoolSize()
- // - 1);
- // }
- // log.info("decCorePoolSize Concurrently from {} to {}, ConsumerGroup:
- // {}",
- // corePoolSize,
- // this.consumeExecutor.getCorePoolSize(),
- // this.consumerGroup);
+
}
@Override
@@ -411,11 +391,11 @@ public class ConsumeMessageConcurrentlyService implements ConsumeMessageService
}
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
- log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
+ log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
- messageQueue);
+ messageQueue), e);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 130effad9e55f08ede275f6870ba9976f11f5f56..8d92b5718a624caccb95f74019859002a3230678 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -487,11 +487,11 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
- log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
+ log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
- messageQueue);
+ messageQueue), e);
hasException = true;
} finally {
this.processQueue.getConsumeLock().unlock();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 4e139c44ce0c25d0eee8b01bc2f87804a036c8b9..d28d23ad6bbb5c5f765ef8f1e470dcf00a15d237 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -100,7 +100,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
private static final String NOT_RUNNING_EXCEPTION_MESSAGE = "The consumer not running, please start it first.";
- private static final String SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
+ private static final String SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE = "Subscribe and assign are mutually exclusive.";
/**
* the type of subscription
*/
@@ -195,8 +195,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
private void checkServiceState() {
- if (this.serviceState != ServiceState.RUNNING)
+ if (this.serviceState != ServiceState.RUNNING) {
throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
+ }
}
public void updateNameServerAddr(String newAddresses) {
@@ -204,10 +205,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
private synchronized void setSubscriptionType(SubscriptionType type) {
- if (this.subscriptionType == SubscriptionType.NONE)
+ if (this.subscriptionType == SubscriptionType.NONE) {
this.subscriptionType = type;
- else if (this.subscriptionType != type)
- throw new IllegalStateException(SUBSCRIPTION_CONFILCT_EXCEPTION_MESSAGE);
+ } else if (this.subscriptionType != type) {
+ throw new IllegalStateException(SUBSCRIPTION_CONFLICT_EXCEPTION_MESSAGE);
+ }
}
private void updateAssignedMessageQueue(String topic, Set assignedMessageQueue) {
@@ -464,7 +466,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
try {
- if (topic == null || topic.equals("")) {
+ if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE);
@@ -483,7 +485,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized void subscribe(String topic, MessageSelector messageSelector) throws MQClientException {
try {
- if (topic == null || topic.equals("")) {
+ if (topic == null || "".equals(topic)) {
throw new IllegalArgumentException("Topic can not be null or empty.");
}
setSubscriptionType(SubscriptionType.SUBSCRIBE);
@@ -533,8 +535,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public synchronized List poll(long timeout) {
try {
checkServiceState();
- if (timeout < 0)
+ if (timeout < 0) {
throw new IllegalArgumentException("Timeout must not be negative");
+ }
if (defaultLitePullConsumer.isAutoCommit()) {
maybeAutoCommit();
@@ -546,8 +549,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
if (endTime - System.currentTimeMillis() > 0) {
while (consumeRequest != null && consumeRequest.getProcessQueue().isDropped()) {
consumeRequest = consumeRequestCache.poll(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- if (endTime - System.currentTimeMillis() <= 0)
+ if (endTime - System.currentTimeMillis() <= 0) {
break;
+ }
}
}
@@ -671,8 +675,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
- if (offset == -2)
+ if (offset == -2) {
throw new MQClientException("Fetch consume offset from broker exception", null);
+ }
return offset;
}
@@ -683,8 +688,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
Iterator iter = consumeRequestCache.iterator();
while (iter.hasNext()) {
- if (iter.next().getMessageQueue().equals(messageQueue))
+ if (iter.next().getMessageQueue().equals(messageQueue)) {
iter.remove();
+ }
}
}
@@ -735,10 +741,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
return;
}
- if (consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
+ if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
- if ((consumeRequestFlowControlTimes++ % 1000) == 0)
+ if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
+ }
return;
}
@@ -778,7 +785,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long offset = 0L;
try {
offset = nextPullOffset(messageQueue);
- } catch (MQClientException e) {
+ } catch (Exception e) {
log.error("Failed to get next pull offset", e);
scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_ON_EXCEPTION, TimeUnit.MILLISECONDS);
return;
@@ -790,11 +797,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
long pullDelayTimeMills = 0;
try {
SubscriptionData subscriptionData;
+ String topic = this.messageQueue.getTopic();
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
- String topic = this.messageQueue.getTopic();
subscriptionData = rebalanceImpl.getSubscriptionInner().get(topic);
} else {
- String topic = this.messageQueue.getTopic();
subscriptionData = FilterAPI.buildSubscriptionData(topic, SubscriptionData.SUB_ALL);
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index bb0b7f10436ad7ef0844bad9ec09849c72b5940c..59b8deb3fba71c0bbfe385c92ceb538fac15b2de 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -273,7 +273,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
long offset = -1L;
try {
offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
- } catch (MQClientException e) {
+ } catch (Exception e) {
this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
return;
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 833d465a4703184be6e9a17c20682fdd5d81b240..7677d8b685f43c43c789bfdffa7b42975f9f49ad 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -378,7 +378,7 @@ public abstract class RebalanceImpl {
long nextOffset = -1L;
try {
nextOffset = this.computePullFromWhereWithException(mq);
- } catch (MQClientException e) {
+ } catch (Exception e) {
log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
continue;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 286c684e43a52dc3759c0f3b9393e0ee75851ef4..8fe9400b0f74ab92c712051d40c937915365ab45 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -102,7 +102,8 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
try {
result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
} catch (MQClientException e) {
- result = -1;
+ log.warn("Compute consume offset from last offset exception, mq={}, exception={}", mq, e);
+ throw e;
}
}
} else {
diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
index af5a7053957929e62e3d934299a3a6558742b5bf..e268dff3454eb1a7556972b0bc4514944ce6faa1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/trace/TraceDataEncoderTest.java
@@ -60,7 +60,6 @@ public class TraceDataEncoderTest {
Assert.assertEquals(contexts.get(0).getTraceType(), TraceType.Pub);
}
-
@Test
public void testEncoderFromContextBean() {
TraceContext context = new TraceContext();
@@ -130,4 +129,107 @@ public class TraceDataEncoderTest {
Assert.assertEquals(before.getTransactionState(), after.getTransactionState());
Assert.assertEquals(before.isFromTransactionCheck(), after.isFromTransactionCheck());
}
+
+ @Test
+ public void testPubTraceDataFormatTest() {
+ TraceContext pubContext = new TraceContext();
+ pubContext.setTraceType(TraceType.Pub);
+ pubContext.setTimeStamp(time);
+ pubContext.setRegionId("Default-region");
+ pubContext.setGroupName("GroupName-test");
+ pubContext.setCostTime(34);
+ pubContext.setSuccess(true);
+ TraceBean bean = new TraceBean();
+ bean.setTopic("topic-test");
+ bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
+ bean.setTags("tags");
+ bean.setKeys("keys");
+ bean.setStoreHost("127.0.0.1:10911");
+ bean.setBodyLength(100);
+ bean.setMsgType(MessageType.Normal_Msg);
+ bean.setOffsetMsgId("AC1415116D1418B4AAC217FE1B4E0000");
+ pubContext.setTraceBeans(new ArrayList(1));
+ pubContext.getTraceBeans().add(bean);
+
+ TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(pubContext);
+ String transData = traceTransferBean.getTransData();
+ Assert.assertNotNull(transData);
+ String[] items = transData.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
+ Assert.assertEquals(14, items.length);
+
+ }
+
+ @Test
+ public void testSubBeforeTraceDataFormatTest() {
+ TraceContext subBeforeContext = new TraceContext();
+ subBeforeContext.setTraceType(TraceType.SubBefore);
+ subBeforeContext.setTimeStamp(time);
+ subBeforeContext.setRegionId("Default-region");
+ subBeforeContext.setGroupName("GroupName-test");
+ subBeforeContext.setRequestId("3455848576927");
+ TraceBean bean = new TraceBean();
+ bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
+ bean.setRetryTimes(0);
+ bean.setKeys("keys");
+ subBeforeContext.setTraceBeans(new ArrayList(1));
+ subBeforeContext.getTraceBeans().add(bean);
+
+ TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(subBeforeContext);
+ String transData = traceTransferBean.getTransData();
+ Assert.assertNotNull(transData);
+ String[] items = transData.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
+ Assert.assertEquals(8, items.length);
+
+ }
+
+ @Test
+ public void testSubAfterTraceDataFormatTest() {
+ TraceContext subAfterContext = new TraceContext();
+ subAfterContext.setTraceType(TraceType.SubAfter);
+ subAfterContext.setRequestId("3455848576927");
+ subAfterContext.setCostTime(20);
+ subAfterContext.setSuccess(true);
+ subAfterContext.setContextCode(98623046);
+ TraceBean bean = new TraceBean();
+ bean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
+ bean.setKeys("keys");
+ subAfterContext.setTraceBeans(new ArrayList(1));
+ subAfterContext.getTraceBeans().add(bean);
+
+ TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(subAfterContext);
+ String transData = traceTransferBean.getTransData();
+ Assert.assertNotNull(transData);
+ String[] items = transData.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
+ Assert.assertEquals(7, items.length);
+
+ }
+
+ @Test
+ public void testEndTrxTraceDataFormatTest() {
+ TraceContext endTrxContext = new TraceContext();
+ endTrxContext.setTraceType(TraceType.EndTransaction);
+ endTrxContext.setGroupName("PID-test");
+ endTrxContext.setRegionId("DefaultRegion");
+ endTrxContext.setTimeStamp(time);
+ TraceBean endTrxTraceBean = new TraceBean();
+ endTrxTraceBean.setTopic("topic-test");
+ endTrxTraceBean.setKeys("Keys");
+ endTrxTraceBean.setTags("Tags");
+ endTrxTraceBean.setMsgId("AC1415116D1418B4AAC217FE1B4E0000");
+ endTrxTraceBean.setStoreHost("127.0.0.1:10911");
+ endTrxTraceBean.setMsgType(MessageType.Trans_msg_Commit);
+ endTrxTraceBean.setTransactionId("transactionId");
+ endTrxTraceBean.setTransactionState(LocalTransactionState.COMMIT_MESSAGE);
+ endTrxTraceBean.setFromTransactionCheck(false);
+ List traceBeans = new ArrayList();
+ traceBeans.add(endTrxTraceBean);
+ endTrxContext.setTraceBeans(traceBeans);
+
+ TraceTransferBean traceTransferBean = TraceDataEncoder.encoderFromContextBean(endTrxContext);
+ String transData = traceTransferBean.getTransData();
+ Assert.assertNotNull(transData);
+ String[] items = transData.split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
+ Assert.assertEquals(13, items.length);
+
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/NameServerAddressUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/NameServerAddressUtils.java
index 85dc95feb4c796374db0d1628c798fe7f4febc29..68f883935ae060f98f42344eef1f516b5c037c92 100644
--- a/common/src/main/java/org/apache/rocketmq/common/utils/NameServerAddressUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/NameServerAddressUtils.java
@@ -20,6 +20,7 @@ public class NameServerAddressUtils {
public static final String INSTANCE_PREFIX = "MQ_INST_";
public static final String INSTANCE_REGEX = INSTANCE_PREFIX + "\\w+_\\w+";
public static final String ENDPOINT_PREFIX = "(\\w+://|)";
+ public static final Pattern NAMESRV_ENDPOINT_PATTERN = Pattern.compile("^http://.*");
public static final Pattern INST_ENDPOINT_PATTERN = Pattern.compile("^" + ENDPOINT_PREFIX + INSTANCE_REGEX + "\\..*");
public static String getNameServerAddresses() {
diff --git a/docs/cn/Deployment.md b/docs/cn/Deployment.md
new file mode 100644
index 0000000000000000000000000000000000000000..1f00986b4beda4794d1ee3ab36378ff3789b8c93
--- /dev/null
+++ b/docs/cn/Deployment.md
@@ -0,0 +1,159 @@
+# 部署架构和设置步骤
+
+## 集群的设置
+
+### 1 单master模式
+
+这是最简单但也是最危险的模式,一旦broker服务器重启或宕机,整个服务将不可用。 建议在生产环境中不要使用这种部署方式,在本地测试和开发可以选择这种模式。 以下是构建的步骤。
+
+**1)启动NameServer**
+
+```shell
+### 第一步启动namesrv
+$ nohup sh mqnamesrv &
+
+### 验证namesrv是否启动成功
+$ tail -f ~/logs/rocketmqlogs/namesrv.log
+The Name Server boot success...
+```
+
+我们可以在namesrv.log 中看到'The Name Server boot success..',表示NameServer 已成功启动。
+
+**2)启动Broker**
+
+```shell
+### 第一步先启动broker
+$ nohup sh bin/mqbroker -n localhost:9876 &
+
+### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
+$ tail -f ~/logs/rocketmqlogs/Broker.log
+The broker[broker-a,192.169.1.2:10911] boot success...
+```
+
+我们可以在 Broker.log 中看到“The broker[brokerName,ip:port] boot success..”,这表明 broker 已成功启动。
+
+### 2 多Master模式
+
+该模式是指所有节点都是master主节点(比如2个或3个主节点),没有slave从节点的模式。 这种模式的优缺点如下:
+
+- 优点:
+ 1. 配置简单。
+ 2. 一个master节点的宕机或者重启(维护)对应用程序没有影响。
+ 3. 当磁盘配置为RAID10时,消息不会丢失,因为RAID10磁盘非常可靠,即使机器不可恢复(消息异步刷盘模式的情况下,会丢失少量消息;如果消息是同步刷盘模式,不会丢失任何消息)。
+ 4. 在这种模式下,性能是最高的。
+- 缺点:
+ 1. 单台机器宕机时,本机未消费的消息,直到机器恢复后才会订阅,影响消息实时性。
+
+多Master模式的启动步骤如下:
+
+**1)启动 NameServer**
+
+```shell
+### 第一步先启动broker
+$ nohup sh mqnamesrv &
+
+### 验证namesrv是否启动成功
+$ tail -f ~/logs/rocketmqlogs/namesrv.log
+The Name Server boot success...
+```
+
+**2)启动 Broker 集群**
+
+```shell
+### 比如在A机器上启动第一个Master,假设配置的NameServer IP为:192.168.1.1
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
+
+### 然后在机器B上启动第二个Master,假设配置的NameServer IP是:192.168.1.1
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
+
+...
+```
+
+上面显示的boot命令用于单个NameServer的情况。对于多个NameServer的集群,broker boot命令中-n参数后面的地址列表用分号隔开,例如 192.168.1.1 : 9876; 192.161.2 : 9876
+
+### 3 多Master多Slave模式-异步复制
+
+每个主节点配置多个从节点,多对主从。HA采用异步复制,主节点和从节点之间有短消息延迟(毫秒)。这种模式的优缺点如下:
+
+- 优点:
+ 1. 即使磁盘损坏,也不会丢失极少的消息,不影响消息的实时性能。
+ 2. 同时,当主节点宕机时,消费者仍然可以消费从节点的消息,这个过程对应用本身是透明的,不需要人为干预。
+ 3. 性能几乎与多Master模式一样高。
+- 缺点:
+ 1. 主节点宕机、磁盘损坏时,会丢失少量消息。
+
+多主多从模式的启动步骤如下:
+
+**1)启动 NameServer**
+
+```shell
+### 第一步先启动broker
+$ nohup sh mqnamesrv &
+
+### 验证namesrv是否启动成功
+$ tail -f ~/logs/rocketmqlogs/namesrv.log
+The Name Server boot success...
+```
+
+**2)启动 Broker 集群**
+
+```shell
+### 例如在A机器上启动第一个Master,假设配置的NameServer IP为:192.168.1.1,端口为9876。
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
+
+### 然后在机器B上启动第二个Master,假设配置的NameServer IP为:192.168.1.1,端口为9876。
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
+
+### 然后在C机器上启动第一个Slave,假设配置的NameServer IP为:192.168.1.1,端口为9876。
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
+
+### 最后在D机启动第二个Slave,假设配置的NameServer IP为:192.168.1.1,端口为9876。
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
+```
+
+上图显示了 2M-2S-Async 模式的启动命令,类似于其他 nM-nS-Async 模式。
+
+### 4 多Master多Slave模式-同步双写
+
+这种模式下,每个master节点配置多个slave节点,有多对Master-Slave。HA采用同步双写,即只有消息成功写入到主节点并复制到多个从节点,才会返回成功响应给应用程序。
+
+这种模式的优缺点如下:
+
+- 优点:
+ 1. 数据和服务都没有单点故障。
+ 2. 在master节点关闭的情况下,消息也没有延迟。
+ 3. 服务可用性和数据可用性非常高;
+- 缺点:
+ 1. 这种模式下的性能略低于异步复制模式(大约低 10%)。
+ 2. 发送单条消息的RT略高,目前版本,master节点宕机后,slave节点无法自动切换到master。
+
+启动步骤如下:
+
+**1)启动NameServer**
+
+```shell
+### 第一步启动broker
+$ nohup sh mqnamesrv &
+
+### 验证namesrv是否启动成功
+$ tail -f ~/logs/rocketmqlogs/namesrv.log
+The Name Server boot success...
+```
+
+**2)启动 Broker 集群**
+
+```shell
+### 例如在A机器上启动第一个Master,假设配置的NameServer IP为:192.168.1.1,端口为9876。
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
+
+### 然后在B机器上启动第二个Master,假设配置的NameServer IP为:192.168.1.1,端口为9876。
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
+
+### 然后在C机器上启动第一个Slave,假设配置的NameServer IP为:192.168.1.1,端口为9876。
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
+
+### 最后在D机启动第二个Slave,假设配置的NameServer IP为:192.168.1.1,端口为9876。
+$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
+```
+
+上述Master和Slave是通过指定相同的config命名为“brokerName”来配对的,master节点的brokerId必须为0,slave节点的brokerId必须大于0。
\ No newline at end of file
diff --git a/docs/cn/Example_Batch.md b/docs/cn/Example_Batch.md
new file mode 100644
index 0000000000000000000000000000000000000000..6c8897fa6bcfa790a66234ab257751fce1f34e0e
--- /dev/null
+++ b/docs/cn/Example_Batch.md
@@ -0,0 +1,82 @@
+# 批量消息发送
+批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。
+
+### 1 发送批量消息
+如果你一次只发送不超过 4MiB 的消息,使用批处理很容易:
+```java
+String topic = "BatchTest";
+List messages = new ArrayList<>();
+messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
+messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
+messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
+try {
+ producer.send(messages);
+} catch (Exception e) {
+ e.printStackTrace();
+ //handle the error
+}
+```
+### 2 拆分
+当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息:
+
+```java
+public class ListSplitter implements Iterator> {
+ private final int SIZE_LIMIT = 1024 * 1024 * 4;
+ private final List messages;
+ private int currIndex;
+ public ListSplitter(List messages) {
+ this.messages = messages;
+ }
+ @Override public boolean hasNext() {
+ return currIndex < messages.size();
+ }
+ @Override public List next() {
+ int startIndex = getStartIndex();
+ int nextIndex = startIndex;
+ int totalSize = 0;
+ for (; nextIndex < messages.size(); nextIndex++) {
+ Message message = messages.get(nextIndex);
+ int tmpSize = calcMessageSize(message);
+ if (tmpSize + totalSize > SIZE_LIMIT) {
+ break;
+ } else {
+ totalSize += tmpSize;
+ }
+ }
+ List subList = messages.subList(startIndex, nextIndex);
+ currIndex = nextIndex;
+ return subList;
+ }
+ private int getStartIndex() {
+ Message currMessage = messages.get(currIndex);
+ int tmpSize = calcMessageSize(currMessage);
+ while(tmpSize > SIZE_LIMIT) {
+ currIndex += 1;
+ Message message = messages.get(curIndex);
+ tmpSize = calcMessageSize(message);
+ }
+ return currIndex;
+ }
+ private int calcMessageSize(Message message) {
+ int tmpSize = message.getTopic().length() + message.getBody().length();
+ Map properties = message.getProperties();
+ for (Map.Entry entry : properties.entrySet()) {
+ tmpSize += entry.getKey().length() + entry.getValue().length();
+ }
+ tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
+ return tmpSize;
+ }
+}
+
+// then you could split the large list into small ones:
+ListSplitter splitter = new ListSplitter(messages);
+while (splitter.hasNext()) {
+ try {
+ List listItem = splitter.next();
+ producer.send(listItem);
+ } catch (Exception e) {
+ e.printStackTrace();
+ // handle the error
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/cn/Example_Delay.md b/docs/cn/Example_Delay.md
new file mode 100644
index 0000000000000000000000000000000000000000..31df40f4485e6015bbc1bc272d05f474d2837e59
--- /dev/null
+++ b/docs/cn/Example_Delay.md
@@ -0,0 +1,85 @@
+# Schedule example
+
+### 1 启动消费者等待传入的订阅消息
+
+```java
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+import java.util.List;
+
+public class ScheduledMessageConsumer {
+
+ public static void main(String[] args) throws Exception {
+ // Instantiate message consumer
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
+ // Subscribe topics
+ consumer.subscribe("TestTopic", "*");
+ // Register message listener
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) {
+ for (MessageExt message : messages) {
+ // Print approximate delay time period
+ System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
+ }
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ // Launch consumer
+ consumer.start();
+ }
+}
+```
+
+### 2 发送延迟消息
+
+```java
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+public class ScheduledMessageProducer {
+
+ public static void main(String[] args) throws Exception {
+ // Instantiate a producer to send scheduled messages
+ DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
+ // Launch producer
+ producer.start();
+ int totalMessagesToSend = 100;
+ for (int i = 0; i < totalMessagesToSend; i++) {
+ Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
+ // This message will be delivered to consumer 10 seconds later.
+ message.setDelayTimeLevel(3);
+ // Send the message
+ producer.send(message);
+ }
+
+ // Shutdown producer after use.
+ producer.shutdown();
+ }
+
+}
+```
+
+### 3 确认
+
+您应该会看到消息在其存储时间后大约 10 秒被消耗。
+
+### 4 延迟消息的使用场景
+
+例如在电子商务中,如果提交订单,可以发送延迟消息,1小时后可以查看订单状态。 如果订单仍未付款,则可以取消订单并释放库存。
+
+### 5 使用延迟消息的限制
+
+```java
+// org/apache/rocketmq/store/config/MessageStoreConfig.java
+
+private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
+```
+
+当前 RocketMQ 不支持任意时间的延迟。 生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。
+
+ See `SendMessageProcessor.java`
diff --git a/docs/cn/Example_Simple_cn.md b/docs/cn/Example_Simple_cn.md
new file mode 100644
index 0000000000000000000000000000000000000000..f0a2b6a1deaab08c5ccb89155d87a29e4d22ad0f
--- /dev/null
+++ b/docs/cn/Example_Simple_cn.md
@@ -0,0 +1,136 @@
+# Basic Sample
+------
+基本示例中提供了以下两个功能
+* RocketMQ可用于以三种方式发送消息:可靠的同步、可靠的异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。
+* RocketMQ可以用来消费消息。
+### 1 添加依赖
+maven:
+``` java
+
+ org.apache.rocketmq
+ rocketmq-client
+ 4.3.0
+
+```
+gradle:
+``` java
+compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
+```
+### 2 发送消息
+##### 2.1 使用Producer发送同步消息
+可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。
+``` java
+public class SyncProducer {
+ public static void main(String[] args) throws Exception {
+ // Instantiate with a producer group name
+ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+ // Specify name server addresses
+ producer.setNamesrvAddr("localhost:9876");
+ // Launch the producer instance
+ producer.start();
+ for (int i = 0; i < 100; i++) {
+ // Create a message instance with specifying topic, tag and message body
+ Message msg = new Message("TopicTest" /* Topic */,
+ "TagA" /* Tag */,
+ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+ );
+ // Send message to one of brokers
+ SendResult sendResult = producer.send(msg);
+ // Check whether the message has been delivered by the callback of sendResult
+ System.out.printf("%s%n", sendResult);
+ }
+ // Shut down once the producer instance is not longer in use
+ producer.shutdown();
+ }
+}
+```
+##### 2.2 发送异步消息
+异步传输通常用于响应时间敏感的业务场景。这意味着发送方无法等待代理的响应太长时间。
+``` java
+public class AsyncProducer {
+ public static void main(String[] args) throws Exception {
+ // Instantiate with a producer group name
+ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+ // Specify name server addresses
+ producer.setNamesrvAddr("localhost:9876");
+ // Launch the producer instance
+ producer.start();
+ producer.setRetryTimesWhenSendAsyncFailed(0);
+ for (int i = 0; i < 100; i++) {
+ final int index = i;
+ // Create a message instance with specifying topic, tag and message body
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+ // SendCallback: receive the callback of the asynchronous return result.
+ producer.send(msg, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ System.out.printf("%-10d OK %s %n", index,
+ sendResult.getMsgId());
+ }
+ @Override
+ public void onException(Throwable e) {
+ System.out.printf("%-10d Exception %s %n", index, e);
+ e.printStackTrace();
+ }
+ });
+ }
+ // Shut down once the producer instance is not longer in use
+ producer.shutdown();
+ }
+}
+```
+##### 2.3 以单向模式发送消息
+单向传输用于需要中等可靠性的情况,如日志收集。
+``` java
+public class OnewayProducer {
+ public static void main(String[] args) throws Exception{
+ // Instantiate with a producer group name
+ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
+ // Specify name server addresses
+ producer.setNamesrvAddr("localhost:9876");
+ // Launch the producer instance
+ producer.start();
+ for (int i = 0; i < 100; i++) {
+ // Create a message instance with specifying topic, tag and message body
+ Message msg = new Message("TopicTest" /* Topic */,
+ "TagA" /* Tag */,
+ ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
+ );
+ // Send in one-way mode, no return result
+ producer.sendOneway(msg);
+ }
+ // Shut down once the producer instance is not longer in use
+ producer.shutdown();
+ }
+}
+```
+### 3 消费消息
+``` java
+public class Consumer {
+ public static void main(String[] args) throws InterruptedException, MQClientException {
+ // Instantiate with specified consumer group name
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
+
+ // Specify name server addresses
+ consumer.setNamesrvAddr("localhost:9876");
+
+ // Subscribe one or more topics and tags for finding those messages need to be consumed
+ consumer.subscribe("TopicTest", "*");
+ // Register callback to execute on arrival of messages fetched from brokers
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
+ System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+ // Mark the message that have been consumed successfully
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+ // Launch the consumer instance
+ consumer.start();
+ System.out.printf("Consumer Started.%n");
+ }
+}
+```
\ No newline at end of file
diff --git a/docs/cn/FAQ.md b/docs/cn/FAQ.md
new file mode 100644
index 0000000000000000000000000000000000000000..b58807926a019d0a0528356983b327f0f6309564
--- /dev/null
+++ b/docs/cn/FAQ.md
@@ -0,0 +1,110 @@
+# 经常被问到的问题
+
+以下是关于RocketMQ项目的常见问题
+
+## 1 基本
+
+1. **为什么我们要使用RocketMQ而不是选择其他的产品?**
+
+ 请参考[为什么要选择RocketMQ](http://rocketmq.apache.org/docs/motivation/)
+
+2. **我是否需要安装其他的软件才能使用RocketMQ,例如zookeeper?**
+
+ 不需要,RocketMQ可以独立的运行。
+
+## 2 使用
+
+1. **新创建的Consumer ID从哪里开始消费消息?**
+
+ 1)如果发送的消息在三天之内,那么消费者会从服务器中保存的第一条消息开始消费。
+
+ 2)如果发送的消息已经超过三天,则消费者会从服务器中的最新消息开始消费,也就是从队列的尾部开始消费。
+
+ 3)如果消费者重新启动,那么它会从最后一个消费位置开始消费消息。
+
+2. **当消费失败的时候如何重新消费消息?**
+
+ 1)在集群模式下,消费的业务逻辑代码会返回Action.ReconsumerLater,NULL,或者抛出异常,如果一条消息消费失败,最多会重试16次,之后该消息会被丢弃。
+
+ 2)在广播消费模式下,广播消费仍然保证消息至少被消费一次,但不提供重发的选项。
+
+3. **当消费失败的时候如何找到失败的消息?**
+
+ 1)使用按时间的主题查询,可以查询到一段时间内的消息。
+
+ 2)使用主题和消息ID来准确查询消息。
+
+ 3)使用主题和消息的Key来准确查询所有消息Key相同的消息。
+
+4. **消息只会被传递一次吗?**
+
+ RocketMQ 确保所有消息至少传递一次。 在大多数情况下,消息不会重复。
+
+5. **如何增加一个新的Broker?**
+
+ 1)启动一个新的Broker并将其注册到name server中的Broker列表里。
+
+ 2)默认只自动创建内部系统topic和consumer group。 如果您希望在新节点上拥有您的业务主题和消费者组,请从现有的Broker中复制它们。 我们提供了管理工具和命令行来处理此问题。
+
+## 3 配置相关
+
+以下回答均为默认值,可通过配置修改。
+
+1. **消息在服务器上可以保存多长时间?**
+
+ 存储的消息将最多保存 3 天,超过 3 天未使用的消息将被删除。
+
+2. **消息体的大小限制是多少?**
+
+ 通常是256KB
+
+3. **怎么设置消费者线程数?**
+
+ 当你启动消费者的时候,可以设置 ConsumeThreadNums属性的值,举例如下:
+
+ ```java
+ consumer.setConsumeThreadMin(20);
+ consumer.setConsumeThreadMax(20);
+ ```
+
+## 4 错误
+
+1. **当你启动一个生产者或消费者的过程失败了并且错误信息是生产者组或消费者重复**
+
+ 原因:使用同一个Producer/Consumer Group在同一个JVM中启动多个Producer/Consumer实例可能会导致客户端无法启动。
+
+ 解决方案:确保一个 Producer/Consumer Group 对应的 JVM 只启动一个 Producer/Consumer 实例。
+
+2. **消费者无法在广播模式下开始加载 json 文件**
+
+ 原因:fastjson 版本太低,无法让广播消费者加载本地 offsets.json,导致消费者启动失败。 损坏的 fastjson 文件也会导致同样的问题。
+
+ 解决方案:Fastjson 版本必须升级到 RocketMQ 客户端依赖版本,以确保可以加载本地 offsets.json。 默认情况下,offsets.json 文件在 /home/{user}/.rocketmq_offsets 中。 或者检查fastjson的完整性。
+
+3. **Broker崩溃以后有什么影响?**
+
+ 1)Master节点崩溃
+
+ 消息不能再发送到该Broker集群,但是如果您有另一个可用的Broker集群,那么在主题存在的条件下仍然可以发送消息。消息仍然可以从Slave节点消费。
+
+ 2)一些Slave节点崩溃
+
+ 只要有另一个工作的slave,就不会影响发送消息。 对消费消息也不会产生影响,除非消费者组设置为优先从该Slave消费。 默认情况下,消费者组从 master 消费。
+
+ 3)所有Slave节点崩溃
+
+ 向master发送消息不会有任何影响,但是,如果master是SYNC_MASTER,producer会得到一个SLAVE_NOT_AVAILABLE,表示消息没有发送给任何slave。 对消费消息也没有影响,除非消费者组设置为优先从slave消费。 默认情况下,消费者组从master消费。
+
+4. **Producer提示“No Topic Route Info”,如何诊断?**
+
+ 当您尝试将消息发送到一个路由信息对生产者不可用的主题时,就会发生这种情况。
+
+ 1)确保生产者可以连接到名称服务器并且能够从中获取路由元信息。
+
+ 2)确保名称服务器确实包含主题的路由元信息。 您可以使用管理工具或 Web 控制台通过 topicRoute 从名称服务器查询路由元信息。
+
+ 3)确保您的Broker将心跳发送到您的生产者正在连接的同一name server列表。
+
+ 4)确保主题的权限为6(rw-),或至少为2(-w-)。
+
+ 如果找不到此主题,请通过管理工具命令updateTopic或Web控制台在Broker上创建它。
\ No newline at end of file
diff --git a/docs/cn/best_practice.md b/docs/cn/best_practice.md
index 4ad6bb23fe7332bfb87d4eeb42997755f58c9777..b24c6eca57bb77e00a53cf49b9770f8a756ad413 100755
--- a/docs/cn/best_practice.md
+++ b/docs/cn/best_practice.md
@@ -186,8 +186,8 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
| brokerName | null | broker 的名称 |
| brokerClusterName | DefaultCluster | 本 broker 所属的 Cluser 名称 |
| brokerId | 0 | broker id, 0 表示 master, 其他的正整数表示 slave |
+| storePathRootDir | $HOME/store/ | 存储根路径 |
| storePathCommitLog | $HOME/store/commitlog/ | 存储 commit log 的路径 |
-| storePathConsumerQueue | $HOME/store/consumequeue/ | 存储 consume queue 的路径 |
| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | commit log 的映射文件大小 |
| deleteWhen | 04 | 在每天的什么时间删除已经超过文件保留时间的 commit log |
| fileReservedTime | 72 | 以小时计算的文件保留时间 |
diff --git a/docs/cn/image/rocketmq_architecture_1.png b/docs/cn/image/rocketmq_architecture_1.png
index 1d3e82b354b563e17aa90d64c9f94dd9bfe476b6..addb571a84459640ad8e021e31bd4e4cad2e0f5e 100644
Binary files a/docs/cn/image/rocketmq_architecture_1.png and b/docs/cn/image/rocketmq_architecture_1.png differ
diff --git a/docs/cn/image/rocketmq_architecture_3.png b/docs/cn/image/rocketmq_architecture_3.png
index 6460f8e1c30ed6e98e245aa29cf79db89f4891cc..b5d755adbdb8e1a13c871ec6687e564afa946091 100644
Binary files a/docs/cn/image/rocketmq_architecture_3.png and b/docs/cn/image/rocketmq_architecture_3.png differ
diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md
index b6447186ce31f8f367243a0278fffc5ef89b59a9..b7807334403548d4b4ab773f76a5bd625f0a5838 100644
--- a/docs/cn/msg_trace/user_guide.md
+++ b/docs/cn/msg_trace/user_guide.md
@@ -100,6 +100,19 @@ RocketMQ的消息轨迹特性支持两种存储轨迹数据的方式:
......
```
-
-
-
+### 4.4 使用mqadmin命令发送和查看轨迹
+- 发送消息
+```shell
+./mqadmin sendMessage -m true --topic some-topic-name -n 127.0.0.1:9876 -p "your meesgae content"
+```
+- 查询轨迹
+```shell
+./mqadmin QueryMsgTraceById -n 127.0.0.1:9876 -i "some-message-id"
+```
+- 查询轨迹结果
+```
+RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
+RocketMQLog:WARN Please initialize the logger system properly.
+#Type #ProducerGroup #ClientHost #SendTime #CostTimes #Status
+Pub 1623305799667 xxx.xxx.xxx.xxx 2021-06-10 14:16:40 131ms success
+```
diff --git a/docs/en/best_practice.md b/docs/en/best_practice.md
index be3b937d1037b859e9c117e8f1a0152a33adcb6c..213dc3174d93be0d2ffdc920a371114cb22cdf51 100755
--- a/docs/en/best_practice.md
+++ b/docs/en/best_practice.md
@@ -20,8 +20,8 @@
| brokerName | null | broker name |
| brokerClusterName | DefaultCluster | this broker belongs to which cluster |
| brokerId | 0 | broker id, 0 means master, positive integers mean slave |
+| storePathRootDir | $HOME/store/ | file path for root store |
| storePathCommitLog | $HOME/store/commitlog/ | file path for commit log |
-| storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue |
| mappedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |
| deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |
| fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |
diff --git a/docs/en/client/java/API_Reference_DefaultMQProducer.md b/docs/en/client/java/API_Reference_DefaultMQProducer.md
new file mode 100644
index 0000000000000000000000000000000000000000..152f45dca21b7092c1b149343da9877d8f5133f1
--- /dev/null
+++ b/docs/en/client/java/API_Reference_DefaultMQProducer.md
@@ -0,0 +1,71 @@
+## DefaultMQProducer
+---
+### Class introduction
+
+`public class DefaultMQProducer
+extends ClientConfig
+implements MQProducer`
+
+>`DefaultMQProducer` is the entry point for an application to post messages, out of the box,ca quickly create a producer with a no-argument construction. it is mainly responsible for message sending, support synchronous、asynchronous、one-way send. All of these send methods support batch send. The parameters of the sender can be adjusted through the getter/setter methods , provided by this class. `DefaultMQProducer` has multi send method and each method is slightly different. Make sure you know the usage before you use it . Blow is a producer example . [see more examples](https://github.com/apache/rocketmq/blob/master/example/src/main/java/org/apache/rocketmq/example/)。
+
+``` java
+public class Producer {
+ public static void main(String[] args) throws MQClientException {
+ // create a produce with producer_group_name
+ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
+
+ // start the producer
+ producer.start();
+
+ for (int i = 0; i < 128; i++)
+ try {
+ // construct the msg
+ Message msg = new Message("TopicTest",
+ "TagA",
+ "OrderID188",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+ // send sync
+ SendResult sendResult = producer.send(msg);
+
+ // print the result
+ System.out.printf("%s%n", sendResult);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ producer.shutdown();
+ }
+}
+```
+
+**Note** : This class is thread safe. It can be safely shared between multiple threads after configuration and startup is complete.
+
+### Variable
+
+|Type|Name| description |
+|------|-------|-------|
+|DefaultMQProducerImpl|defaultMQProducerImpl|The producer's internal default implementation|
+|String|producerGroup|The producer's group|
+|String|createTopicKey| Topics that do not exist on the server are automatically created when the message is sent |
+|int|defaultTopicQueueNums|The default number of queues to create a topic|
+|int|sendMsgTimeout|The timeout for the message to be sent|
+|int|compressMsgBodyOverHowmuch|the threshold of the compress of message body|
+|int|retryTimesWhenSendFailed|Maximum number of internal attempts to send a message in synchronous mode|
+|int|retryTimesWhenSendAsyncFailed|Maximum number of internal attempts to send a message in asynchronous mode|
+|boolean|retryAnotherBrokerWhenNotStoreOK|Whether to retry another broker if an internal send fails|
+|int|maxMessageSize| Maximum length of message |
+|TraceDispatcher|traceDispatcher| Message trackers. Use rcpHook to track messages |
+
+### construction method
+
+|方法名称|方法描述|
+|-------|------------|
+|DefaultMQProducer()| creates a producer with default parameter values |
+|DefaultMQProducer(final String producerGroup)| creates a producer with producer group name. |
+|DefaultMQProducer(final String producerGroup, boolean enableMsgTrace)|creates a producer with producer group name and set whether to enable message tracking|
+|DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic)|creates a producer with producer group name and set whether to enable message tracking、the trace topic.|
+|DefaultMQProducer(RPCHook rpcHook)|creates a producer with a rpc hook.|
+|DefaultMQProducer(final String producerGroup, RPCHook rpcHook)|creates a producer with a rpc hook and producer group.|
+|DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic)|all of above.|
+
diff --git a/docs/en/image/rocketmq_architecture_1.png b/docs/en/image/rocketmq_architecture_1.png
index 1d3e82b354b563e17aa90d64c9f94dd9bfe476b6..addb571a84459640ad8e021e31bd4e4cad2e0f5e 100644
Binary files a/docs/en/image/rocketmq_architecture_1.png and b/docs/en/image/rocketmq_architecture_1.png differ
diff --git a/docs/en/image/rocketmq_architecture_3.png b/docs/en/image/rocketmq_architecture_3.png
index 6460f8e1c30ed6e98e245aa29cf79db89f4891cc..b5d755adbdb8e1a13c871ec6687e564afa946091 100644
Binary files a/docs/en/image/rocketmq_architecture_3.png and b/docs/en/image/rocketmq_architecture_3.png differ
diff --git a/docs/en/msg_trace/user_guide.md b/docs/en/msg_trace/user_guide.md
index eb540d02be3cbeecdb3f0bc177174d570f4a1058..9573056d1320d7ef18c8e47b5e8f9b8d3a04792a 100644
--- a/docs/en/msg_trace/user_guide.md
+++ b/docs/en/msg_trace/user_guide.md
@@ -101,5 +101,21 @@ Adjusting instantiation of DefaultMQProducer and DefaultMQPushConsumer as follow
```
+### 4.4 Send and query message trace by mqadmin command
+- send message
+```shell
+./mqadmin sendMessage -m true --topic some-topic-name -n 127.0.0.1:9876 -p "your meesgae content"
+```
+- query trace
+```shell
+./mqadmin QueryMsgTraceById -n 127.0.0.1:9876 -i "some-message-id"
+```
+- query trace result
+```
+RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
+RocketMQLog:WARN Please initialize the logger system properly.
+#Type #ProducerGroup #ClientHost #SendTime #CostTimes #Status
+Pub 1623305799667 xxx.xxx.xxx.xxx 2021-06-10 14:16:40 131ms success
+```
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index 85af04eabb6c9a9a91b50b6b8988e9b38ecad096..1b511d8a9e5ffef29452badc44176ad5833855a7 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -69,6 +69,7 @@ public class TransactionProducer {
config.batchId = commandLine.hasOption("b") ? Long.parseLong(commandLine.getOptionValue("b")) : System.currentTimeMillis();
config.sendInterval = commandLine.hasOption("i") ? Integer.parseInt(commandLine.getOptionValue("i")) : 0;
config.aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
+ config.msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(config.threadCount);
@@ -123,8 +124,12 @@ public class TransactionProducer {
}, 10000, 10000);
final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
- final TransactionMQProducer producer =
- new TransactionMQProducer("benchmark_transaction_producer", config.aclEnable ? AclClient.getAclRPCHook() : null);
+ final TransactionMQProducer producer = new TransactionMQProducer(
+ null,
+ "benchmark_transaction_producer",
+ config.aclEnable ? AclClient.getAclRPCHook() : null,
+ config.msgTraceEnable,
+ null);
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
@@ -256,6 +261,10 @@ public class TransactionProducer {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("m", "msgTraceEnable", true, "Message Trace Enable, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
}
@@ -439,6 +448,7 @@ class TxSendConfig {
long batchId;
int sendInterval;
boolean aclEnable;
+ boolean msgTraceEnable;
}
class LRUMap extends LinkedHashMap {
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index 53a1d4dd64ac027d52389520c05b7cad08228120..771eea1598e119cd5f8e0e44fa4132941ac94b22 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -65,6 +65,39 @@ public class Producer {
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
+ /*
+ * There are different ways to send message, if you don't care about the send result,you can use this way
+ * {@code
+ * producer.sendOneway(msg);
+ * }
+ */
+
+ /*
+ * if you want to get the send result in a synchronize way, you can use this send method
+ * {@code
+ * SendResult sendResult = producer.send(msg);
+ * System.out.printf("%s%n", sendResult);
+ * }
+ */
+
+ /*
+ * if you want to get the send result in a asynchronize way, you can use this send method
+ * {@code
+ *
+ * producer.send(msg, new SendCallback() {
+ * @Override
+ * public void onSuccess(SendResult sendResult) {
+ * // do something
+ * }
+ *
+ * @Override
+ * public void onException(Throwable e) {
+ * // do something
+ * }
+ *});
+ *
+ *}
+ */
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
diff --git a/pom.xml b/pom.xml
index b6f1770e2d85c6295c02f5077926e158d1d8f872..59fe5c8072b2af583a0ec43d65ab268de58f79f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -537,7 +537,7 @@
io.netty
netty-all
- 4.0.42.Final
+ 4.1.65.Final
com.alibaba
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
index 4b4e86e68715c986c6823a310406221b785c62ff..1a0c9b53e32424857fc6d575a0989ac12875a4fe 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyLogger.java
@@ -52,6 +52,8 @@ public class NettyLogger {
private InternalLogger logger = null;
+ private static final String EXCEPTION_MESSAGE = "Unexpected exception:";
+
public NettyBridgeLogger(String name) {
logger = InternalLoggerFactory.getLogger(name);
}
@@ -161,6 +163,25 @@ public class NettyLogger {
}
}
+ @Override
+ public void log(InternalLogLevel internalLogLevel, Throwable throwable) {
+ if (internalLogLevel.equals(InternalLogLevel.DEBUG)) {
+ logger.debug(EXCEPTION_MESSAGE, throwable);
+ }
+ if (internalLogLevel.equals(InternalLogLevel.TRACE)) {
+ logger.info(EXCEPTION_MESSAGE, throwable);
+ }
+ if (internalLogLevel.equals(InternalLogLevel.INFO)) {
+ logger.info(EXCEPTION_MESSAGE, throwable);
+ }
+ if (internalLogLevel.equals(InternalLogLevel.WARN)) {
+ logger.warn(EXCEPTION_MESSAGE, throwable);
+ }
+ if (internalLogLevel.equals(InternalLogLevel.ERROR)) {
+ logger.error(EXCEPTION_MESSAGE, throwable);
+ }
+ }
+
@Override
public boolean isTraceEnabled() {
return isEnabled(InternalLogLevel.TRACE);
@@ -191,6 +212,11 @@ public class NettyLogger {
logger.info(var1, var2);
}
+ @Override
+ public void trace(Throwable var1) {
+ logger.info(EXCEPTION_MESSAGE, var1);
+ }
+
@Override
public boolean isDebugEnabled() {
return isEnabled(InternalLogLevel.DEBUG);
@@ -221,6 +247,11 @@ public class NettyLogger {
logger.debug(var1, var2);
}
+ @Override
+ public void debug(Throwable var1) {
+ logger.debug(EXCEPTION_MESSAGE, var1);
+ }
+
@Override
public boolean isInfoEnabled() {
return isEnabled(InternalLogLevel.INFO);
@@ -251,6 +282,11 @@ public class NettyLogger {
logger.info(var1, var2);
}
+ @Override
+ public void info(Throwable var1) {
+ logger.info(EXCEPTION_MESSAGE, var1);
+ }
+
@Override
public boolean isWarnEnabled() {
return isEnabled(InternalLogLevel.WARN);
@@ -281,6 +317,11 @@ public class NettyLogger {
logger.warn(var1, var2);
}
+ @Override
+ public void warn(Throwable var1) {
+ logger.warn(EXCEPTION_MESSAGE, var1);
+ }
+
@Override
public boolean isErrorEnabled() {
return isEnabled(InternalLogLevel.ERROR);
@@ -310,6 +351,11 @@ public class NettyLogger {
public void error(String var1, Throwable var2) {
logger.error(var1, var2);
}
+
+ @Override
+ public void error(Throwable var1) {
+ logger.error(EXCEPTION_MESSAGE, var1);
+ }
}
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 3f21eb8ff1b1e4bb94b38e75d3a02197405b48b1..c9b39946c07f64d8322c2617fc7431bbb1f9d828 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -553,7 +553,7 @@ public abstract class NettyRemotingAbstract {
throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
} else {
String info = String.format(
- "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
+ "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreOnewayValue: %d",
timeoutMillis,
this.semaphoreOneway.getQueueLength(),
this.semaphoreOneway.availablePermits()
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index cce6481b8daa119def971b3c4c2f95d4b3e6502a..57fa3637856380a2bb03767d165ad811880bea68 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -19,8 +19,8 @@ package org.apache.rocketmq.store;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -1185,7 +1185,7 @@ public class CommitLog {
this.mappedFileQueue.destroy();
}
- public boolean appendData(long startOffset, byte[] data) {
+ public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
putMessageLock.lock();
try {
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(startOffset);
@@ -1194,7 +1194,7 @@ public class CommitLog {
return false;
}
- return mappedFile.appendMessage(data);
+ return mappedFile.appendMessage(data, dataStart, dataLength);
} finally {
putMessageLock.unlock();
}
@@ -1403,48 +1403,55 @@ public class CommitLog {
* GroupCommit Service
*/
class GroupCommitService extends FlushCommitLogService {
- private volatile List requestsWrite = new ArrayList();
- private volatile List requestsRead = new ArrayList();
+ private volatile LinkedList requestsWrite = new LinkedList();
+ private volatile LinkedList requestsRead = new LinkedList();
+ private final PutMessageSpinLock lock = new PutMessageSpinLock();
public synchronized void putRequest(final GroupCommitRequest request) {
- synchronized (this.requestsWrite) {
+ lock.lock();
+ try {
this.requestsWrite.add(request);
+ } finally {
+ lock.unlock();
}
this.wakeup();
}
private void swapRequests() {
- List tmp = this.requestsWrite;
- this.requestsWrite = this.requestsRead;
- this.requestsRead = tmp;
+ lock.lock();
+ try {
+ LinkedList tmp = this.requestsWrite;
+ this.requestsWrite = this.requestsRead;
+ this.requestsRead = tmp;
+ } finally {
+ lock.unlock();
+ }
}
private void doCommit() {
- synchronized (this.requestsRead) {
- if (!this.requestsRead.isEmpty()) {
- for (GroupCommitRequest req : this.requestsRead) {
- // There may be a message in the next file, so a maximum of
- // two times the flush
- boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
- for (int i = 0; i < 2 && !flushOK; i++) {
- CommitLog.this.mappedFileQueue.flush(0);
- flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
- }
-
- req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ if (!this.requestsRead.isEmpty()) {
+ for (GroupCommitRequest req : this.requestsRead) {
+ // There may be a message in the next file, so a maximum of
+ // two times the flush
+ boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+ for (int i = 0; i < 2 && !flushOK; i++) {
+ CommitLog.this.mappedFileQueue.flush(0);
+ flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
}
- long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
- if (storeTimestamp > 0) {
- CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
- }
+ req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ }
- this.requestsRead.clear();
- } else {
- // Because of individual messages is set to not sync flush, it
- // will come to this process
- CommitLog.this.mappedFileQueue.flush(0);
+ long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
+ if (storeTimestamp > 0) {
+ CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
+
+ this.requestsRead = new LinkedList<>();
+ } else {
+ // Because of individual messages is set to not sync flush, it
+ // will come to this process
+ CommitLog.this.mappedFileQueue.flush(0);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b8ecdee8cc63141876bf6b8f0b2efb3f811479b1..7dd5a32b2fab0d873736479ddde8f9c402e3ec0c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -921,13 +921,13 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public boolean appendToCommitLog(long startOffset, byte[] data) {
+ public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
if (this.shutdown) {
log.warn("message store has shutdown, so appendToPhyQueue is forbidden");
return false;
}
- boolean result = this.commitLog.appendData(startOffset, data);
+ boolean result = this.commitLog.appendData(startOffset, data, dataStart, dataLength);
if (result) {
this.reputMessageService.wakeup();
} else {
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index b5bdc7766b118f02b7f6c152b85f509ca03e8644..25f0e393144bdd978a72afbf07c6385fda62a4a0 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -301,7 +301,7 @@ public class MappedFile extends ReferenceResource {
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
- commit0(commitLeastPages);
+ commit0();
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
@@ -317,11 +317,11 @@ public class MappedFile extends ReferenceResource {
return this.committedPosition.get();
}
- protected void commit0(final int commitLeastPages) {
+ protected void commit0() {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
- if (writePos - lastCommittedPosition > commitLeastPages) {
+ if (writePos - lastCommittedPosition > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 64eb5250de6e59f7ce3ef19033cd40b5d5f0254f..a8c658bfe2635eeaa847faf94f7171d9a8432901 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -245,9 +245,11 @@ public interface MessageStore {
*
* @param startOffset starting offset.
* @param data data to append.
+ * @param dataStart the start index of data array
+ * @param dataLength the length of data array
* @return true if success; false otherwise.
*/
- boolean appendToCommitLog(final long startOffset, final byte[] data);
+ boolean appendToCommitLog(final long startOffset, final byte[] data, int dataStart, int dataLength);
/**
* Execute file deletion manually.
diff --git a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
index 034b964bac2a8f7508171df783f8007659af5cab..03061e6b48a83bec8f4840c01b59bd3a511cfd63 100644
--- a/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
+++ b/store/src/main/java/org/apache/rocketmq/store/SelectMappedBufferResult.java
@@ -48,13 +48,6 @@ public class SelectMappedBufferResult {
this.byteBuffer.limit(this.size);
}
-// @Override
-// protected void finalize() {
-// if (this.mappedFile != null) {
-// this.release();
-// }
-// }
-
public synchronized void release() {
if (this.mappedFile != null) {
this.mappedFile.release();
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
index d43b34342f6a744e7f5d02d7a0a8ed6525c0b06d..8372845e7fed8ed68b274bff18a5bb8de82d0925 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
@@ -57,6 +57,7 @@ public class StoreStatsService extends ServiceThread {
private final LinkedList getTimesMissList = new LinkedList();
private final LinkedList transferedMsgCountList = new LinkedList();
private volatile AtomicLong[] putMessageDistributeTime;
+ private volatile AtomicLong[] lastPutMessageDistributeTime;
private long messageStoreBootTimestamp = System.currentTimeMillis();
private volatile long putMessageEntireTimeMax = 0;
private volatile long getMessageEntireTimeMax = 0;
@@ -80,11 +81,11 @@ public class StoreStatsService extends ServiceThread {
next[i] = new AtomicLong(0);
}
- AtomicLong[] old = this.putMessageDistributeTime;
+ this.lastPutMessageDistributeTime = this.putMessageDistributeTime;
this.putMessageDistributeTime = next;
- return old;
+ return lastPutMessageDistributeTime;
}
public long getPutMessageEntireTimeMax() {
@@ -298,7 +299,7 @@ public class StoreStatsService extends ServiceThread {
}
private String putMessageDistributeTimeToString() {
- final AtomicLong[] times = this.putMessageDistributeTime;
+ final AtomicLong[] times = this.lastPutMessageDistributeTime;
if (null == times)
return null;
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 90fd6f3f169871fa1d6a226b08a6d0a6a8a901f4..ea791bd9ff66dcc3bf32f4b0c4f6e65aec3c12f9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -562,11 +562,13 @@ public class DLedgerCommitLog extends CommitLog {
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData);
- dledgerFuture = (BatchAppendFuture) dLedgerServer.handleAppend(request);
- if (dledgerFuture.getPos() == -1) {
- log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+ AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request);
+ if (appendFuture.getPos() == -1) {
+ log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
}
+ dledgerFuture = (BatchAppendFuture) appendFuture;
+
long wroteOffset = 0;
int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
@@ -789,11 +791,13 @@ public class DLedgerCommitLog extends CommitLog {
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
request.setBatchMsgs(encodeResult.batchData);
- dledgerFuture = (BatchAppendFuture) dLedgerServer.handleAppend(request);
- if (dledgerFuture.getPos() == -1) {
- log.warn("HandleAppend return false due to error code {}", dledgerFuture.get().getCode());
+ AppendFuture appendFuture = (AppendFuture) dLedgerServer.handleAppend(request);
+ if (appendFuture.getPos() == -1) {
+ log.warn("HandleAppend return false due to error code {}", appendFuture.get().getCode());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
}
+ dledgerFuture = (BatchAppendFuture) appendFuture;
+
long wroteOffset = 0;
int msgIdLength = (messageExtBatch.getSysFlag() & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
@@ -901,7 +905,7 @@ public class DLedgerCommitLog extends CommitLog {
}
@Override
- public boolean appendData(long startOffset, byte[] data) {
+ public boolean appendData(long startOffset, byte[] data, int dataStart, int dataLength) {
//the old ha service will invoke method, here to prevent it
return false;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 34c51eb9e26fc5df4efbc8bef58d360f759a5fe5..845935bb9dd4212625b2a4399c93d0db4d6952aa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -25,7 +25,6 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -39,6 +38,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus;
public class HAService {
@@ -254,12 +254,16 @@ public class HAService {
class GroupTransferService extends ServiceThread {
private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
- private volatile List requestsWrite = new ArrayList<>();
- private volatile List requestsRead = new ArrayList<>();
+ private final PutMessageSpinLock lock = new PutMessageSpinLock();
+ private volatile LinkedList requestsWrite = new LinkedList<>();
+ private volatile LinkedList requestsRead = new LinkedList<>();
- public synchronized void putRequest(final CommitLog.GroupCommitRequest request) {
- synchronized (this.requestsWrite) {
+ public void putRequest(final CommitLog.GroupCommitRequest request) {
+ lock.lock();
+ try {
this.requestsWrite.add(request);
+ } finally {
+ lock.unlock();
}
this.wakeup();
}
@@ -269,32 +273,35 @@ public class HAService {
}
private void swapRequests() {
- List tmp = this.requestsWrite;
- this.requestsWrite = this.requestsRead;
- this.requestsRead = tmp;
+ lock.lock();
+ try {
+ LinkedList tmp = this.requestsWrite;
+ this.requestsWrite = this.requestsRead;
+ this.requestsRead = tmp;
+ } finally {
+ lock.unlock();
+ }
}
private void doWaitTransfer() {
- synchronized (this.requestsRead) {
- if (!this.requestsRead.isEmpty()) {
- for (CommitLog.GroupCommitRequest req : this.requestsRead) {
- boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ if (!this.requestsRead.isEmpty()) {
+ for (CommitLog.GroupCommitRequest req : this.requestsRead) {
+ boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+ long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
- while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
- this.notifyTransferObject.waitForRunning(1000);
- transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- }
-
- if (!transferOK) {
- log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
- }
+ while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
+ this.notifyTransferObject.waitForRunning(1000);
+ transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
+ }
- req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+ if (!transferOK) {
+ log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
}
- this.requestsRead.clear();
+ req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}
+
+ this.requestsRead = new LinkedList<>();
}
}
@@ -433,7 +440,6 @@ public class HAService {
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
- int readSocketPos = this.byteBufferRead.position();
while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
@@ -452,13 +458,12 @@ public class HAService {
}
if (diff >= (msgHeaderSize + bodySize)) {
- byte[] bodyData = new byte[bodySize];
- this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);
- this.byteBufferRead.get(bodyData);
+ byte[] bodyData = byteBufferRead.array();
+ int dataStart = this.dispatchPosition + msgHeaderSize;
- HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);
+ HAService.this.defaultMessageStore.appendToCommitLog(
+ masterPhyOffset, bodyData, dataStart, bodySize);
- this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize;
if (!reportSlaveMaxOffsetPlus()) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
index 75b7597feade1c34092d8f6fee8d0d12f4126fe2..d5ed65f5f760f177127a56c196563c89d904d5e9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/WaitNotifyObject.java
@@ -20,40 +20,43 @@ import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
public class WaitNotifyObject {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
- protected final HashMap waitingThreadTable =
- new HashMap(16);
+ protected final ConcurrentHashMap waitingThreadTable =
+ new ConcurrentHashMap(16);
- protected volatile boolean hasNotified = false;
+ protected AtomicBoolean hasNotified = new AtomicBoolean(false);
public void wakeup() {
- synchronized (this) {
- if (!this.hasNotified) {
- this.hasNotified = true;
+ boolean needNotify = hasNotified.compareAndSet(false, true);
+ if (needNotify) {
+ synchronized (this) {
this.notify();
}
}
}
protected void waitForRunning(long interval) {
+ if (this.hasNotified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
synchronized (this) {
- if (this.hasNotified) {
- this.hasNotified = false;
- this.onWaitEnd();
- return;
- }
-
try {
+ if (this.hasNotified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
this.wait(interval);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
- this.hasNotified = false;
+ this.hasNotified.set(false);
this.onWaitEnd();
}
}
@@ -63,15 +66,14 @@ public class WaitNotifyObject {
}
public void wakeupAll() {
- synchronized (this) {
- boolean needNotify = false;
-
- for (Map.Entry entry : this.waitingThreadTable.entrySet()) {
- needNotify = needNotify || !entry.getValue();
- entry.setValue(true);
+ boolean needNotify = false;
+ for (Map.Entry entry : this.waitingThreadTable.entrySet()) {
+ if (entry.getValue().compareAndSet(false, true)) {
+ needNotify = true;
}
-
- if (needNotify) {
+ }
+ if (needNotify) {
+ synchronized (this) {
this.notifyAll();
}
}
@@ -79,20 +81,22 @@ public class WaitNotifyObject {
public void allWaitForRunning(long interval) {
long currentThreadId = Thread.currentThread().getId();
+ AtomicBoolean notified = this.waitingThreadTable.computeIfAbsent(currentThreadId, k -> new AtomicBoolean(false));
+ if (notified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
synchronized (this) {
- Boolean notified = this.waitingThreadTable.get(currentThreadId);
- if (notified != null && notified) {
- this.waitingThreadTable.put(currentThreadId, false);
- this.onWaitEnd();
- return;
- }
-
try {
+ if (notified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
this.wait(interval);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
- this.waitingThreadTable.put(currentThreadId, false);
+ notified.set(false);
this.onWaitEnd();
}
}