diff --git a/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
similarity index 100%
rename from PULL_REQUEST_TEMPLATE.md
rename to .github/PULL_REQUEST_TEMPLATE.md
diff --git a/broker/pom.xml b/broker/pom.xml
index e81ee030d04cb83edb4f540fe2c148023f3f1550..3da70fa3368034d3cc12ff42bc3b4f1430c51b71 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 2d4bedcb96e76ba1f723287d2c110f80470722ff..6aefe81a1139ef8434203d870ae054a779d89196 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.latency;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -27,6 +28,10 @@ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} and
+ * {@link BrokerController#pullThreadPoolQueue}
+ */
public class BrokerFastFailure {
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
@@ -52,7 +57,9 @@ public class BrokerFastFailure {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
- cleanExpiredRequest();
+ if (brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
+ cleanExpiredRequest();
+ }
}
}, 1000, 10, TimeUnit.MILLISECONDS);
}
@@ -75,10 +82,18 @@ public class BrokerFastFailure {
}
}
+ cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
+ this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
+
+ cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
+ this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
+ }
+
+ void cleanExpiredRequestInQueue(final BlockingQueue blockingQueue, final long maxWaitTimeMillsInQueue) {
while (true) {
try {
- if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) {
- final Runnable runnable = this.brokerController.getSendThreadPoolQueue().peek();
+ if (!blockingQueue.isEmpty()) {
+ final Runnable runnable = blockingQueue.peek();
if (null == runnable) {
break;
}
@@ -88,10 +103,10 @@ public class BrokerFastFailure {
}
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
- if (behind >= this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
- if (this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
+ if (behind >= maxWaitTimeMillsInQueue) {
+ if (blockingQueue.remove(runnable)) {
rt.setStopRun(true);
- rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, this.brokerController.getSendThreadPoolQueue().size()));
+ rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
}
} else {
break;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..5d0f7f9d72b5b31a8308f87c969fe83d6a41335c
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.latency;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BrokerFastFailureTest {
+ @Test
+ public void testCleanExpiredRequestInQueue() throws Exception {
+ BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
+
+ BlockingQueue queue = new LinkedBlockingQueue<>();
+ brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
+ assertThat(queue.size()).isZero();
+
+ //Normal Runnable
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+
+ }
+ };
+ queue.add(runnable);
+
+ assertThat(queue.size()).isEqualTo(1);
+ brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
+ assertThat(queue.size()).isEqualTo(1);
+
+ queue.clear();
+
+ //With expired request
+ RequestTask expiredRequest = new RequestTask(runnable, null, null);
+ queue.add(new FutureTaskExt<>(expiredRequest, null));
+ TimeUnit.MILLISECONDS.sleep(100);
+
+ RequestTask requestTask = new RequestTask(runnable, null, null);
+ queue.add(new FutureTaskExt<>(requestTask, null));
+
+ assertThat(queue.size()).isEqualTo(2);
+ brokerFastFailure.cleanExpiredRequestInQueue(queue, 100);
+ assertThat(queue.size()).isEqualTo(1);
+ assertThat(((FutureTaskExt) queue.peek()).getRunnable()).isEqualTo(requestTask);
+ }
+
+}
\ No newline at end of file
diff --git a/client/pom.xml b/client/pom.xml
index 3f5c9617886f5bbc536ee56844fa829e9208133f..56dea38b9419be0d0ba2c3346de388119f8498ae 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 72bc953f5664018b3b9aed1f482346846f4eaba8..f560376c6ad731407dce25aa98ed58de92c1d5b1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -711,8 +711,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// consumeThreadMin
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
- || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
- || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
+ || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
throw new MQClientException(
"consumeThreadMin Out of range [1, 1000]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
@@ -727,6 +726,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
null);
}
+ // consumeThreadMin can't be larger than consumeThreadMax
+ if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
+ throw new MQClientException(
+ "consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "
+ + "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",
+ null);
+ }
+
// consumeConcurrentlyMaxSpan
if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 4176520abd5c59b8d1ed1bfc549e166e7a00741e..0cea1aea890f70613cb136918ba448f10e2cc1b4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -48,7 +48,10 @@ public class ProcessQueue {
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
- private final TreeMap msgTreeMapTemp = new TreeMap();
+ /**
+ * A subset of msgTreeMap, will only be used when orderly consume
+ */
+ private final TreeMap consumingMsgOrderlyTreeMap = new TreeMap();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
private volatile long queueOffsetMax = 0L;
private volatile boolean dropped = false;
@@ -243,8 +246,8 @@ public class ProcessQueue {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
- this.msgTreeMap.putAll(this.msgTreeMapTemp);
- this.msgTreeMapTemp.clear();
+ this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
+ this.consumingMsgOrderlyTreeMap.clear();
} finally {
this.lockTreeMap.writeLock().unlock();
}
@@ -257,12 +260,12 @@ public class ProcessQueue {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
- Long offset = this.msgTreeMapTemp.lastKey();
- msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
- for (MessageExt msg : this.msgTreeMapTemp.values()) {
+ Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
+ msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
+ for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) {
msgSize.addAndGet(0 - msg.getBody().length);
}
- this.msgTreeMapTemp.clear();
+ this.consumingMsgOrderlyTreeMap.clear();
if (offset != null) {
return offset + 1;
}
@@ -281,7 +284,7 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
- this.msgTreeMapTemp.remove(msg.getQueueOffset());
+ this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
@@ -304,7 +307,7 @@ public class ProcessQueue {
Map.Entry entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
- msgTreeMapTemp.put(entry.getKey(), entry.getValue());
+ consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue());
} else {
break;
}
@@ -343,7 +346,7 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
this.msgTreeMap.clear();
- this.msgTreeMapTemp.clear();
+ this.consumingMsgOrderlyTreeMap.clear();
this.msgCount.set(0);
this.msgSize.set(0);
this.queueOffsetMax = 0L;
@@ -402,10 +405,10 @@ public class ProcessQueue {
info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024)));
}
- if (!this.msgTreeMapTemp.isEmpty()) {
- info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey());
- info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey());
- info.setTransactionMsgCount(this.msgTreeMapTemp.size());
+ if (!this.consumingMsgOrderlyTreeMap.isEmpty()) {
+ info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey());
+ info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey());
+ info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size());
}
info.setLocked(this.locked);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 365fca9fa1065c2fe3641bd891dd3b6869c09040..83b9ee7671cf591190a5ec41fd4f6b003754c2b1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -975,7 +975,6 @@ public class MQClientInstance {
HashMap map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
- FOR_SEG:
for (Map.Entry entry : map.entrySet()) {
Long id = entry.getKey();
brokerAddr = entry.getValue();
@@ -983,7 +982,6 @@ public class MQClientInstance {
found = true;
if (MixAll.MASTER_ID == id) {
slave = false;
- break FOR_SEG;
} else {
slave = true;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 35b905e18d21609f6ade4d6c0c10fafc34f9180f..7c169796741f9f80e4b42dd883956a80f6194e27 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -454,9 +454,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
- if (tmpmq != null) {
- mq = tmpmq;
+ MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
+ if (mqSelected != null) {
+ mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..d4f581231f0d4d25eb7a403f9200911ce90549da
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class DefaultMQPushConsumerImplTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void checkConfigTest() throws MQClientException {
+
+ //test type
+ thrown.expect(MQClientException.class);
+
+ //test message
+ thrown.expectMessage("consumeThreadMin (10) is larger than consumeThreadMax (9)");
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
+
+ consumer.setConsumeThreadMin(10);
+ consumer.setConsumeThreadMax(9);
+
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ public ConsumeConcurrentlyStatus consumeMessage(List msgs,
+ ConsumeConcurrentlyContext context) {
+ System.out.println(" Receive New Messages: " + msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+
+ DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null);
+ defaultMQPushConsumerImpl.start();
+ }
+}
diff --git a/common/pom.xml b/common/pom.xml
index 78e5ffdf7f6efe49db2088ecf02fe36a425e6073..ce16df4dd2ac7f2f53de51834052a4caaa53acb1 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index c344a7ce6d97bd0e8ba75b07158d7b9370d4276c..efb36b50a7e7ddf13a6d610d6e81bafe955158d0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -106,7 +106,9 @@ public class BrokerConfig {
private boolean disableConsumeIfConsumerReadSlowly = false;
private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
+ private boolean brokerFastFailureEnable = true;
private long waitTimeMillsInSendQueue = 200;
+ private long waitTimeMillsInPullQueue = 5 * 1000;
private long startAcceptSendRequestTimeStamp = 0L;
@@ -163,6 +165,22 @@ public class BrokerConfig {
this.consumerFallbehindThreshold = consumerFallbehindThreshold;
}
+ public boolean isBrokerFastFailureEnable() {
+ return brokerFastFailureEnable;
+ }
+
+ public void setBrokerFastFailureEnable(final boolean brokerFastFailureEnable) {
+ this.brokerFastFailureEnable = brokerFastFailureEnable;
+ }
+
+ public long getWaitTimeMillsInPullQueue() {
+ return waitTimeMillsInPullQueue;
+ }
+
+ public void setWaitTimeMillsInPullQueue(final long waitTimeMillsInPullQueue) {
+ this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue;
+ }
+
public boolean isDisableConsumeIfConsumerReadSlowly() {
return disableConsumeIfConsumerReadSlowly;
}
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 56a5a08b550665f593796c150409b5d271269ed5..fbfbd402837055025275048745598a3f8186ec31 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,7 +20,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
rocketmq-distribution
rocketmq-distribution ${project.version}
diff --git a/example/pom.xml b/example/pom.xml
index 7e4fd9fa0bd96c8f68f0e4de26db9cd4551f9d06..b9d97df3d9a794681ba17ef1e1aede562c352a9e 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
@@ -51,7 +51,7 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
diff --git a/filter/pom.xml b/filter/pom.xml
index a0a9e2ec9792a743a38dce939bd4ea1245fb7754..eebfa30af492d9aba38be2fba7ee3af232dbc51f 100644
--- a/filter/pom.xml
+++ b/filter/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/filtersrv/pom.xml b/filtersrv/pom.xml
index 667561a4355e77c3ed0d811153efadf42ffe22bc..b6202b2908c7b3b763b19f191be5d12a1c969269 100644
--- a/filtersrv/pom.xml
+++ b/filtersrv/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/logappender/pom.xml b/logappender/pom.xml
index d5172e6d718a4fbcfe42679a54330a8f0329436c..ff5f7e2bcedae075450061da2b3972260abe8c6d 100644
--- a/logappender/pom.xml
+++ b/logappender/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
rocketmq-logappender
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index d4c2461dcbf50c54e81d47c377959c1d3cab7702..02ed402bf2d132e0cfcd507ae7c706d710bc1aa7 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
index 760a61be02f1067c6e73ff072502eebdb3f1dbf3..0a0c6562d6f70226d643ba24174536ec2b9765b5 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java
@@ -32,7 +32,6 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.netty.NettySystemConfig;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.srvutil.ShutdownHookThread;
@@ -49,15 +48,6 @@ public class NamesrvStartup {
public static NamesrvController main0(String[] args) {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
-
- if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
- NettySystemConfig.socketSndbufSize = 4096;
- }
-
- if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
- NettySystemConfig.socketRcvbufSize = 4096;
- }
-
try {
//PackageConflictDetect.detectFastjson();
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index e2864ebca013b8cbf1f310169524a860614bb111..857a70eeb3ee6e430fe54a7f705cff356485cd56 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/pom.xml b/pom.xml
index e317472c0e681d81457c03aada12cf8e7592fa11..4326b4669d3464268f6f579d130da7973a3a2b79 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
2012
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
pom
Apache RocketMQ ${project.version}
http://rocketmq.apache.org/
@@ -39,9 +39,9 @@
- https://github.com/apache/rocketmq.git
- scm:git:https://github.com/apache/rocketmq.git
- scm:git:https://github.com/apache/rocketmq.git
+ git@github.com:apache/rocketmq.git
+ scm:git:git@github.com:apache/rocketmq.git
+ scm:git:git@github.com:apache/rocketmq.git
HEAD
@@ -252,7 +252,8 @@
.travis.yml
CONTRIBUTING.md
bin/README.md
- PULL_REQUEST_TEMPLATE.md
+ .github/*
+ src/test/resources/certs/*
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 61f0286a1a07e1d0c59afce72727308a49892a8a..c788202cbb574356a7f871de24b4f4bb4a9f5842 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index 70a19a6cd8e263450b40c1310ef764d18d9869f0..b22e45f9b5ca51931d33c2ac8c9cff7fcb4ca9a2 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/store/pom.xml b/store/pom.xml
index dd035ea572d979df5a7ad689e20894cdb3ce7a2f..07feb444ae1d1127d8028b4094e56d4815a09614 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 192d0972e6440428914b9ce580848575f19d26e8..7a5647c3e0a95763a44b4c7eb1293d2e18f7e9d3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -110,6 +110,8 @@ public class DefaultMessageStore implements MessageStore {
private FileLock lock;
+ boolean shutDownNormal = false;
+
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
this.messageArrivingListener = messageArrivingListener;
@@ -265,8 +267,9 @@ public class DefaultMessageStore implements MessageStore {
this.storeCheckpoint.flush();
this.storeCheckpoint.shutdown();
- if (this.runningFlags.isWriteable()) {
+ if (this.runningFlags.isWriteable() && dispatchBehindBytes() == 0) {
this.deleteFile(StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir()));
+ shutDownNormal = true;
} else {
log.warn("the store may be wrong, so shutdown abnormally, and keep abort file.");
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
index 492ac5f26cfafe7848c06e5d08829c922813d0f5..0a43d47f8ff1ae4a240429a4782d885a4e0d9010 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java
@@ -570,6 +570,11 @@ public class MappedFile extends ReferenceResource {
log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
+ //testable
+ File getFile() {
+ return this.file;
+ }
+
@Override
public String toString() {
return this.fileName;
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index edf4c9184608b3fef74ce08ac5e4b7149452346d..9eb3b3ab0634033d5a579c3c106c6c6d235d6c83 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -367,6 +367,9 @@ public class MappedFileQueue {
} else {
break;
}
+ } else {
+ //avoid deleting files in the middle
+ break;
}
}
}
@@ -421,7 +424,7 @@ public class MappedFileQueue {
public boolean flush(final int flushLeastPages) {
boolean result = true;
- MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, false);
+ MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
if (mappedFile != null) {
long tmpTimeStamp = mappedFile.getStoreTimestamp();
int offset = mappedFile.flush(flushLeastPages);
@@ -438,7 +441,7 @@ public class MappedFileQueue {
public boolean commit(final int commitLeastPages) {
boolean result = true;
- MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, false);
+ MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
if (mappedFile != null) {
int offset = mappedFile.commit(commitLeastPages);
long where = mappedFile.getFileFromOffset() + offset;
diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShuwDownTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShuwDownTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..ac85d59f1c7473c0a30c8979a34dfd9ebc930a5d
--- /dev/null
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShuwDownTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.store;
+
+import java.io.File;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.config.FlushDiskType;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.config.StorePathConfigHelper;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DefaultMessageStoreShuwDownTest {
+ private DefaultMessageStore messageStore;
+
+ @Before
+ public void init() throws Exception {
+ messageStore = spy(buildMessageStore());
+ boolean load = messageStore.load();
+ when(messageStore.dispatchBehindBytes()).thenReturn(100L);
+ assertTrue(load);
+ messageStore.start();
+ }
+
+ @Test
+ public void testDispatchBehindWhenShutDown() {
+ messageStore.shutdown();
+ assertTrue(!messageStore.shutDownNormal);
+ File file = new File(StorePathConfigHelper.getAbortFile(messageStore.getMessageStoreConfig().getStorePathRootDir()));
+ assertTrue(file.exists());
+ }
+
+ @After
+ public void destory() {
+ messageStore.destroy();
+ File file = new File(messageStore.getMessageStoreConfig().getStorePathRootDir());
+ UtilAll.deleteFile(file);
+ }
+
+ public DefaultMessageStore buildMessageStore() throws Exception {
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+ messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10);
+ messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 1024 * 10);
+ messageStoreConfig.setMaxHashSlotNum(10000);
+ messageStoreConfig.setMaxIndexNum(100 * 100);
+ messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+ return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
+ }
+
+
+}
diff --git a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
index 203dfcd565f4d5491c648f0219c99eff925f3330..92f1876b2f09a095fd319b03d9a94f180fc18d23 100644
--- a/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
@@ -20,9 +20,7 @@ package org.apache.rocketmq.store;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Arrays;
-
import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Test;
@@ -47,7 +45,7 @@ public class MappedFileQueueTest {
}
@Test
- public void test_findMappedFileByOffset() {
+ public void testFindMappedFileByOffset() {
// four-byte string.
final String fixedMsg = "abcd";
@@ -97,6 +95,28 @@ public class MappedFileQueueTest {
mappedFileQueue.destroy();
}
+ @Test
+ public void testFindMappedFileByOffset_StartOffsetIsNonZero() {
+ MappedFileQueue mappedFileQueue =
+ new MappedFileQueue("target/unit_test_store/b/", 1024, null);
+
+ //Start from a non-zero offset
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(1024);
+ assertThat(mappedFile).isNotNull();
+
+ assertThat(mappedFileQueue.findMappedFileByOffset(1025)).isEqualTo(mappedFile);
+
+ assertThat(mappedFileQueue.findMappedFileByOffset(0)).isNull();
+ assertThat(mappedFileQueue.findMappedFileByOffset(123, false)).isNull();
+ assertThat(mappedFileQueue.findMappedFileByOffset(123, true)).isEqualTo(mappedFile);
+
+ assertThat(mappedFileQueue.findMappedFileByOffset(0, false)).isNull();
+ assertThat(mappedFileQueue.findMappedFileByOffset(0, true)).isEqualTo(mappedFile);
+
+ mappedFileQueue.shutdown(1000);
+ mappedFileQueue.destroy();
+ }
+
@Test
public void testAppendMessage() {
final String fixedMsg = "0123456789abcdef";
@@ -182,6 +202,33 @@ public class MappedFileQueueTest {
mappedFileQueue.destroy();
}
+ @Test
+ public void testDeleteExpiredFileByTime() throws Exception {
+ MappedFileQueue mappedFileQueue =
+ new MappedFileQueue("target/unit_test_store/f/", 1024, null);
+
+ for (int i = 0; i < 100; i++) {
+ MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
+ assertThat(mappedFile).isNotNull();
+ byte[] bytes = new byte[512];
+ assertThat(mappedFile.appendMessage(bytes)).isTrue();
+ }
+
+ assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(50);
+ long expiredTime = 100 * 1000;
+ for (int i = 0; i < mappedFileQueue.getMappedFiles().size(); i++) {
+ MappedFile mappedFile = mappedFileQueue.getMappedFiles().get(i);
+ if (i < 5) {
+ mappedFile.getFile().setLastModified(System.currentTimeMillis() - expiredTime * 2);
+ }
+ if (i > 20) {
+ mappedFile.getFile().setLastModified(System.currentTimeMillis() - expiredTime * 2);
+ }
+ }
+ mappedFileQueue.deleteExpiredFileByTime(expiredTime, 0, 0, false);
+ assertThat(mappedFileQueue.getMappedFiles().size()).isEqualTo(45);
+ }
+
@After
public void destory() {
File file = new File("target/unit_test_store");
diff --git a/test/pom.xml b/test/pom.xml
index 5d9fe8ea5b3f6135fa86514faada447916d2eeba..fc01d2c75e85aa9f4a917e0ca4be18f869bf995c 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0
diff --git a/tools/pom.xml b/tools/pom.xml
index a986bc696c148ce1479ec155defb3c01b6e19bb4..97e5e9c5d004b4d8b88c332bf485b4b9b3e4703e 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.2.0-SNAPSHOT
+ 4.3.0-SNAPSHOT
4.0.0