未验证 提交 a73c5f8f 编写于 作者: H Heng Du 提交者: GitHub

Merge branch 'develop' into RocketMQ-1120

......@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-acl</artifactId>
<name>rocketmq-acl ${project.version}</name>
......
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.acl.common;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;
import java.util.SortedMap;
......@@ -124,14 +125,15 @@ public class AclUtils {
try {
fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz);
} catch (FileNotFoundException ignore) {
return null;
} catch (Exception e) {
throw new AclException(String.format("The file for Plain mode was not found , paths %s", path), e);
throw new AclException(e.getMessage());
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
throw new AclException("close transport fileInputStream Exception", e);
} catch (IOException ignore) {
}
}
}
......
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.acl.common;
import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
......@@ -133,9 +134,20 @@ public class AclUtilsTest {
Assert.assertFalse(map.isEmpty());
}
@Test
public void getYamlDataIgnoreFileNotFoundExceptionTest() {
JSONObject yamlDataObject = AclUtils.getYamlDataObject("plain_acl.yml", JSONObject.class);
Assert.assertTrue(yamlDataObject == null);
}
@Test(expected = Exception.class)
public void getYamlDataObjectExceptionTest() {
public void getYamlDataExceptionTest() {
AclUtils.getYamlDataObject("plain_acl.yml", Map.class);
AclUtils.getYamlDataObject("src/test/resources/conf/plain_acl_format_error.yml", Map.class);
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
## suggested format
date 2015-02-01
accounts:
- name: Jai
accounts:
- accessKey: RocketMQ
secretKey: 12345678
whiteRemoteAddress: 192.168.0.*
admin: false
......@@ -13,7 +13,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -52,7 +52,7 @@ public class ClientConfig {
private int persistConsumerOffsetInterval = 1000 * 5;
private boolean unitMode = false;
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
private boolean useTLS = TlsSystemConfig.tlsEnable;
......
......@@ -41,18 +41,16 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl;
/**
* Do the same thing for the same Group, the application must be set,and
* guarantee Globally unique
* Do the same thing for the same Group, the application must be set,and guarantee Globally unique
*/
private String consumerGroup;
/**
* Long polling mode, the Consumer connection max suspend time, it is not
* recommended to modify
* Long polling mode, the Consumer connection max suspend time, it is not recommended to modify
*/
private long brokerSuspendMaxTimeMillis = 1000 * 20;
/**
* Long polling mode, the Consumer connection timeout(must greater than
* brokerSuspendMaxTimeMillis), it is not recommended to modify
* Long polling mode, the Consumer connection timeout(must greater than brokerSuspendMaxTimeMillis), it is not
* recommended to modify
*/
private long consumerTimeoutMillisWhenSuspend = 1000 * 30;
/**
......@@ -117,42 +115,74 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
defaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this, rpcHook);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPullConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPullConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.maxOffset(queueWithNamespace(mq));
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.minOffset(queueWithNamespace(mq));
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
return this.defaultMQPullConsumerImpl.viewMessage(offsetMsgId);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
......@@ -171,6 +201,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
return brokerSuspendMaxTimeMillis;
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
public void setBrokerSuspendMaxTimeMillis(long brokerSuspendMaxTimeMillis) {
this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
}
......@@ -223,6 +257,11 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.registerTopics = withNamespace(registerTopics);
}
/**
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
*/
@Deprecated
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......@@ -230,6 +269,11 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, null);
}
/**
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
*/
@Deprecated
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......@@ -361,14 +405,26 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.defaultMQPullConsumerImpl.sendMessageBack(msg, delayLevel, brokerName, consumerGroup);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
public OffsetStore getOffsetStore() {
return offsetStore;
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
public DefaultMQPullConsumerImpl getDefaultMQPullConsumerImpl() {
return defaultMQPullConsumerImpl;
}
......
......@@ -161,7 +161,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Max consumer thread number
*/
private int consumeThreadMax = 64;
private int consumeThreadMax = 20;
/**
* Threshold for dynamic adjustment of the number of thread pool
......@@ -399,49 +399,84 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
}
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQPushConsumerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public long searchOffset(MessageQueue mq, long timestamp) throws MQClientException {
return this.defaultMQPushConsumerImpl.searchOffset(queueWithNamespace(mq), timestamp);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(queueWithNamespace(mq));
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.minOffset(queueWithNamespace(mq));
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
return this.defaultMQPushConsumerImpl.viewMessage(offsetMsgId);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
return this.defaultMQPushConsumerImpl.queryMessage(withNamespace(topic), key, maxNum, begin, end);
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
@Override
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......@@ -510,6 +545,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.consumeThreadMin = consumeThreadMin;
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
public DefaultMQPushConsumerImpl getDefaultMQPushConsumerImpl() {
return defaultMQPushConsumerImpl;
}
......@@ -582,6 +621,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
return subscription;
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
public void setSubscription(Map<String, String> subscription) {
Map<String, String> subscriptionWithNamespace = new HashMap<String, String>();
for (String topic : subscription.keySet()) {
......@@ -593,6 +636,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Send message back to broker which will be re-delivered in future.
*
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
*
* @param msg Message to send back.
* @param delayLevel delay level.
* @throws RemotingException if there is any network-tier error.
......@@ -600,6 +646,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
public void sendMessageBack(MessageExt msg, int delayLevel)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......@@ -611,6 +658,9 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* Send message back to the broker whose name is <code>brokerName</code> and the message will be re-delivered in
* future.
*
* This method will be removed or it's visibility will be changed in a certain version after April 5, 2020, so
* please do not use this method.
*
* @param msg Message to send back.
* @param delayLevel delay level.
* @param brokerName broker name.
......@@ -619,6 +669,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
* @throws InterruptedException if the thread is interrupted.
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......@@ -763,10 +814,18 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.defaultMQPushConsumerImpl.resume();
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
public OffsetStore getOffsetStore() {
return offsetStore;
}
/**
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*/
@Deprecated
public void setOffsetStore(OffsetStore offsetStore) {
this.offsetStore = offsetStore;
}
......
......@@ -40,7 +40,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
/**
* This class is the entry point for applications intending to send messages.
......@@ -151,9 +150,27 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* Constructor specifying producer group.
*
* @param producerGroup Producer group, see the name-sake field.
* @param rpcHook RPC hook to execute per each remoting command execution.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQProducer(final String producerGroup) {
this(null, producerGroup, null);
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
final String customizedTraceTopic) {
this.producerGroup = producerGroup;
defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
//if client open the message trace feature
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
/**
......@@ -199,13 +216,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this(null, producerGroup, null, enableMsgTrace, null);
}
/**
* Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.
*
* @param producerGroup Producer group, see the name-sake field.
* @param enableMsgTrace Switch flag instance for message trace.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.
* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default
* trace topic name.
*/
public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {
this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);
......@@ -347,9 +364,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
* </p>
*
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to
* {@link #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication
* and application developers are the one to resolve this potential issue.
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
* application developers are the one to resolve this potential issue.
*
* @param msg Message to send.
* @param sendCallback Callback to execute on sending completed, either successful or unsuccessful.
......@@ -599,6 +616,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* This method is used to send transactional messages.
*
* @param msg Transactional message to send.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
......@@ -611,20 +629,22 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Create a topic on broker.
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param key accesskey
* @param newTopic topic name
* @param queueNum topic's queue number
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum) throws MQClientException {
createTopic(key, withNamespace(newTopic), queueNum, 0);
}
/**
* Create a topic on broker.
* Create a topic on broker. This method will be removed in a certain version after April 5, 2020, so please do not
* use this method.
*
* @param key accesskey
* @param newTopic topic name
......@@ -632,6 +652,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @param topicSysFlag topic system flag
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
this.defaultMQProducerImpl.createTopic(key, withNamespace(newTopic), queueNum, topicSysFlag);
......@@ -653,10 +674,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query maximum offset of the given message queue.
*
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
* @return maximum offset of the given consume queue.
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.maxOffset(queueWithNamespace(mq));
......@@ -665,10 +689,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query minimum offset of the given message queue.
*
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
* @return minimum offset of the given message queue.
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
public long minOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.minOffset(queueWithNamespace(mq));
......@@ -677,10 +704,13 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query earliest message store time.
*
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param mq Instance of MessageQueue
* @return earliest message store time.
* @throws MQClientException if there is any client error.
*/
@Deprecated
@Override
public long earliestMsgStoreTime(MessageQueue mq) throws MQClientException {
return this.defaultMQProducerImpl.earliestMsgStoreTime(queueWithNamespace(mq));
......@@ -689,6 +719,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given offset message ID.
*
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param offsetMsgId message id
* @return Message specified.
* @throws MQBrokerException if there is any broker error.
......@@ -696,6 +728,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@Override
public MessageExt viewMessage(
String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......@@ -705,6 +738,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message by key.
*
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param topic message topic
* @param key message key index word
* @param maxNum max message number
......@@ -714,6 +749,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws MQClientException if there is any client error.
* @throws InterruptedException if the thread is interrupted.
*/
@Deprecated
@Override
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)
throws MQClientException, InterruptedException {
......@@ -723,6 +759,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
/**
* Query message of the given message ID.
*
* This method will be removed in a certain version after April 5, 2020, so please do not use this method.
*
* @param topic Topic
* @param msgId Message ID
* @return Message specified.
......@@ -731,6 +769,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* @throws RemotingException if there is any network-tier error.
* @throws InterruptedException if the sending thread is interrupted.
*/
@Deprecated
@Override
public MessageExt viewMessage(String topic,
String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
......@@ -767,8 +806,8 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
}
/**
* Sets an Executor to be used for executing callback methods.
* If the Executor is not set, {@link NettyRemotingClient#publicExecutor} will be used.
* Sets an Executor to be used for executing callback methods. If the Executor is not set, {@link
* NettyRemotingClient#publicExecutor} will be used.
*
* @param callbackExecutor the instance of Executor
*/
......@@ -835,6 +874,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
}
@Deprecated
public DefaultMQProducerImpl getDefaultMQProducerImpl() {
return defaultMQProducerImpl;
}
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<artifactId>rocketmq-distribution</artifactId>
<name>rocketmq-distribution ${project.version}</name>
......@@ -69,7 +69,7 @@
</executions>
</plugin>
</plugins>
<finalName>apache-rocketmq</finalName>
<finalName>rocketmq-${project.version}</finalName>
</build>
</profile>
......@@ -104,7 +104,7 @@
</executions>
</plugin>
</plugins>
<finalName>apache-rocketmq</finalName>
<finalName>rocketmq-client-${project.version}</finalName>
</build>
</profile>
</profiles>
......
......@@ -17,18 +17,21 @@
-->
<assembly>
<id>client</id>
<includeBaseDirectory>false</includeBaseDirectory>
<includeBaseDirectory>true</includeBaseDirectory>
<formats>
<format>dir</format>
<format>tar.gz</format>
<format>zip</format>
</formats>
<fileSet>
<directory>../</directory>
<includes>
<include>README.md</include>
</includes>
</fileSet>
<fileSets>
<fileSet>
<directory>../</directory>
<includes>
<include>README.md</include>
</includes>
</fileSet>
</fileSets>
<files>
<file>
......
......@@ -17,7 +17,7 @@
-->
<assembly>
<id>all</id>
<includeBaseDirectory>false</includeBaseDirectory>
<includeBaseDirectory>true</includeBaseDirectory>
<formats>
<format>dir</format>
<format>tar.gz</format>
......
Apache RocketMQ开发者指南
--------
##### 这个开发者指南是帮忙您快速了解,并使用 Apache RocketMQ
### 1. 概念和特性
- [概念(Concept)](concept.md):介绍RocketMQ的基本概念模型。
- [特性(Features)](features.md):介绍RocketMQ实现的功能特性。
### 2. 架构设计
- [架构(Architecture)](architecture.md):介绍RocketMQ部署架构和技术架构。
- [设计(Design)](design.md):介绍RocketMQ关键机制的设计原理,主要包括消息存储、通信机制、消息过滤、负载均衡、事物消息等。
### 3. 样例
- [样例(Example)](RocketMQ_Example.md) :介绍RocketMQ的常见用法,包括基本样例、顺序消息样例、延时消息样例、批量消息样例、过滤消息样例、事物消息样例等。
### 4. 最佳实践
- [最佳实践(Best Practice)](best_practice.md):介绍RocketMQ的最佳实践,包括生产者、消费者、Broker以及NameServer的最佳实践,客户端的配置方式以及JVM和linux的最佳参数配置。
- [消息轨迹指南(Message Trace)](msg_trace/user_guide.md):介绍RocketMQ消息轨迹的使用方法。
- [权限管理(Auth Management)](acl/user_guide.md):介绍如何快速部署和使用支持权限控制特性的RocketMQ集群。
- [Dledger快速搭建(Quick Start)](dledger/quick_start.md):介绍Dledger的快速搭建方法。
- [集群部署(Cluster Deployment)](dledger/deploy_guide.md):介绍Dledger的集群部署方式。
### 5. 运维管理
- [集群部署(Operation)](operation.md):介绍单Master模式、多Master模式、多Master多slave模式等RocketMQ集群各种形式的部署方法以及运维工具mqadmin的使用方式。
### 6. API Reference(待补充)
- [DefaultMQProducer API Reference](client/java/API_Reference_DefaultMQProducer.md)
1 基本样例
--------
# 样例
-----
## 1 基本样例
在基本样例中我们提供如下的功能场景:
......@@ -953,4 +955,4 @@ public class SimplePushConsumer {
System.out.printf("Consumer startup OK%n");
}
}
```
\ No newline at end of file
```
# 权限控制
## 前言
该文档主要介绍如何快速部署和使用支持权限控制特性的RocketMQ 集群。
----
## 1.权限控制特性介绍
权限控制(ACL)主要为RocketMQ提供Topic资源级别的用户访问控制。用户在使用RocketMQ权限控制时,可以在Client客户端通过 RPCHook注入AccessKey和SecretKey签名;同时,将对应的权限控制属性(包括Topic访问权限、IP白名单和AccessKey和SecretKey签名等)设置在distribution/conf/plain_acl.yml的配置文件中。Broker端对AccessKey所拥有的权限进行校验,校验不过,抛出异常;
......
# 架构设计
## 技术架构
---
## 1 技术架构
![](image/rocketmq_architecture_1.png)
RocketMQ架构上主要分为四部分,如上图所示:
......@@ -20,7 +20,7 @@ RocketMQ架构上主要分为四部分,如上图所示:
![](image/rocketmq_architecture_2.png)
## 部署架构
## 2 部署架构
![](image/rocketmq_architecture_3.png)
......
# 最佳实践
---
## 1 生产者
### 1.1 发送消息注意事项
......
# 基本概念
## 消息模型(Message Model)
----
## 1 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
## 消息生产者(Producer)
## 2 消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
## 消息消费者(Consumer)
## 3 消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
## 主题(Topic)
## 4 主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
##代理服务器(Broker Server)
## 5 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
## 名字服务(Name Server)
## 6 名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
## 拉取式消费(Pull Consumer)
## 7 拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
## 推动式消费(Push Consumer)
## 8 推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。
## 生产者组(Producer Group)
## 9 生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事物消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
## 消费者组(Consumer Group)
## 10 消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
## 集群消费(Clustering)
## 11 集群消费(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
## 广播消费(Broadcasting)
## 12 广播消费(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
## 普通顺序消息(Normal Ordered Message)
## 13 普通顺序消息(Normal Ordered Message)
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列收到的消息则可能是无顺序的。
## 严格顺序消息(Strictly Ordered Message)
## 14 严格顺序消息(Strictly Ordered Message)
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。
## 代理服务器(Broker Server)
## 15 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
## 消息(Message)
## 16 消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。
## 标签(Tag)
## 17 标签(Tag)
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
## 设计(design)
# 设计(design)
---
### 1 消息存储
![](image/rocketmq_design_1.png)
......
# Dledger集群搭建
---
## 前言
该文档主要介绍如何部署自动容灾切换的 RocketMQ-on-DLedger Group。
......
# Dledger快速搭建
---
### 前言
该文档主要介绍如何快速构建和部署基于 DLedger 的可以自动容灾切换的 RocketMQ 集群。
......
# 特性(features)
## 订阅与发布
----
## 1 订阅与发布
消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
## 消息顺序
## 2 消息顺序
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
......@@ -11,9 +12,9 @@
- 分区顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
## 消息过滤
##3 消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
## 消息可靠性
## 4 消息可靠性
RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
1) Broker正常关闭
2) Broker异常Crash
......@@ -26,15 +27,15 @@ RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。
## 至少一次
## 5 至少一次
至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
## 回溯消费
## 6 回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
## 事务消息
## 7 事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
## 定时消息
## 8 定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
......@@ -46,19 +47,20 @@ broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
## 消息重试
## 9 消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
- 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
## 消息重投
## 10 消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:
- retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
- retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
- retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
## 流量控制
## 11 流量控制
生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
生产者流控:
......@@ -75,7 +77,7 @@ RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGro
- 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
消费者流控的结果是降低拉取频率。
## 死信队列
## 12 死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
......
# 消息轨迹
## 前言
该文档主要介绍如何快速部署和使用支持消息轨迹特性的RocketMQ 集群。
----
## 1. 消息轨迹数据关键属性
| Producer端| Consumer端 | Broker端 |
......
## 运维管理
# 运维管理
---
### 1 集群搭建
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -51,12 +51,12 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-logappender</artifactId>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -29,7 +29,7 @@
<inceptionYear>2012</inceptionYear>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Apache RocketMQ ${project.version}</name>
<url>http://rocketmq.apache.org/</url>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -20,7 +20,7 @@
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.1-SNAPSHOT</version>
<version>4.5.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -249,11 +249,22 @@ public class MQAdminStartup {
public static RPCHook getAclRPCHook() {
String fileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
String fileName = "/conf/tools.yml";
JSONObject yamlDataObject = AclUtils.getYamlDataObject(fileHome + fileName ,
JSONObject yamlDataObject = null;
try {
yamlDataObject = AclUtils.getYamlDataObject(fileHome + fileName,
JSONObject.class);
} catch (Exception e) {
e.printStackTrace();
return null;
}
if (yamlDataObject == null) {
System.out.printf("Cannot find conf file %s, acl isn't be enabled.%n" ,fileHome + fileName);
return null;
}
if (yamlDataObject == null || yamlDataObject.isEmpty()) {
System.out.printf(" Cannot find conf file %s, acl is not be enabled.%n" ,fileHome + fileName);
if (yamlDataObject.isEmpty()) {
System.out.printf("Content of conf file %s is empty, acl isn't be enabled.%n" ,fileHome + fileName);
return null;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册