未验证 提交 1cad368d 编写于 作者: H hao.z 提交者: GitHub

Merge pull request #6 from apache/develop

Develop
......@@ -21,6 +21,8 @@ It offers a variety of features:
* Various message filter mechanics such as SQL and Tag
* Docker images for isolated testing and cloud isolated clusters
* Feature-rich administrative dashboard for configuration, metrics and monitoring
* Access control list
* Message trace
----------
......
......@@ -52,6 +52,11 @@ public class PlainAccessValidator implements AccessValidator {
} else {
accessResource.setWhiteRemoteAddress(remoteAddr);
}
if (request.getExtFields() == null) {
throw new AclException("request's extFields value is null");
}
accessResource.setRequestCode(request.getCode());
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
......
......@@ -130,8 +130,8 @@ public class PlainPermissionLoader {
if (!ownedPermMap.containsKey(resource)) {
// Check the default perm
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm();
byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
ownedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
......
......@@ -43,6 +43,7 @@ public class PlainAccessValidatorTest {
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
plainAccessValidator = new PlainAccessValidator();
sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
......@@ -115,6 +116,22 @@ public class PlainAccessValidatorTest {
plainAccessValidator.validate(accessResource);
}
@Test(expected = AclException.class)
public void validateForAdminCommandWithOutAclRPCHook() {
RemotingCommand consumerOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
plainAccessValidator.parse(consumerOffsetAdminRequest, "192.168.0.1:9876");
RemotingCommand subscriptionGroupAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
plainAccessValidator.parse(subscriptionGroupAdminRequest, "192.168.0.1:9876");
RemotingCommand delayOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
plainAccessValidator.parse(delayOffsetAdminRequest, "192.168.0.1:9876");
RemotingCommand allTopicConfigAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
plainAccessValidator.parse(allTopicConfigAdminRequest, "192.168.0.1:9876");
}
@Test
public void validatePullMessageTest() {
PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader();
......
......@@ -158,10 +158,10 @@ public class PlainPermissionLoaderTest {
}
@Test(expected = AclException.class)
public void checkErrorPerm() {
public void checkErrorPermDefaultValueNotMatch() {
plainAccessResource = new PlainAccessResource();
plainAccessResource.addResourceAndPerm("topicF", Permission.SUB);
plainAccessResource.addResourceAndPerm("topicF", Permission.PUB);
plainPermissionLoader.checkPerm(plainAccessResource, SUBPlainAccessResource);
}
@Test(expected = AclException.class)
......
......@@ -100,7 +100,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
msgExt.setCommitLogOffset(
putMessageResult.getAppendMessageResult().getWroteOffset());
msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
log.info(
log.debug(
"Send check message, the offset={} restored in queueOffset={} "
+ "commitLogOffset={} "
+ "newMsgId={} realMsgId={} topic={}",
......@@ -127,7 +127,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
log.warn("The queue of topic is empty :" + topic);
return;
}
log.info("Check topic={}, queues={}", topic, msgQueues);
log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue);
......@@ -168,7 +168,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
break;
}
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult());
break;
} else {
......@@ -187,7 +187,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
continue;
}
if (msgExt.getStoreTimestamp() >= startTime) {
log.info("Fresh stored. the miss offset={}, check it later, store={}", i,
log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp()));
break;
}
......@@ -206,7 +206,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
}
} else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.admin;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class TopicStatsTableTest {
private volatile TopicStatsTable topicStatsTable;
private static final String TEST_TOPIC = "test_topic";
private static final String TEST_BROKER = "test_broker";
private static final int QUEUE_ID = 1;
private static final long CURRENT_TIME_MILLIS = System.currentTimeMillis();
private static final long MAX_OFFSET = CURRENT_TIME_MILLIS + 100;
private static final long MIN_OFFSET = CURRENT_TIME_MILLIS - 100;
@Before
public void buildTopicStatsTable() {
HashMap<MessageQueue, TopicOffset> offsetTableMap = new HashMap<MessageQueue, TopicOffset>();
MessageQueue messageQueue = new MessageQueue(TEST_TOPIC, TEST_BROKER, QUEUE_ID);
TopicOffset topicOffset = new TopicOffset();
topicOffset.setLastUpdateTimestamp(CURRENT_TIME_MILLIS);
topicOffset.setMinOffset(MIN_OFFSET);
topicOffset.setMaxOffset(MAX_OFFSET);
offsetTableMap.put(messageQueue, topicOffset);
topicStatsTable = new TopicStatsTable();
topicStatsTable.setOffsetTable(offsetTableMap);
}
@Test
public void testGetOffsetTable() throws Exception {
validateTopicStatsTable(topicStatsTable);
}
@Test
public void testFromJson() throws Exception {
String json = RemotingSerializable.toJson(topicStatsTable, true);
TopicStatsTable fromJson = RemotingSerializable.fromJson(json, TopicStatsTable.class);
validateTopicStatsTable(fromJson);
}
private static void validateTopicStatsTable(TopicStatsTable topicStatsTable) throws Exception {
Map.Entry<MessageQueue, TopicOffset> savedTopicStatsTableMap = topicStatsTable.getOffsetTable().entrySet().iterator().next();
MessageQueue savedMessageQueue = savedTopicStatsTableMap.getKey();
TopicOffset savedTopicOffset = savedTopicStatsTableMap.getValue();
Assert.assertTrue(savedMessageQueue.getTopic().equals(TEST_TOPIC));
Assert.assertTrue(savedMessageQueue.getBrokerName().equals(TEST_BROKER));
Assert.assertTrue(savedMessageQueue.getQueueId() == QUEUE_ID);
Assert.assertTrue(savedTopicOffset.getLastUpdateTimestamp() == CURRENT_TIME_MILLIS);
Assert.assertTrue(savedTopicOffset.getMaxOffset() == MAX_OFFSET);
Assert.assertTrue(savedTopicOffset.getMinOffset() == MIN_OFFSET);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import static org.junit.Assert.*;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
public class ClusterInfoTest {
@Test
public void testFormJson() throws Exception {
ClusterInfo clusterInfo = buildClusterInfo();
byte[] data = clusterInfo.encode();
ClusterInfo json = RemotingSerializable.decode(data, ClusterInfo.class);
assertNotNull(json);
assertNotNull(json.getClusterAddrTable());
assertTrue(json.getClusterAddrTable().containsKey("DEFAULT_CLUSTER"));
assertTrue(json.getClusterAddrTable().get("DEFAULT_CLUSTER").contains("master"));
assertNotNull(json.getBrokerAddrTable());
assertTrue(json.getBrokerAddrTable().containsKey("master"));
assertEquals(json.getBrokerAddrTable().get("master").getBrokerName(), "master");
assertEquals(json.getBrokerAddrTable().get("master").getCluster(), "DEFAULT_CLUSTER");
assertEquals(json.getBrokerAddrTable().get("master").getBrokerAddrs().get(MixAll.MASTER_ID), MixAll.getLocalhostByNetworkInterface());
}
@Test
public void testRetrieveAllClusterNames() throws Exception {
ClusterInfo clusterInfo = buildClusterInfo();
byte[] data = clusterInfo.encode();
ClusterInfo json = RemotingSerializable.decode(data, ClusterInfo.class);
assertArrayEquals(new String[]{"DEFAULT_CLUSTER"}, json.retrieveAllClusterNames());
}
@Test
public void testRetrieveAllAddrByCluster() throws Exception {
ClusterInfo clusterInfo = buildClusterInfo();
byte[] data = clusterInfo.encode();
ClusterInfo json = RemotingSerializable.decode(data, ClusterInfo.class);
assertArrayEquals(new String[]{MixAll.getLocalhostByNetworkInterface()}, json.retrieveAllAddrByCluster("DEFAULT_CLUSTER"));
}
private ClusterInfo buildClusterInfo() throws Exception {
ClusterInfo clusterInfo = new ClusterInfo();
HashMap<String, BrokerData> brokerAddrTable = new HashMap<String, BrokerData>();
HashMap<String, Set<String>> clusterAddrTable = new HashMap<String, Set<String>>();
//build brokerData
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("master");
brokerData.setCluster("DEFAULT_CLUSTER");
//build brokerAddrs
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(MixAll.MASTER_ID, MixAll.getLocalhostByNetworkInterface());
brokerData.setBrokerAddrs(brokerAddrs);
brokerAddrTable.put("master", brokerData);
Set<String> brokerNames = new HashSet<String>();
brokerNames.add("master");
clusterAddrTable.put("DEFAULT_CLUSTER", brokerNames);
clusterInfo.setBrokerAddrTable(brokerAddrTable);
clusterInfo.setClusterAddrTable(clusterAddrTable);
return clusterInfo;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class CheckClientRequestBodyTest {
@Test
public void testFromJson() {
SubscriptionData subscriptionData = new SubscriptionData();
String expectedClientId = "defalutId";
String expectedGroup = "defaultGroup";
CheckClientRequestBody checkClientRequestBody = new CheckClientRequestBody();
checkClientRequestBody.setClientId(expectedClientId);
checkClientRequestBody.setGroup(expectedGroup);
checkClientRequestBody.setSubscriptionData(subscriptionData);
String json = RemotingSerializable.toJson(checkClientRequestBody, true);
CheckClientRequestBody fromJson = RemotingSerializable.fromJson(json, CheckClientRequestBody.class);
assertThat(fromJson.getClientId()).isEqualTo(expectedClientId);
assertThat(fromJson.getGroup()).isEqualTo(expectedGroup);
assertThat(fromJson.getSubscriptionData()).isEqualTo(subscriptionData);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumeStatsListTest {
@Test
public void testFromJson() {
ConsumeStats consumeStats = new ConsumeStats();
ArrayList<ConsumeStats> consumeStatsListValue = new ArrayList<ConsumeStats>();
consumeStatsListValue.add(consumeStats);
HashMap<String, List<ConsumeStats>> map = new HashMap<String, List<ConsumeStats>>();
map.put("subscriptionGroupName", consumeStatsListValue);
List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsListValue2 = new ArrayList<Map<String, List<ConsumeStats>>>();
consumeStatsListValue2.add(map);
String brokerAddr = "brokerAddr";
long totalDiff = 12352L;
ConsumeStatsList consumeStatsList = new ConsumeStatsList();
consumeStatsList.setBrokerAddr(brokerAddr);
consumeStatsList.setTotalDiff(totalDiff);
consumeStatsList.setConsumeStatsList(consumeStatsListValue2);
String toJson = RemotingSerializable.toJson(consumeStatsList, true);
ConsumeStatsList fromJson = RemotingSerializable.fromJson(toJson, ConsumeStatsList.class);
assertThat(fromJson.getBrokerAddr()).isEqualTo(brokerAddr);
assertThat(fromJson.getTotalDiff()).isEqualTo(totalDiff);
List<Map<String, List<ConsumeStats>>> fromJsonConsumeStatsList = fromJson.getConsumeStatsList();
assertThat(fromJsonConsumeStatsList).isInstanceOf(List.class);
ConsumeStats fromJsonConsumeStats = fromJsonConsumeStatsList.get(0).get("subscriptionGroupName").get(0);
assertThat(fromJsonConsumeStats).isExactlyInstanceOf(ConsumeStats.class);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import static org.assertj.core.api.Assertions.assertThat;
public class KVTableTest {
@Test
public void testFromJson() throws Exception {
HashMap<String, String> table = new HashMap<String, String>();
table.put("key1", "value1");
table.put("key2", "value2");
KVTable kvTable = new KVTable();
kvTable.setTable(table);
String json = RemotingSerializable.toJson(kvTable, true);
KVTable fromJson = RemotingSerializable.fromJson(json, KVTable.class);
assertThat(fromJson).isNotEqualTo(kvTable);
assertThat(fromJson.getTable().get("key1")).isEqualTo(kvTable.getTable().get("key1"));
assertThat(fromJson.getTable().get("key2")).isEqualTo(kvTable.getTable().get("key2"));
}
}
此差异已折叠。
......@@ -8,7 +8,9 @@ ACL客户端可以参考:**org.apache.rocketmq.example.simple**包下面的**A
## 2. 权限控制的定义与属性值
### 2.1权限定义
对RocketMQ的Topic资源访问权限控制定义主要如下表所示,分为以下四种:
对RocketMQ的Topic资源访问权限控制定义主要如下表所示,分为以下四种
| 权限 | 含义 |
| --- | --- |
| DENY | 拒绝 |
......
......@@ -5,7 +5,6 @@
RocketMQ架构上主要分为四部分,如上图所示:
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
# 特性(features)
## 订阅与发布
消息的发布是指某个生产者向某个topic发送消息;消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
## 消息顺序
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
- 全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
- 分区顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
## 消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
## 消息可靠性
RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
1) Broker正常关闭
2) Broker异常Crash
3) OS Crash
4) 机器掉电,但是能立即恢复供电情况
5) 机器无法开机(可能是cpu、主板、内存等关键设备损坏)
6) 磁盘设备损坏
1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。
5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写。
## 至少一次
至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性。
## 回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
## 事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
## 定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
- level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。
## 消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况:
- 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
- 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
## 消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。如下方法可以设置消息重试策略:
- retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
- retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
- retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
## 流量控制
生产者流控,因为broker处理能力达到瓶颈;消费者流控,因为消费能力达到瓶颈。
生产者流控:
- commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。
- 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。
- broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。
- broker通过拒绝send 请求方式实现流量控制。
注意,生产者流控,不会尝试消息重投。
消费者流控:
- 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。
- 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。
- 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。
消费者流控的结果是降低拉取频率。
## 死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
此差异已折叠。
此差异已折叠。
# Basic Concept
## 1 Message Model
The RocketMQ message model is mainly composed of Producer, Broker and Consumer. The Producer is responsible for producing messages, the Consumer is responsible for consuming messages, and the Broker is responsible for storing messages.
The Broker corresponds to one server during actual deployment, and each Broker can store messages from multiple topics, and messages from each Topic can be stored in a different Broker by sharding strategy.
The Message Queue is used to store physical offsets of messages, and the Message addresses in multiple Topic are stored in multiple Message queues. The Consumer Group consists of multiple Consumer instances.
## 2 Producer
The Producer is responsible for producing messages, typically by business systems. It sends messages generated by the business application systems to brokers. The RocketMQ provides multiple paradigms of sending: synchronous, asynchronous,sequential and one-way. Both synchronous and asynchronous methods require the Broker to return confirmation information, while one-way sending is not required.
## 3 Consumer
The Consumer is responsible for consuming messages, typically the background system is responsible for asynchronous consumption. The Consumer pulls messages from brokers and feeds them into application. In perspective of user application, two types of consumers are provided:Pull Consumer and Push Consumer.
## 4 Topic
The Topic refers to a collection of one kind message. Each topic contains several messages and one message can only belong to one topic. The Topic is the basic unit of RocketMQ for message subscription.
## 5 Broker Server
As the role of the transfer station, the Broker Server stores, and forwards messages. In the RocketMQ system, the Broker Server is responsible for receiving messages sent from producers, storing them and preparing to handle pull requests. It also stores message related meta data, including consumer groups, consuming progress, topics and queues info.
## 6 Name Server
The Name Server serves as the provider of routing service. The Producer or Consumer can find the list of Broker IP addresses corresponding each topic through name server. Multiple Name Servers can be deployed as a cluster, but are independent of each other and do not exchange information.
## 7 Pull Consumer
A type of Consumer. Applications are usually pulls messages from brokers by actively calling the Consumer's pull message method, and the application has the advantages of controlling the timing and frequency of pulling messages. Once batches of messages are pulled, user application initiates consuming process.
## 8 Push Consumer
A type of Consumer. Under this mode, after the Broker receives the data, it will actively push it to the consumer, which is generally of high real-time performance.
## 9 Producer Group
A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the Broker Server will contacts other Producer in the same Producer group to commit or rollback the transactional message.
## 10 Consumer Group
A collection of the same type of Consumer, which sends the same type of messages with consistent logic. The Consumer Group makes load-balance and fault-tolerance super easy in terms of message consuming.
Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).
RocketMQ supports two type of consumption mode:Clustering and Broadcasting.
## 11 Consumption Mode - Clustering
Under the Clustering mode, all messages from one topic will be delivered to all consumer instances averagely as much as possible. That is, one message can be consumed by only one consumer instance.
## 12 Consumption Mode - Broadcasting
Under the Broadcasting mode, each Consumer instance of the same Consumer Group receives every message that published to the corresponding topic.
## 13 Normal Ordered Message
Under the Normal Ordered Message mode, the messages received by consumers from the same ConsumeQueue are sequential, while the messages received from different message queues may be non-sequential.
## 14 Strictly Ordered Message
Under the Strictly Ordered Message mode, all messages received by the consumers from the same topic are sequential, as the order they were stored.
## 15 Message
The physical carrier of information transmitted by a messaging system, the smallest unit of production and consumption data, each message must belong to a topic.
Each Message in RocketMQ has a unique Message ID and can carry a key, generally used to store business-related value. The system provides the function to query messages by Message ID and Key.
## 16 Tag
Flags set for messages to distinguish different types of messages under the same topic, functioning as a "sub-topic". Messages from the same business unit can set different tags under the same topic in terms of different business purposes. The Tag can effectively maintain the clarity and consistency of the code and optimize the query system provided by RocketMQ. The Consumer can realize different "sub-topic" by using Tag, so as to achieve better expansibility.
## Client Configuration
Relative to RocketMQ's Broker cluster, producers and consumers are client. In this section, it mainly describes the common behavior configuration of producers and consumers.
### Client Addressing mode
```RocketMQ``` can let client find the ```Name Server```, and then find the ```Broker```by the ```Name Server```. Followings show a variety of configurations, and priority level from highly to lower, the highly priority configurations can override the lower priority configurations.
- Specified ```Name Server``` address in the code, and multiple ```Name Server``` addresses are separated by semicolons
```java
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
```
- Specified ```Name Server``` address in the Java setup parameters
```text
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
```
- Specified ```Name Server``` address in the envionment variables
```text
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
```
- HTTP static server addressing(default)
After client started,it will access a http static server address, as: <http://jmenv.tbsite.net:8080/rocketmq/nsaddr>, this URL return the following contents:
```text
192.168.0.1:9876;192.168.0.2:9876
```
By default, the client accesses the HTTP server every 2 minutes, and update the local Name Server address.The URL is hardcoded in the code, you can change the target server by updating ```/etc/hosts``` file, such as add following configuration at the ```/etc/hosts```:
```text
10.232.22.67 jmenv.taobao.net
```
HTTP static server addressing is recommended, because it is simple client deployment, and the Name Server cluster can be upgraded hot.
### Client Configuration
```DefaultMQProducer```、```TransactionMQProducer```、```DefaultMQPushConsumer```、```DefaultMQPullConsumer``` all extends the ```ClientConfig``` Class,```ClientConfig``` as the client common configuration class。Client configuration style like getXXX、setXXX, each of the parameters can config by spring and also config their in the code. Such as the ```namesrvAddr``` parameter: ```producer.setNamesrvAddr("192.168.0.1:9876")```, same with the other parameters.
#### Client Common Configuration
| Pamater Name | Default Value | Description |
| ----------------------------- | ------- | ------------------------------------------------------------ |
| namesrvAddr | | Name Server address list, multiple NameServer addresses are separated by semicolons |
| clientIP | local IP | Client local ip address, some machines will fail to recognize the client IP address, which needs to be enforced in the code |
| instanceName | DEFAULT | Name of the client instance, Multiple producers and consumers created by the client actually share one internal instance (this instance contains network connection, thread resources, etc.). |
| clientCallbackExecutorThreads | 4 | Number of communication layer asynchronous callback threads |
| pollNameServerInteval | 30000 | Polling the Name Server interval in milliseconds |
| heartbeatBrokerInterval | 30000 | The heartbeat interval, in milliseconds, is sent to the Broker |
| persistConsumerOffsetInterval | 5000 | The persistent Consumer consumes the progress interval in milliseconds |
#### Producer Configuration
| Pamater Name | Default Value | Description |
| -------------------------------- | ---------------- | ------------------------------------------------------------ |
| producerGroup | DEFAULT_PRODUCER | The name of the Producer group. If multiple producers belong to one application and send the same message, they should be grouped into the same group |
| createTopicKey | TBW102 | When a message is sent, topics that do not exist on the server are automatically created and a Key is specified that can be used to configure the default route to the topic where the message is sent.|
| defaultTopicQueueNums | 4 | The number of default queue when sending messages and auto created topic which not exists the server|
| sendMsgTimeout | 10000 | Timeout time of sending message in milliseconds |
| compressMsgBodyOverHowmuch | 4096 | The message Body begins to compress beyond the size(the Consumer gets the message automatically unzipped.), unit of byte|
| retryAnotherBrokerWhenNotStoreOK | FALSE | If send message and return sendResult but sendStatus!=SEND_OK, Whether to resend |
| retryTimesWhenSendFailed | 2 | If send message failed, maximum number of retries, this parameter only works for synchronous send mode|
| maxMessageSize | 4MB | Client limit message size, over it may error. Server also limit so need to work with server |
| transactionCheckListener | | The transaction message looks back to the listener, if you want send transaction message, you must setup this
| checkThreadPoolMinSize | 1 | Minimum of thread in thread pool when Broker look back Producer transaction status |
| checkThreadPoolMaxSize | 1 | Maximum of thread in thread pool when Broker look back Producer transaction status |
| checkRequestHoldMax | 2000 | Producer local buffer request queue size when Broker look back Producer transaction status |
| RPCHook | null | This parameter is passed in when the Producer is creating, including the pre-processing before the message sending and the processing after the message response. The user can do some security control or other operations in the first interface.|
#### PushConsumer Configuration
| Pamater Name | Default Value | Description |
| ---------------------------- | ----------------------------- | ------------------------------------------------------------ |
| consumerGroup | DEFAULT_CONSUMER | Consumer group name. If multi Consumer belong to an application, subscribe the same message and consume logic as the same, they should be gathered together |
| messageModel | CLUSTERING | Message support two mode: cluster consumption and broadcast consumption |
| consumeFromWhere | CONSUME_FROM_LAST_OFFSET | After Consumer started, default consumption from last location, it include two situation: One is last consumption location is not expired, and consumption start at last location; The other is last location expired, start consumption at current queue's first message |
| consumeTimestamp | Half an hour ago | Only consumeFromWhere=CONSUME_FROM_TIMESTAMP, this can work |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Implements strategy of Rebalance algorithms |
| subscription | | subscription relation |
| messageListener | | message listener |
| offsetStore | | Consumption progress store |
| consumeThreadMin | 10 | Minimum of thread in consumption thread pool |
| consumeThreadMax | 20 | Maximum of thread in consumption thread pool |
| | | |
| consumeConcurrentlyMaxSpan | 2000 | Maximum span allowed for single queue parallel consumption |
| pullThresholdForQueue | 1000 | Pull message local queue cache maximum number of messages |
| pullInterval | 0 | Pull message interval, because long polling it is 0, but for flow control, you can set value which greater than 0 in milliseconds |
| consumeMessageBatchMaxSize | 1 | Batch consume message |
| pullBatchSize | 32 | Batch pull message |
#### PullConsumer Configuration
| Pamater Name | Default Value | Description |
| -------------------------------- | ----------------------------- | ------------------------------------------------------------ |
| consumerGroup | DEFAULT_CONSUMER | Consumer group name. If multi Consumer belong to an application, subscribe the same message and consume logic as the same, they should be gathered together |
| brokerSuspendMaxTimeMillis | 20000 | Long polling,Consumer pull message request suspended for the longest time in the Broker in milliseconds |
| consumerTimeoutMillisWhenSuspend | 30000 | Long polling,Consumer pull message request suspend in the Broker over this time value, client think timeout. Unit is milliseconds |
| consumerPullTimeoutMillis | 10000 | Not long polling, timeout time of pull message in milliseconds |
| messageModel | BROADCASTING | Message support two mode: cluster consumption and broadcast consumption |
| messageQueueListener | | Listening changing of queue |
| offsetStore | | Consumption schedule store |
| registerTopics | | Collection of registered topics |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Implements strategy about Rebalance algorithm |
#### Message Data Structure
| Field Name | Default Value | Description |
| -------------- | ------ | ------------------------------------------------------------ |
| Topic | null | Required, the name of the topic to which the message belongs |
| Body | null | Required, message body |
| Tags | null | Optional, message tag, convenient for server filtering. Currently only one tag per message is supported |
| Keys | null | Optional, represent this message's business keys, server create hash indexes based keys. After setting, you can find message by ```Topics```、```Keys``` in Console system. Because of hash indexes, please make key as unique as possible, such as order number, goods Id and so on.|
| Flag | 0 | Optional, it is entirely up to the application, and RocketMQ does not intervene |
| DelayTimeLevel | 0 | Optional, message delay level, 0 represent no delay, greater tan 0 can consume |
| WaitStoreMsgOK | TRUE | Optional, indicates whether the message is not answered until the server is down. |
# ** The system configuration** #
This section focuses on the configuration of the system (JVM/OS)
## **1 JVM Options** ##
The latest released version of JDK 1.8 is recommended. Set the same Xms and Xmx value to prevent the JVM from resizing the heap for better performance. A simple JVM configurations looks like this:
-server -Xms8g -Xmx8g -Xmn4g
If you don’t care about the boot time of RocketMQ broker, pre-touch the Java heap to make sure that every page will be allocated during JVM initialization is a better choice. Those who don’t care about the boot time can enable it:
-XX:+AlwaysPreTouch
Disable biased locking maybe reduce JVM pauses:
-XX:-UseBiasedLocking
As for garbage collection, G1 collector with JDK 1.8 is recommended:
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
These GC options looks a little aggressive, but it’s proved to have good performance in our production environment
Don’t set a too small value for -XX:MaxGCPauseMillis, otherwise JVM will use a small young generation to achieve this goal which will cause very frequent minor GC.So use rolling GC log file is recommended:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m
If write GC file will increase latency of broker, consider redirect GC log file to a memory file system:
-Xloggc:/dev/shm/mq_gc_%p.log123
## 2 Linux Kernel Parameters ##
There is a os.sh script that lists a lot of kernel parameters in folder bin which can be used for production use with minor changes. Below parameters need attention, and more details please refer to documentation for /proc/sys/vm/*.
- **vm.extra_free_kbytes**, tells the VM to keep extra free memory between the threshold where background reclaim (kswapd) kicks in, and the threshold where direct reclaim (by allocating processes) kicks in. RocketMQ uses this parameter to avoid high latency in memory allocation. (It is specific to the kernel version)
- **vm.min_free_kbytes**, if you set this to lower than 1024KB, your system will become subtly broken, and prone to deadlock under high loads.
- **vm.max_map_count**, limits the maximum number of memory map areas a process may have. RocketMQ will use mmap to load CommitLog and ConsumeQueue, so set a bigger value for this parameter is recommended.
- **vm.swappiness**, define how aggressive the kernel will swap memory pages. Higher values will increase agressiveness, lower values decrease the amount of swap. 10 is recommended for this value to avoid swap latency.
- **File descriptor limits**, RocketMQ needs open file descriptors for files(CommitLog and ConsumeQueue) and network connections. We recommend set 655350 for file descriptors.
- **Disk scheduler**, the deadline I/O scheduler is recommended for RocketMQ, which attempts to provide a guaranteed latency for requests.
# Deployment Architectures and Setup Steps
## Cluster Setup
### 1 Single Master mode
This is the simplest, but also the riskiest, mode that makes the entire service unavailable once the broker restarts or goes down. Production environments are not recommended, but can be used for local testing and development. Here are the steps to build.
**1)Start NameServer**
```shell
### Start Name Server first
$ nohup sh mqnamesrv &
### Then verify that the Name Server starts successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
We can see 'The Name Server boot success.. ' in namesrv.log that indicates the NameServer has been started successfully.
**2)Start Broker**
```shell
### Also start broker first
$ nohup sh bin/mqbroker -n localhost:9876 &
### Then verify that the broker is started successfully, for example, the IP of broker is 192.168.1.2 and the name is broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log
The broker[broker-a,192.169.1.2:10911] boot success...
```
We can see 'The broker[brokerName,ip:port] boot success..' in Broker.log that indicates the broker has been started successfully.
### 2 Multiple Master mode
Multiple master mode means a mode with all master nodes(such as 2 or 3 master nodes) and no slave node. The advantages and disadvantages of this mode are as follows:
- Advantages:
1. Simple configuration.
2. Outage or restart(for maintenance) of one master node has no impact on the application.
3. When the disk is configured as RAID10, messages are not lost because the RAID10 disk is very reliable, even if the machine is not recoverable (In the case of asynchronous flush disk mode of the message, a small number of messages are lost; If the brush mode of a message is synchronous, no message will be lost).
4. In this mode, the performance is the highest.
- Disadvantages:
1. During a single machine outage, messages that are not consumed on this machine are not subscribed to until the machine recovers, and message real-time is affected.
The starting steps for multiple master mode are as follows:
**1)Start NameServer**
```shell
### Start Name Server first
$ nohup sh mqnamesrv &
### Then verify that the Name Server starts successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
**2)Start the Broker cluster**
```shell
### For example, starting the first Master on machine A, assuming that the configured NameServer IP is: 192.168.1.1.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties &
### Then starting the second Master on machine B, assuming that the configured NameServer IP is: 192.168.1.1.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties &
...
```
The boot command shown above is used in the case of a single NameServer.For clusters of multiple NameServer, the address list after the -n argument in the broker boot command is separated by semicolons, for example, 192.168.1.1: 9876;192.161.2: 9876.
### 3 Multiple Master And Multiple Slave Mode-Asynchronous replication
Each master node configures more thran one slave nodes, with multiple pairs of master-slave.HA uses asynchronous replication, with a short message delay (millisecond) between master node and slave node.The advantages and disadvantages of this mode are as follows:
- Advantages:
1. Even if the disk is corrupted, very few messages will be lost and the real-time performance of the message will not be affected.
2. At the same time, when master node is down, consumers can still consume messages from slave node, and the process is transparent to the application itself and does not require human intervention.
3. Performance is almost as high as multiple master mode.
- Disadvantages:
1. A small number of messages will be lost when master node is down and the disk is corrupted.
The starting steps for multiple master and multiple slave mode are as follows:
**1)Start NameServer**
```shell
### Start Name Server first
$ nohup sh mqnamesrv &
### Then verify that the Name Server starts successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
**2)Start the Broker cluster**
```shell
### For example, starting the first Master on machine A, assuming that the configured NameServer IP is: 192.168.1.1 and port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties &
### Then starting the second Master on machine B, assuming that the configured NameServer IP is: 192.168.1.1 and port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties &
### Then starting the first Slave on machine C, assuming that the configured NameServer IP is: 192.168.1.1 and port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties &
### Last starting the second Slave on machine D, assuming that the configured NameServer IP is: 192.168.1.1 and port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties &
```
The above shows a startup command for 2M-2S-Async mode, similar to other nM-nS-Async modes.
### 4 Multiple Master And Multiple Slave Mode-Synchronous dual write
In this mode, multiple slave node are configured for each master node and there are multiple pairs of Master-Slave.HA uses synchronous double-write, that is, the success response will be returned to the application only when the message is successfully written into the master node and replicated to more than one slave node.
The advantages and disadvantages of this model are as follows:
- Advantages:
1. Neither the data nor the service has a single point of failure.
2. In the case of master node shutdown, the message is also undelayed.
3. Service availability and data availability are very high;
- Disadvantages:
1. The performance in this mode is slightly lower than in asynchronous replication mode (about 10% lower).
2. The RT sending a single message is slightly higher, and the current version, the slave node cannot automatically switch to the master after the master node is down.
The starting steps are as follows:
**1)Start NameServer**
```shell
### Start Name Server first
$ nohup sh mqnamesrv &
### Then verify that the Name Server starts successfully
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
```
**2)Start the Broker cluster**
```shell
### For example, starting the first Master on machine A, assuming that the configured NameServer IP is: 192.168.1.1 and port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
### Then starting the second Master on machine B, assuming that the configured NameServer IP is: 192.168.1.1 and port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
### Then starting the first Slave on machine C, assuming that the configured NameServer IP is: 192.168.1.1 and port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
### Last starting the second Slave on machine D, assuming that the configured NameServer IP is: 192.168.1.1 and port is 9876.
$ nohup sh mqbroker -n 192.168.1.1:9876 -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
```
The above Master and Slave are paired by specifying the same config named "brokerName", the "brokerId" of the master node must be 0, and the "brokerId" of the slave node must be greater than 0.
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册