diff --git a/acl/pom.xml b/acl/pom.xml
index 4482cd1e7cc76c1f3123f3cbb820402ad87d7369..ea12ce5301baf6c76ad7cb18702918cc6052a031 100644
--- a/acl/pom.xml
+++ b/acl/pom.xml
@@ -13,7 +13,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
rocketmq-acl
rocketmq-acl ${project.version}
diff --git a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
index 1a618456f405caed137033a4d6cb1d7074550d09..ce63cbf299e3f06c8334445d4b6724c8545e95bd 100644
--- a/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
+++ b/acl/src/main/java/org/apache/rocketmq/acl/common/AclUtils.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.acl.common;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
@@ -124,14 +125,15 @@ public class AclUtils {
try {
fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz);
+ } catch (FileNotFoundException ignore) {
+ return null;
} catch (Exception e) {
- throw new AclException(String.format("The file for Plain mode was not found , paths %s", path), e);
+ throw new AclException(e.getMessage());
} finally {
if (fis != null) {
try {
fis.close();
- } catch (IOException e) {
- throw new AclException("close transport fileInputStream Exception", e);
+ } catch (IOException ignore) {
}
}
}
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
index 72bcda6bb3ac9c2dd51b6316092911cf99d060d7..12f43725a49fb9c1676cf6805b35aab5f8244f4f 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclUtilsTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.acl.common;
+import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -133,9 +134,20 @@ public class AclUtilsTest {
Assert.assertFalse(map.isEmpty());
}
+ @Test
+ public void getYamlDataIgnoreFileNotFoundExceptionTest() {
+
+ JSONObject yamlDataObject = AclUtils.getYamlDataObject("plain_acl.yml", JSONObject.class);
+ Assert.assertTrue(yamlDataObject == null);
+ }
+
@Test(expected = Exception.class)
- public void getYamlDataObjectExceptionTest() {
+ public void getYamlDataExceptionTest() {
- AclUtils.getYamlDataObject("plain_acl.yml", Map.class);
+ AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl_format_error.yml", Map.class);
}
+
+
+
+
}
diff --git a/acl/src/test/resources/conf/plain_acl_format_error.yml b/acl/src/test/resources/conf/plain_acl_format_error.yml
new file mode 100644
index 0000000000000000000000000000000000000000..46782c5655e68eaf81bb81bd3385e0aed68f153d
--- /dev/null
+++ b/acl/src/test/resources/conf/plain_acl_format_error.yml
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+## suggested format
+
+date 2015-02-01
+accounts:
+ - name: Jai
+accounts:
+- accessKey: RocketMQ
+ secretKey: 12345678
+ whiteRemoteAddress: 192.168.0.*
+ admin: false
+
diff --git a/broker/pom.xml b/broker/pom.xml
index 01390fd3c534a63ebb65f87636f4b7bc4913a909..39221fd799f6c43010ee7ab9ee67cc11977355a4 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -13,7 +13,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/client/pom.xml b/client/pom.xml
index 1b8abfc766678aa276b8c997fc77ed2c1b0fd83f..35908f20ac2ccec2f3c45fb61b6d74011aecc77d 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index 6a2a275ab753f89cff275d2d7c087a8dd3e6a023..9a66744c4dedf17732de5e3fb711386e3d03fec6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -52,7 +52,7 @@ public class ClientConfig {
private int persistConsumerOffsetInterval = 1000 * 5;
private boolean unitMode = false;
private String unitName;
- private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
+ private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
private boolean useTLS = TlsSystemConfig.tlsEnable;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 32ee92357b77fea919d13b6a12810a04d6f45a0e..f3b6caaa71f23861f8bc767e5f64178cef1ad752 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -41,18 +41,16 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
/**
- * Do the same thing for the same Group, the application must be set,and
- * guarantee Globally unique
+ * Do the same thing for the same Group, the application must be set,and guarantee Globally unique
*/
private String consumerGroup;
/**
- * Long polling mode, the Consumer connection max suspend time, it is not
- * recommended to modify
+ * Long polling mode, the Consumer connection max suspend time, it is not recommended to modify
*/
private long brokerSuspendMaxTimeMillis = 1000 * 20;
/**
- * Long polling mode, the Consumer connection timeout(must greater than
- * brokerSuspendMaxTimeMillis), it is not recommended to modify
+ * Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
+ * recommended to modify
*/
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
/**
@@ -117,42 +115,74 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPullConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.maxOffset(queueWithNamespace(mq));
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.minOffset(queueWithNamespace(mq));
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
@@ -171,6 +201,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
return brokerSuspendMaxTimeMillis;
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) {
this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
}
@@ -223,6 +257,11 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.registerTopics = withNamespace(registerTopics);
}
+ /**
+ * This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
+ * please do not use this method.
+ */
+ @Deprecated
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -230,6 +269,11 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
+ /**
+ * This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
+ * please do not use this method.
+ */
+ @Deprecated
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -361,14 +405,26 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
public OffsetStore getOffsetStore() {
return offsetStore;
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
return defaultMQPullConsumerImpl;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index b53d92bfc6516052a25bd38b24463f28dd3323a8..44edfb68b6fe62533e0dbeec999ab54e1a2334b1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -161,7 +161,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Max consumer thread number
*/
- private int consumeThreadMax = 64;
+ private int consumeThreadMax = 20;
/**
* Threshold for dynamic adjustment of the number of thread pool
@@ -399,49 +399,84 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
}
-
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPushConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(queueWithNamespace(mq));
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.minOffset(queueWithNamespace(mq));
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQPushConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
@Override
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -510,6 +545,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeThreadMin = consumeThreadMin;
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
return defaultMQPushConsumerImpl;
}
@@ -582,6 +621,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
return subscription;
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
public void setSubscription(Map subscription) {
Map subscriptionWithNamespace = new HashMap();
for (String topic : subscription.keySet()) {
@@ -593,6 +636,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Send message back to broker which will be re-delivered in future.
*
+ * This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
+ * please do not use this method.
+ *
* @param msg Message to send back.
* @param delayLevel delay level.
* @throws RemotingException if there is any network-tier error.
@@ -600,6 +646,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
*/
+ @Deprecated
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -611,6 +658,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Send message back to the broker whose name is brokerName
and the message will be re-delivered in
* future.
*
+ * This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
+ * please do not use this method.
+ *
* @param msg Message to send back.
* @param delayLevel delay level.
* @param brokerName broker name.
@@ -619,6 +669,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
*/
+ @Deprecated
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -763,10 +814,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.resume();
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
public OffsetStore getOffsetStore() {
return offsetStore;
}
+ /**
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ */
+ @Deprecated
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 54eaee9a018b0ec669a1963667081b3182c814b4..8c9717907fb2ff8f2b319f8e3c6bbdd281cebbb4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -40,7 +40,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
/**
* This class is the entry point for applications intending to send messages.
@@ -151,9 +150,27 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
+ * @param rpcHook RPC hook to execute per each remoting command execution.
+ * @param enableMsgTrace Switch flag instance for message trace.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
- public DefaultMQProducer(final String producerGroup) {
- this(null, producerGroup, null);
+ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
+ final String customizedTraceTopic) {
+ this.producerGroup = producerGroup;
+ defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
+ //if client open the message trace feature
+ if (enableMsgTrace) {
+ try {
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
+ dispatcher.setHostProducer(this.defaultMQProducerImpl);
+ traceDispatcher = dispatcher;
+ this.defaultMQProducerImpl.registerSendMessageHook(
+ new SendMessageTraceHookImpl(traceDispatcher));
+ } catch (Throwable e) {
+ log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
+ }
+ }
}
/**
@@ -199,13 +216,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this(null, producerGroup, null, enableMsgTrace, null);
}
-
/**
* Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
*
* @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag instance for message trace.
- * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
+ * @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
+ * trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
@@ -347,9 +364,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* This method returns immediately. On sending completion, sendCallback
will be executed.
*
*
- * Similar to {@link #send(Message)}, internal implementation would potentially retry up to
- * {@link #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication
- * and application developers are the one to resolve this potential issue.
+ * Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
+ * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
+ * application developers are the one to resolve this potential issue.
*
* @param msg Message to send.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
@@ -599,6 +616,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* This method is used to send transactional messages.
+ *
* @param msg Transactional message to send.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
@@ -611,20 +629,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
- * Create a topic on broker.
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
* @throws MQClientException if there is any client error.
*/
+ @Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
/**
- * Create a topic on broker.
+ * Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not
+ * use this method.
*
* @param key accesskey
* @param newTopic topic name
@@ -632,6 +652,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @param topicSysFlag topic system flag
* @throws MQClientException if there is any client error.
*/
+ @Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
@@ -653,10 +674,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query maximum offset of the given message queue.
*
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
* @param mq Instance of MessageQueue
* @return maximum offset of the given consume queue.
* @throws MQClientException if there is any client error.
*/
+ @Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.maxOffset(queueWithNamespace(mq));
@@ -665,10 +689,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query minimum offset of the given message queue.
*
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
* @param mq Instance of MessageQueue
* @return minimum offset of the given message queue.
* @throws MQClientException if there is any client error.
*/
+ @Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.minOffset(queueWithNamespace(mq));
@@ -677,10 +704,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query earliest message store time.
*
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
* @param mq Instance of MessageQueue
* @return earliest message store time.
* @throws MQClientException if there is any client error.
*/
+ @Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
@@ -689,6 +719,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given offset message ID.
*
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
* @param offsetMsgId message id
* @return Message specified.
* @throws MQBrokerException if there is any broker error.
@@ -696,6 +728,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
+ @Deprecated
@Override
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -705,6 +738,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message by key.
*
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
* @param topic message topic
* @param key message key index word
* @param maxNum max message number
@@ -714,6 +749,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws MQClientException if there is any client error.
* @throws InterruptedException if the thread is interrupted.
*/
+ @Deprecated
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
@@ -723,6 +759,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given message ID.
*
+ * This method will be removed in a certain version after April 5, 2020, so please do not use this method.
+ *
* @param topic Topic
* @param msgId Message ID
* @return Message specified.
@@ -731,6 +769,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
+ @Deprecated
@Override
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
@@ -767,8 +806,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
- * Sets an Executor to be used for executing callback methods.
- * If the Executor is not set, {@link NettyRemotingClient#publicExecutor} will be used.
+ * Sets an Executor to be used for executing callback methods. If the Executor is not set, {@link
+ * NettyRemotingClient#publicExecutor} will be used.
*
* @param callbackExecutor the instance of Executor
*/
@@ -835,6 +874,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
}
+ @Deprecated
public DefaultMQProducerImpl getDefaultMQProducerImpl() {
return defaultMQProducerImpl;
}
diff --git a/common/pom.xml b/common/pom.xml
index c13465d119a904d79156c94fc4655e17c3077387..dbbdf5e48319c43e45f6ddea0cf66e49948fc7f8 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 2296619751686e01adb76b16a9fcf401e11a63f7..32b539af9aa9bf3f0f0d18aa84f5007b66609c62 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -20,7 +20,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
rocketmq-distribution
rocketmq-distribution ${project.version}
@@ -69,7 +69,7 @@
- apache-rocketmq
+ rocketmq-${project.version}
@@ -104,7 +104,7 @@
- apache-rocketmq
+ rocketmq-client-${project.version}
diff --git a/distribution/release-client.xml b/distribution/release-client.xml
index 9f5da2552aca3f4d98f3e3a8167eb2e2364af700..f787c3338976b69fc986cdbe9018424a588519e4 100644
--- a/distribution/release-client.xml
+++ b/distribution/release-client.xml
@@ -17,18 +17,21 @@
-->
client
- false
+ true
dir
tar.gz
+ zip
-
- ../
-
- README.md
-
-
+
+
+ ../
+
+ README.md
+
+
+
diff --git a/distribution/release.xml b/distribution/release.xml
index ae9eb081875a4a2d807559685c8e1faf0359f72c..64cffc81ea6ad51c0c28e73bc9d1bb3dfa9d1a2f 100644
--- a/distribution/release.xml
+++ b/distribution/release.xml
@@ -17,7 +17,7 @@
-->
all
- false
+ true
dir
tar.gz
diff --git a/docs/cn/README.md b/docs/cn/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..a47ec6452dc31d1a63c7db9c36c2e5429f42661f
--- /dev/null
+++ b/docs/cn/README.md
@@ -0,0 +1,48 @@
+Apache RocketMQ开发者指南
+--------
+
+##### 这个开发者指南是帮忙您快速了解,并使用 Apache RocketMQ
+
+### 1. 概念和特性
+
+- [概念(Concept)](concept.md):介绍RocketMQ的基本概念模型。
+
+- [特性(Features)](features.md):介绍RocketMQ实现的功能特性。
+
+
+### 2. 架构设计
+
+- [架构(Architecture)](architecture.md):介绍RocketMQ部署架构和技术架构。
+
+- [设计(Design)](design.md):介绍RocketMQ关键机制的设计原理,主要包括消息存储、通信机制、消息过滤、负载均衡、事物消息等。
+
+
+### 3. 样例
+
+- [样例(Example)](RocketMQ_Example.md) :介绍RocketMQ的常见用法,包括基本样例、顺序消息样例、延时消息样例、批量消息样例、过滤消息样例、事物消息样例等。
+
+
+### 4. 最佳实践
+- [最佳实践(Best Practice)](best_practice.md):介绍RocketMQ的最佳实践,包括生产者、消费者、Broker以及NameServer的最佳实践,客户端的配置方式以及JVM和linux的最佳参数配置。
+- [消息轨迹指南(Message Trace)](msg_trace/user_guide.md):介绍RocketMQ消息轨迹的使用方法。
+- [权限管理(Auth Management)](acl/user_guide.md):介绍如何快速部署和使用支持权限控制特性的RocketMQ集群。
+
+- [Dledger快速搭建(Quick Start)](dledger/quick_start.md):介绍Dledger的快速搭建方法。
+
+- [集群部署(Cluster Deployment)](dledger/deploy_guide.md):介绍Dledger的集群部署方式。
+
+### 5. 运维管理
+- [集群部署(Operation)](operation.md):介绍单Master模式、多Master模式、多Master多slave模式等RocketMQ集群各种形式的部署方法以及运维工具mqadmin的使用方式。
+
+
+
+### 6. API Reference(待补充)
+
+- [DefaultMQProducer API Reference](client/java/API_Reference_DefaultMQProducer.md)
+
+
+
+
+
+
+
diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md
index 885b00a3401388245e6d194bac0d79f4426e8d43..d298db84203d99eef96a9e50bc45d46b179dff8a 100644
--- a/docs/cn/RocketMQ_Example.md
+++ b/docs/cn/RocketMQ_Example.md
@@ -1,5 +1,7 @@
-1 基本样例
---------
+# 样例
+-----
+## 1 基本样例
+
在基本样例中我们提供如下的功能场景:
@@ -953,4 +955,4 @@ public class SimplePushConsumer {
System.out.printf("Consumer startup OK%n");
}
}
-```
\ No newline at end of file
+```
diff --git a/docs/cn/acl/user_guide.md b/docs/cn/acl/user_guide.md
index deeb2fa56afa2c9821a13835fa782261db234421..838ed2eaa0d854603516674390fd0fec6598821c 100644
--- a/docs/cn/acl/user_guide.md
+++ b/docs/cn/acl/user_guide.md
@@ -1,6 +1,6 @@
# 权限控制
-## 前言
-该文档主要介绍如何快速部署和使用支持权限控制特性的RocketMQ 集群。
+----
+
## 1.权限控制特性介绍
权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在distribution/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常;
diff --git a/docs/cn/architecture.md b/docs/cn/architecture.md
index 12493a67eaed925c53230380ead46f9a78d33f88..7b41edaea779a2c39924a59d00c4c8b693f881bf 100644
--- a/docs/cn/architecture.md
+++ b/docs/cn/architecture.md
@@ -1,6 +1,6 @@
# 架构设计
-
-## 技术架构
+---
+## 1 技术架构
![](image/rocketmq_architecture_1.png)
RocketMQ架构上主要分为四部分,如上图所示:
@@ -20,7 +20,7 @@ RocketMQ架构上主要分为四部分,如上图所示:
![](image/rocketmq_architecture_2.png)
-## 部署架构
+## 2 部署架构
![](image/rocketmq_architecture_3.png)
diff --git a/docs/cn/best_practice.md b/docs/cn/best_practice.md
index ffa76b953777be638d852ba6857600387e96a1d2..731a24be267c8f3fcf38d33349c6306d900864c5 100755
--- a/docs/cn/best_practice.md
+++ b/docs/cn/best_practice.md
@@ -1,6 +1,6 @@
# 最佳实践
-
+---
## 1 生产者
### 1.1 发送消息注意事项
diff --git a/docs/cn/concept.md b/docs/cn/concept.md
index ef59050748ada02eea9bf78537d18573667cd7f1..87d691b8e6627f822d31b0315c98389ae5c602e3 100644
--- a/docs/cn/concept.md
+++ b/docs/cn/concept.md
@@ -1,37 +1,53 @@
# 基本概念
-## 消息模型(Message Model)
+----
+## 1 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
-## 消息生产者(Producer)
+
+## 2 消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
-## 消息消费者(Consumer)
+
+## 3 消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
-## 主题(Topic)
+
+## 4 主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
-##代理服务器(Broker Server)
+
+## 5 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
-## 名字服务(Name Server)
+
+## 6 名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
-## 拉取式消费(Pull Consumer)
+
+## 7 拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
-## 推动式消费(Push Consumer)
+
+## 8 推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
-## 生产者组(Producer Group)
+
+## 9 生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事物消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
-## 消费者组(Consumer Group)
+
+## 10 消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
-## 集群消费(Clustering)
+
+## 11 集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
-## 广播消费(Broadcasting)
+
+## 12 广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
-## 普通顺序消息(Normal Ordered Message)
+
+## 13 普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
-## 严格顺序消息(Strictly Ordered Message)
+
+## 14 严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
-## 代理服务器(Broker Server)
+
+## 15 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
-## 消息(Message)
+
+## 16 消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
-## 标签(Tag)
+## 17 标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
diff --git a/docs/cn/design.md b/docs/cn/design.md
index 5b11912bafe3662cacc160793bbe5c8f420efc3c..e30b466f0613d5fdf60b9d91303cb9723bd81728 100644
--- a/docs/cn/design.md
+++ b/docs/cn/design.md
@@ -1,5 +1,6 @@
-## 设计(design)
+# 设计(design)
+---
### 1 消息存储
![](image/rocketmq_design_1.png)
diff --git a/docs/cn/dledger/deploy_guide.md b/docs/cn/dledger/deploy_guide.md
index faebb96e59856b98b77dabfe59cb79740fa36dd8..c97e8dd8b423129f7f31492798d82cff54cfb667 100644
--- a/docs/cn/dledger/deploy_guide.md
+++ b/docs/cn/dledger/deploy_guide.md
@@ -1,3 +1,5 @@
+# Dledger集群搭建
+---
## 前言
该文档主要介绍如何部署自动容灾切换的 RocketMQ-on-DLedger Group。
diff --git a/docs/cn/dledger/quick_start.md b/docs/cn/dledger/quick_start.md
index 3d1989a52cd8a3b2d606f8934059fcdb69e87539..48540c9dae106a302ff46efd65230b41ff8da59b 100644
--- a/docs/cn/dledger/quick_start.md
+++ b/docs/cn/dledger/quick_start.md
@@ -1,3 +1,5 @@
+# Dledger快速搭建
+---
### 前言
该文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的 RocketMQ 集群。
diff --git a/docs/cn/features.md b/docs/cn/features.md
index 4bfe94421e324a4ce97cb2534a1cc8633d3f1487..1c0456b28842e9a1d281bc1d51bcb14aeed5afa8 100644
--- a/docs/cn/features.md
+++ b/docs/cn/features.md
@@ -1,7 +1,8 @@
# 特性(features)
-## 订阅与发布
+----
+## 1 订阅与发布
消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
-## 消息顺序
+## 2 消息顺序
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
@@ -11,9 +12,9 @@
- 分区顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
-## 消息过滤
+##3 消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
-## 消息可靠性
+## 4 消息可靠性
RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
1) Broker正常关闭
2) Broker异常Crash
@@ -26,15 +27,15 @@ RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。
-## 至少一次
+## 5 至少一次
至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
-## 回溯消费
+## 6 回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
-## 事务消息
+## 7 事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
-## 定时消息
+## 8 定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
@@ -46,19 +47,20 @@ broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
-## 消息重试
+## 9 消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
- 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
-## 消息重投
+## 10 消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:
- retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
- retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
- retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
-## 流量控制
+
+## 11 流量控制
生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
生产者流控:
@@ -75,7 +77,7 @@ RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGro
- 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
消费者流控的结果是降低拉取频率。
-## 死信队列
+## 12 死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
diff --git a/docs/cn/index.md b/docs/cn/index.md
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/docs/cn/msg_trace/user_guide.md b/docs/cn/msg_trace/user_guide.md
index 0320e16e006dc2b4c5224af249cd6477f095f75d..828d8c0ce7fc0a7ed9de6db535ae483a96e6ec8a 100644
--- a/docs/cn/msg_trace/user_guide.md
+++ b/docs/cn/msg_trace/user_guide.md
@@ -1,6 +1,5 @@
# 消息轨迹
-## 前言
-该文档主要介绍如何快速部署和使用支持消息轨迹特性的RocketMQ 集群。
+----
## 1. 消息轨迹数据关键属性
| Producer端| Consumer端 | Broker端 |
diff --git a/docs/cn/operation.md b/docs/cn/operation.md
index ecd37079d038d2c066922ba3123404826e13a776..ae85a30b2320d5e0387ab31202d657b28cf966bc 100644
--- a/docs/cn/operation.md
+++ b/docs/cn/operation.md
@@ -1,4 +1,6 @@
-## 运维管理
+
+# 运维管理
+---
### 1 集群搭建
diff --git a/example/pom.xml b/example/pom.xml
index c43ff785e1ef68be1070d302a299e2799a449a30..6f02f43761570af345dbc6d2559b310a5e38c7d0 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
@@ -51,12 +51,12 @@
org.apache.rocketmq
rocketmq-openmessaging
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
org.apache.rocketmq
rocketmq-acl
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
diff --git a/filter/pom.xml b/filter/pom.xml
index 86f1bbb5d9454ce29132d94eef3c7686c5bad157..362de7cd23003d4716d43245ada01f9a023a2863 100644
--- a/filter/pom.xml
+++ b/filter/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/logappender/pom.xml b/logappender/pom.xml
index 5388be6a26de706549167abf24a7729258f6ab57..79bc90d6ad8e43158546c55ef8e99f5cd57d7257 100644
--- a/logappender/pom.xml
+++ b/logappender/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
rocketmq-logappender
diff --git a/logging/pom.xml b/logging/pom.xml
index ac435e7f111bc5b3b71cf0a42644e217163ea7c3..eb8fb937519c4b037d0a9bbbc756e08de24c98ea 100644
--- a/logging/pom.xml
+++ b/logging/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/namesrv/pom.xml b/namesrv/pom.xml
index eea7f85456f5a5d3c323c06252f6811509704feb..52aa0dd8c04622ff56bdaa6107303e27785c505c 100644
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index 4843662e4dff839f97bb8539eaedb64ebdad8809..1d3dce39363da1c8d9c88b5652cf6c52b14dc068 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/pom.xml b/pom.xml
index 7a349369c0ef5d770e869d3d30a45b4c17996667..bb83a16af855837bf41bff8735176ef8f06cf06f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
2012
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
pom
Apache RocketMQ ${project.version}
http://rocketmq.apache.org/
diff --git a/remoting/pom.xml b/remoting/pom.xml
index 84c2654e235195b0b3105fac11564f75d270f60b..4b8cceca451d09d8da4d9598ab3671e90f20a27d 100644
--- a/remoting/pom.xml
+++ b/remoting/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/srvutil/pom.xml b/srvutil/pom.xml
index e175088cb44d756978bf0c5653357baef84a119f..01531dedb86c2b2736d618ff4de89a6468a6b393 100644
--- a/srvutil/pom.xml
+++ b/srvutil/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/store/pom.xml b/store/pom.xml
index a1aaf39a419f5cce09cdd2076a19fa8d440b5035..d8d0176e1a672448dbd1cb635f018656e3b701cb 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/test/pom.xml b/test/pom.xml
index ba603c815d4ee84a11d54c6b1b6946c503ea8333..3553546cac568e0aa896a4b0049b38eff365a3bf 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -20,7 +20,7 @@
rocketmq-all
org.apache.rocketmq
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/tools/pom.xml b/tools/pom.xml
index 9d36e7093a432d226d4c9f8d623e90e09e409bc6..dd97a25f6d3ef3ff352cc18194131c468d91179f 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -19,7 +19,7 @@
org.apache.rocketmq
rocketmq-all
- 4.4.1-SNAPSHOT
+ 4.5.1-SNAPSHOT
4.0.0
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 2ca60aa860cd65dbc49fd5d63bbc6f22a051c756..da71513a30ebd5d87a5feb9018a7a1c2ee20b03e 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -249,11 +249,22 @@ public class MQAdminStartup {
public static RPCHook getAclRPCHook() {
String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
String fileName = "/conf/tools.yml";
- JSONObject yamlDataObject = AclUtils.getYamlDataObject(fileHome + fileName ,
+ JSONObject yamlDataObject = null;
+ try {
+ yamlDataObject = AclUtils.getYamlDataObject(fileHome + fileName,
JSONObject.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ if (yamlDataObject == null) {
+ System.out.printf("Cannot find conf file %s, acl isn't be enabled.%n" ,fileHome + fileName);
+ return null;
+ }
- if (yamlDataObject == null || yamlDataObject.isEmpty()) {
- System.out.printf(" Cannot find conf file %s, acl is not be enabled.%n" ,fileHome + fileName);
+ if (yamlDataObject.isEmpty()) {
+ System.out.printf("Content of conf file %s is empty, acl isn't be enabled.%n" ,fileHome + fileName);
return null;
}