未验证 提交 8fa47111 编写于 作者: R radish 提交者: GitHub

Merge pull request #3 from apache/develop

Develop
...@@ -52,6 +52,11 @@ public class PlainAccessValidator implements AccessValidator { ...@@ -52,6 +52,11 @@ public class PlainAccessValidator implements AccessValidator {
} else { } else {
accessResource.setWhiteRemoteAddress(remoteAddr); accessResource.setWhiteRemoteAddress(remoteAddr);
} }
if (request.getExtFields() == null) {
throw new AclException("request's extFields value is null");
}
accessResource.setRequestCode(request.getCode()); accessResource.setRequestCode(request.getCode());
accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY)); accessResource.setAccessKey(request.getExtFields().get(SessionCredentials.ACCESS_KEY));
accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE)); accessResource.setSignature(request.getExtFields().get(SessionCredentials.SIGNATURE));
......
...@@ -43,6 +43,7 @@ public class PlainAccessValidatorTest { ...@@ -43,6 +43,7 @@ public class PlainAccessValidatorTest {
@Before @Before
public void init() { public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources"); System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
plainAccessValidator = new PlainAccessValidator(); plainAccessValidator = new PlainAccessValidator();
sessionCredentials = new SessionCredentials(); sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ"); sessionCredentials.setAccessKey("RocketMQ");
...@@ -115,6 +116,22 @@ public class PlainAccessValidatorTest { ...@@ -115,6 +116,22 @@ public class PlainAccessValidatorTest {
plainAccessValidator.validate(accessResource); plainAccessValidator.validate(accessResource);
} }
@Test(expected = AclException.class)
public void validateForAdminCommandWithOutAclRPCHook() {
RemotingCommand consumerOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
plainAccessValidator.parse(consumerOffsetAdminRequest, "192.168.0.1:9876");
RemotingCommand subscriptionGroupAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
plainAccessValidator.parse(subscriptionGroupAdminRequest, "192.168.0.1:9876");
RemotingCommand delayOffsetAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
plainAccessValidator.parse(delayOffsetAdminRequest, "192.168.0.1:9876");
RemotingCommand allTopicConfigAdminRequest = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
plainAccessValidator.parse(allTopicConfigAdminRequest, "192.168.0.1:9876");
}
@Test @Test
public void validatePullMessageTest() { public void validatePullMessageTest() {
PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader(); PullMessageRequestHeader pullMessageRequestHeader=new PullMessageRequestHeader();
......
...@@ -100,7 +100,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -100,7 +100,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
msgExt.setCommitLogOffset( msgExt.setCommitLogOffset(
putMessageResult.getAppendMessageResult().getWroteOffset()); putMessageResult.getAppendMessageResult().getWroteOffset());
msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); msgExt.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
log.info( log.debug(
"Send check message, the offset={} restored in queueOffset={} " "Send check message, the offset={} restored in queueOffset={} "
+ "commitLogOffset={} " + "commitLogOffset={} "
+ "newMsgId={} realMsgId={} topic={}", + "newMsgId={} realMsgId={} topic={}",
...@@ -127,7 +127,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -127,7 +127,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
log.warn("The queue of topic is empty :" + topic); log.warn("The queue of topic is empty :" + topic);
return; return;
} }
log.info("Check topic={}, queues={}", topic, msgQueues); log.debug("Check topic={}, queues={}", topic, msgQueues);
for (MessageQueue messageQueue : msgQueues) { for (MessageQueue messageQueue : msgQueues) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
MessageQueue opQueue = getOpQueue(messageQueue); MessageQueue opQueue = getOpQueue(messageQueue);
...@@ -168,7 +168,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -168,7 +168,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
break; break;
} }
if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
messageQueue, getMessageNullCount, getResult.getPullResult()); messageQueue, getMessageNullCount, getResult.getPullResult());
break; break;
} else { } else {
...@@ -187,7 +187,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -187,7 +187,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
continue; continue;
} }
if (msgExt.getStoreTimestamp() >= startTime) { if (msgExt.getStoreTimestamp() >= startTime) {
log.info("Fresh stored. the miss offset={}, check it later, store={}", i, log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
new Date(msgExt.getStoreTimestamp())); new Date(msgExt.getStoreTimestamp()));
break; break;
} }
...@@ -206,7 +206,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -206,7 +206,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
} }
} else { } else {
if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) { if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
checkImmunityTime, new Date(msgExt.getBornTimestamp())); checkImmunityTime, new Date(msgExt.getBornTimestamp()));
break; break;
} }
......
...@@ -236,6 +236,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -236,6 +236,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
break; break;
case RUNNING: case RUNNING:
this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup()); this.mQClientFactory.unregisterProducer(this.defaultMQProducer.getProducerGroup());
this.defaultAsyncSenderExecutor.shutdown();
if (shutdownFactory) { if (shutdownFactory) {
this.mQClientFactory.shutdown(); this.mQClientFactory.shutdown();
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ConsumeMessageConcurrentlyServiceTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10912", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
}
@Test
public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
ConsumeMessageConcurrentlyService normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
messageExts[0] = msgs.get(0);
countDownLatch.countDown();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(normalServie);
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await();
Thread.sleep(1000);
org.apache.rocketmq.common.protocol.body.ConsumeStatus stats = normalServie.getConsumerStatsManager().consumeStatus(pushConsumer.getDefaultMQPushConsumerImpl().groupName(),topic);
ConsumerStatsManager mgr = normalServie.getConsumerStatsManager();
Field statItmeSetField = mgr.getClass().getDeclaredField("topicAndGroupConsumeOKTPS");
statItmeSetField.setAccessible(true);
StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr);
StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName());
assertThat(item.getValue().get()).isGreaterThan(0L);
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
@After
public void terminate() {
pushConsumer.shutdown();
}
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(1024);
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
pullRequest.setMessageQueue(messageQueue);
ProcessQueue processQueue = new ProcessQueue();
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
pullRequest.setProcessQueue(processQueue);
return pullRequest;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
}
...@@ -79,10 +79,10 @@ public class MomentStatsItemSet { ...@@ -79,10 +79,10 @@ public class MomentStatsItemSet {
if (null == statsItem) { if (null == statsItem) {
statsItem = statsItem =
new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
MomentStatsItem prev = this.statsItemTable.put(statsKey, statsItem); MomentStatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
if (null == prev) {
if (null != prev) {
statsItem = prev;
// statsItem.init(); // statsItem.init();
} }
} }
......
...@@ -162,10 +162,10 @@ public class StatsItemSet { ...@@ -162,10 +162,10 @@ public class StatsItemSet {
StatsItem statsItem = this.statsItemTable.get(statsKey); StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) { if (null == statsItem) {
statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
StatsItem prev = this.statsItemTable.put(statsKey, statsItem); StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
if (null == prev) {
if (null != prev) {
statsItem = prev;
// statsItem.init(); // statsItem.init();
} }
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.admin;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class TopicStatsTableTest {
private volatile TopicStatsTable topicStatsTable;
private static final String TEST_TOPIC = "test_topic";
private static final String TEST_BROKER = "test_broker";
private static final int QUEUE_ID = 1;
private static final long CURRENT_TIME_MILLIS = System.currentTimeMillis();
private static final long MAX_OFFSET = CURRENT_TIME_MILLIS + 100;
private static final long MIN_OFFSET = CURRENT_TIME_MILLIS - 100;
@Before
public void buildTopicStatsTable() {
HashMap<MessageQueue, TopicOffset> offsetTableMap = new HashMap<MessageQueue, TopicOffset>();
MessageQueue messageQueue = new MessageQueue(TEST_TOPIC, TEST_BROKER, QUEUE_ID);
TopicOffset topicOffset = new TopicOffset();
topicOffset.setLastUpdateTimestamp(CURRENT_TIME_MILLIS);
topicOffset.setMinOffset(MIN_OFFSET);
topicOffset.setMaxOffset(MAX_OFFSET);
offsetTableMap.put(messageQueue, topicOffset);
topicStatsTable = new TopicStatsTable();
topicStatsTable.setOffsetTable(offsetTableMap);
}
@Test
public void testGetOffsetTable() throws Exception {
validateTopicStatsTable(topicStatsTable);
}
@Test
public void testFromJson() throws Exception {
String json = RemotingSerializable.toJson(topicStatsTable, true);
TopicStatsTable fromJson = RemotingSerializable.fromJson(json, TopicStatsTable.class);
validateTopicStatsTable(fromJson);
}
private static void validateTopicStatsTable(TopicStatsTable topicStatsTable) throws Exception {
Map.Entry<MessageQueue, TopicOffset> savedTopicStatsTableMap = topicStatsTable.getOffsetTable().entrySet().iterator().next();
MessageQueue savedMessageQueue = savedTopicStatsTableMap.getKey();
TopicOffset savedTopicOffset = savedTopicStatsTableMap.getValue();
Assert.assertTrue(savedMessageQueue.getTopic().equals(TEST_TOPIC));
Assert.assertTrue(savedMessageQueue.getBrokerName().equals(TEST_BROKER));
Assert.assertTrue(savedMessageQueue.getQueueId() == QUEUE_ID);
Assert.assertTrue(savedTopicOffset.getLastUpdateTimestamp() == CURRENT_TIME_MILLIS);
Assert.assertTrue(savedTopicOffset.getMaxOffset() == MAX_OFFSET);
Assert.assertTrue(savedTopicOffset.getMinOffset() == MIN_OFFSET);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import static org.junit.Assert.*;
import org.junit.Test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
public class ClusterInfoTest {
@Test
public void testFormJson() throws Exception {
ClusterInfo clusterInfo = buildClusterInfo();
byte[] data = clusterInfo.encode();
ClusterInfo json = RemotingSerializable.decode(data, ClusterInfo.class);
assertNotNull(json);
assertNotNull(json.getClusterAddrTable());
assertTrue(json.getClusterAddrTable().containsKey("DEFAULT_CLUSTER"));
assertTrue(json.getClusterAddrTable().get("DEFAULT_CLUSTER").contains("master"));
assertNotNull(json.getBrokerAddrTable());
assertTrue(json.getBrokerAddrTable().containsKey("master"));
assertEquals(json.getBrokerAddrTable().get("master").getBrokerName(), "master");
assertEquals(json.getBrokerAddrTable().get("master").getCluster(), "DEFAULT_CLUSTER");
assertEquals(json.getBrokerAddrTable().get("master").getBrokerAddrs().get(MixAll.MASTER_ID), MixAll.getLocalhostByNetworkInterface());
}
@Test
public void testRetrieveAllClusterNames() throws Exception {
ClusterInfo clusterInfo = buildClusterInfo();
byte[] data = clusterInfo.encode();
ClusterInfo json = RemotingSerializable.decode(data, ClusterInfo.class);
assertArrayEquals(new String[]{"DEFAULT_CLUSTER"}, json.retrieveAllClusterNames());
}
@Test
public void testRetrieveAllAddrByCluster() throws Exception {
ClusterInfo clusterInfo = buildClusterInfo();
byte[] data = clusterInfo.encode();
ClusterInfo json = RemotingSerializable.decode(data, ClusterInfo.class);
assertArrayEquals(new String[]{MixAll.getLocalhostByNetworkInterface()}, json.retrieveAllAddrByCluster("DEFAULT_CLUSTER"));
}
private ClusterInfo buildClusterInfo() throws Exception {
ClusterInfo clusterInfo = new ClusterInfo();
HashMap<String, BrokerData> brokerAddrTable = new HashMap<String, BrokerData>();
HashMap<String, Set<String>> clusterAddrTable = new HashMap<String, Set<String>>();
//build brokerData
BrokerData brokerData = new BrokerData();
brokerData.setBrokerName("master");
brokerData.setCluster("DEFAULT_CLUSTER");
//build brokerAddrs
HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
brokerAddrs.put(MixAll.MASTER_ID, MixAll.getLocalhostByNetworkInterface());
brokerData.setBrokerAddrs(brokerAddrs);
brokerAddrTable.put("master", brokerData);
Set<String> brokerNames = new HashSet<String>();
brokerNames.add("master");
clusterAddrTable.put("DEFAULT_CLUSTER", brokerNames);
clusterInfo.setBrokerAddrTable(brokerAddrTable);
clusterInfo.setClusterAddrTable(clusterAddrTable);
return clusterInfo;
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class CheckClientRequestBodyTest {
@Test
public void testFromJson() {
SubscriptionData subscriptionData = new SubscriptionData();
String expectedClientId = "defalutId";
String expectedGroup = "defaultGroup";
CheckClientRequestBody checkClientRequestBody = new CheckClientRequestBody();
checkClientRequestBody.setClientId(expectedClientId);
checkClientRequestBody.setGroup(expectedGroup);
checkClientRequestBody.setSubscriptionData(subscriptionData);
String json = RemotingSerializable.toJson(checkClientRequestBody, true);
CheckClientRequestBody fromJson = RemotingSerializable.fromJson(json, CheckClientRequestBody.class);
assertThat(fromJson.getClientId()).isEqualTo(expectedClientId);
assertThat(fromJson.getGroup()).isEqualTo(expectedGroup);
assertThat(fromJson.getSubscriptionData()).isEqualTo(subscriptionData);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumeStatsListTest {
@Test
public void testFromJson() {
ConsumeStats consumeStats = new ConsumeStats();
ArrayList<ConsumeStats> consumeStatsListValue = new ArrayList<ConsumeStats>();
consumeStatsListValue.add(consumeStats);
HashMap<String, List<ConsumeStats>> map = new HashMap<String, List<ConsumeStats>>();
map.put("subscriptionGroupName", consumeStatsListValue);
List<Map<String/*subscriptionGroupName*/, List<ConsumeStats>>> consumeStatsListValue2 = new ArrayList<Map<String, List<ConsumeStats>>>();
consumeStatsListValue2.add(map);
String brokerAddr = "brokerAddr";
long totalDiff = 12352L;
ConsumeStatsList consumeStatsList = new ConsumeStatsList();
consumeStatsList.setBrokerAddr(brokerAddr);
consumeStatsList.setTotalDiff(totalDiff);
consumeStatsList.setConsumeStatsList(consumeStatsListValue2);
String toJson = RemotingSerializable.toJson(consumeStatsList, true);
ConsumeStatsList fromJson = RemotingSerializable.fromJson(toJson, ConsumeStatsList.class);
assertThat(fromJson.getBrokerAddr()).isEqualTo(brokerAddr);
assertThat(fromJson.getTotalDiff()).isEqualTo(totalDiff);
List<Map<String, List<ConsumeStats>>> fromJsonConsumeStatsList = fromJson.getConsumeStatsList();
assertThat(fromJsonConsumeStatsList).isInstanceOf(List.class);
ConsumeStats fromJsonConsumeStats = fromJsonConsumeStatsList.get(0).get("subscriptionGroupName").get(0);
assertThat(fromJsonConsumeStats).isExactlyInstanceOf(ConsumeStats.class);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumerConnectionTest {
@Test
public void testFromJson() {
ConsumerConnection consumerConnection = new ConsumerConnection();
HashSet<Connection> connections = new HashSet<Connection>();
Connection conn = new Connection();
connections.add(conn);
ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionTable.put("topicA", subscriptionData);
ConsumeType consumeType = ConsumeType.CONSUME_ACTIVELY;
MessageModel messageModel = MessageModel.CLUSTERING;
ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
consumerConnection.setConnectionSet(connections);
consumerConnection.setSubscriptionTable(subscriptionTable);
consumerConnection.setConsumeType(consumeType);
consumerConnection.setMessageModel(messageModel);
consumerConnection.setConsumeFromWhere(consumeFromWhere);
String json = RemotingSerializable.toJson(consumerConnection, true);
ConsumerConnection fromJson = RemotingSerializable.fromJson(json, ConsumerConnection.class);
assertThat(fromJson.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
assertThat(fromJson.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
HashSet<Connection> connectionSet = fromJson.getConnectionSet();
assertThat(connectionSet).isInstanceOf(Set.class);
SubscriptionData data = fromJson.getSubscriptionTable().get("topicA");
assertThat(data).isExactlyInstanceOf(SubscriptionData.class);
}
@Test
public void testComputeMinVersion() {
ConsumerConnection consumerConnection = new ConsumerConnection();
HashSet<Connection> connections = new HashSet<Connection>();
Connection conn1 = new Connection();
conn1.setVersion(1);
connections.add(conn1);
Connection conn2 = new Connection();
conn2.setVersion(10);
connections.add(conn2);
consumerConnection.setConnectionSet(connections);
int version = consumerConnection.computeMinVersion();
assertThat(version).isEqualTo(1);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import static org.assertj.core.api.Assertions.assertThat;
public class KVTableTest {
@Test
public void testFromJson() throws Exception {
HashMap<String, String> table = new HashMap<String, String>();
table.put("key1", "value1");
table.put("key2", "value2");
KVTable kvTable = new KVTable();
kvTable.setTable(table);
String json = RemotingSerializable.toJson(kvTable, true);
KVTable fromJson = RemotingSerializable.fromJson(json, KVTable.class);
assertThat(fromJson).isNotEqualTo(kvTable);
assertThat(fromJson.getTable().get("key1")).isEqualTo(kvTable.getTable().get("key1"));
assertThat(fromJson.getTable().get("key2")).isEqualTo(kvTable.getTable().get("key2"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
public class QueryCorrectionOffsetBodyTest {
@Test
public void testFromJson() throws Exception {
QueryCorrectionOffsetBody qcob = new QueryCorrectionOffsetBody();
Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
offsetMap.put(1, 100L);
offsetMap.put(2, 200L);
qcob.setCorrectionOffsets(offsetMap);
String json = RemotingSerializable.toJson(qcob, true);
QueryCorrectionOffsetBody fromJson = RemotingSerializable.fromJson(json, QueryCorrectionOffsetBody.class);
assertThat(fromJson.getCorrectionOffsets().get(1)).isEqualTo(100L);
assertThat(fromJson.getCorrectionOffsets().get(2)).isEqualTo(200L);
assertThat(fromJson.getCorrectionOffsets().size()).isEqualTo(2);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
public class ResetOffsetBodyTest {
@Test
public void testFromJson() throws Exception {
ResetOffsetBody rob = new ResetOffsetBody();
Map<MessageQueue, Long> offsetMap = new HashMap<MessageQueue, Long>();
MessageQueue queue = new MessageQueue();
queue.setQueueId(1);
queue.setBrokerName("brokerName");
queue.setTopic("topic");
offsetMap.put(queue, 100L);
rob.setOffsetTable(offsetMap);
String json = RemotingSerializable.toJson(rob, true);
ResetOffsetBody fromJson = RemotingSerializable.fromJson(json, ResetOffsetBody.class);
assertThat(fromJson.getOffsetTable().get(queue)).isEqualTo(100L);
assertThat(fromJson.getOffsetTable().size()).isEqualTo(1);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat;
public class SubscriptionGroupWrapperTest {
@Test
public void testFromJson(){
SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<String, SubscriptionGroupConfig>();
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
subscriptionGroupConfig.setBrokerId(1234);
subscriptionGroupConfig.setGroupName("Consumer-group-one");
subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
DataVersion dataVersion = new DataVersion();
dataVersion.nextVersion();
subscriptionGroupWrapper.setDataVersion(dataVersion);
String json = RemotingSerializable.toJson(subscriptionGroupWrapper, true);
SubscriptionGroupWrapper fromJson = RemotingSerializable.fromJson(json, SubscriptionGroupWrapper.class);
assertThat(fromJson.getSubscriptionGroupTable()).containsKey("Consumer-group-one");
assertThat(fromJson.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
assertThat(fromJson.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.stats;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class StatsItemSetTest {
private ThreadPoolExecutor executor;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Test
public void test_getAndCreateStatsItem_multiThread() throws InterruptedException {
for (int i = 0; i < 50; i++) {
assertEquals(20000L, test_unit().longValue());
}
}
@Test
public void test_getAndCreateMomentStatsItem_multiThread() throws InterruptedException {
for (int i = 0; i < 50; i++) {
assertEquals(10, test_unit_moment().longValue());
}
}
private AtomicLong test_unit() throws InterruptedException {
final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null);
executor = new ThreadPoolExecutor(100, 200, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10000; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
statsItemSet.addValue("topicTest", 2, 1);
}
});
}
while (true) {
if (executor.getCompletedTaskCount() == 10000) {
break;
}
Thread.sleep(1000);
}
return statsItemSet.getStatsItem("topicTest").getValue();
}
private AtomicLong test_unit_moment() throws InterruptedException {
final MomentStatsItemSet statsItemSet = new MomentStatsItemSet("topicTest", scheduler, null);
executor = new ThreadPoolExecutor(100, 200, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10000; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
statsItemSet.setValue("test", 10);
}
});
}
while (true) {
if (executor.getCompletedTaskCount() == 10000) {
break;
}
Thread.sleep(1000);
}
return statsItemSet.getAndCreateStatsItem("test").getValue();
}
@After
public void shutdown() {
executor.shutdown();
}
}
\ No newline at end of file
1 基本样例 1 基本样例
-------- --------
在基本样例中我们提供如下的功能场景: 在基本样例中我们提供如下的功能场景:
* 使用RocketMQ发送三种类型的消息:同步消息异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。 * 使用RocketMQ发送三种类型的消息:同步消息异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
* 使用RocketMQ来消费接收到的消息。 * 使用RocketMQ来消费接收到的消息。
### 1.1 加入依赖: ### 1.1 加入依赖:
...@@ -450,7 +450,7 @@ public class ScheduledMessageProducer { ...@@ -450,7 +450,7 @@ public class ScheduledMessageProducer {
您将会看到消息的消费比存储时间晚10秒。 您将会看到消息的消费比存储时间晚10秒。
### 3.4 延时消息的使用场景 ### 3.4 延时消息的使用场景
1. 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
### 3.5 延时消息的使用限制 ### 3.5 延时消息的使用限制
......
...@@ -5,7 +5,6 @@ ...@@ -5,7 +5,6 @@
RocketMQ架构上主要分为四部分,如上图所示: RocketMQ架构上主要分为四部分,如上图所示:
- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。 - Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。 - Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
......
# 最佳实践(best practice) # 最佳实践
## 1 生产者 ## 1 生产者
...@@ -38,7 +38,7 @@ ...@@ -38,7 +38,7 @@
- **SLAVE_NOT_AVAILABLE** - **SLAVE_NOT_AVAILABLE**
消息发送成功,但是此时Slave不可用。此时消息已经进入Master服务器队列,只有Master服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。 消息发送成功,但是此时Slave不可用。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。
### 1.2 消息发送失败处理方式 ### 1.2 消息发送失败处理方式
......
此差异已折叠。
...@@ -48,7 +48,7 @@ rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块 ...@@ -48,7 +48,7 @@ rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块
![](image/rocketmq_design_3.png) ![](image/rocketmq_design_3.png)
### 2.2 协议设计与编解码 #### 2.2 协议设计与编解码
在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。 在Client和Server之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。
Header字段 | 类型 | Request说明 | Response说明 Header字段 | 类型 | Request说明 | Response说明
...@@ -176,9 +176,9 @@ Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ ...@@ -176,9 +176,9 @@ Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ
![](image/rocketmq_design_11.png) ![](image/rocketmq_design_11.png)
RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消息。其实改变消息主题是RocketMQ的常用“套路”,回想一下定时任务的实现机制。 RocketMQ的具体实现策略是:写入的如果事务消息,对消息的Topic和Queue等属性进行替换,同时将原来的Topic和Queue信息存储到消息的属性中,正因为消息主题被替换,故消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费。其实改变消息主题是RocketMQ的常用“套路”,回想一下延时消息的实现机制。
2.倘若一阶段的消息对用户可见 2.Commit和Rollback操作以及Op消息的引入
在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。 在完成一阶段写入一条对用户不可见的消息后,二阶段如果是Commit操作,则需要让消息对用户可见;如果是Rollback则需要撤销一阶段的消息。先说Rollback的情况。对于Rollback,本身一阶段的消息对用户是不可见的,其实不需要真正撤销消息(实际上RocketMQ也无法去真正的删除一条消息,因为是顺序写文件的)。但是区别于这条消息没有确定状态(Pending状态,事务悬而未决),需要一个操作来标识这条消息的最终状态。RocketMQ事务消息方案中引入了Op消息的概念,用Op消息标识事务消息已经确定的状态(Commit或者Rollback)。如果一条事务消息没有对应的Op消息,说明这个事务的状态还无法确定(可能是二阶段失败了)。引入Op消息后,事务消息无论是Commit或者Rollback都会记录一个Op操作。Commit相对于Rollback只是在写入Op消息前创建Half消息的索引。
......
此差异已折叠。
# Basic Concept
## 1 Message Model
The RocketMQ message model is mainly composed of Producer, Broker and Consumer. The Producer is responsible for producing messages, the Consumer is responsible for consuming messages, and the Broker is responsible for storing messages.
The Broker corresponds to one server during actual deployment, and each Broker can store messages from multiple topics, and messages from each Topic can be stored in a different Broker by sharding strategy.
The Message Queue is used to store physical offsets of messages, and the Message addresses in multiple Topic are stored in multiple Message queues. The Consumer Group consists of multiple Consumer instances.
## 2 Producer
The Producer is responsible for producing messages, typically by business systems. It sends messages generated by the business application systems to brokers. The RocketMQ provides multiple paradigms of sending: synchronous, asynchronous,sequential and one-way. Both synchronous and asynchronous methods require the Broker to return confirmation information, while one-way sending is not required.
## 3 Consumer
The Consumer is responsible for consuming messages, typically the background system is responsible for asynchronous consumption. The Consumer pulls messages from brokers and feeds them into application. In perspective of user application, two types of consumers are provided:Pull Consumer and Push Consumer.
## 4 Topic
The Topic refers to a collection of one kind message. Each topic contains several messages and one message can only belong to one topic. The Topic is the basic unit of RocketMQ for message subscription.
## 5 Broker Server
As the role of the transfer station, the Broker Server stores, and forwards messages. In the RocketMQ system, the Broker Server is responsible for receiving messages sent from producers, storing them and preparing to handle pull requests. It also stores message related meta data, including consumer groups, consuming progress, topics and queues info.
## 6 Name Server
The Name Server serves as the provider of routing service. The Producer or Consumer can find the list of Broker IP addresses corresponding each topic through name server. Multiple Name Servers can be deployed as a cluster, but are independent of each other and do not exchange information.
## 7 Pull Consumer
A type of Consumer. Applications are usually pulls messages from brokers by actively calling the Consumer's pull message method, and the application has the advantages of controlling the timing and frequency of pulling messages. Once batches of messages are pulled, user application initiates consuming process.
## 8 Push Consumer
A type of Consumer. Under this mode, after the Broker receives the data, it will actively push it to the consumer, which is generally of high real-time performance.
## 9 Producer Group
A collection of the same type of Producer, which sends the same type of messages with consistent logic. If a transaction message is sent and the original producer crashes after sending, the Broker Server will contacts other Producer in the same Producer group to commit or rollback the transactional message.
## 10 Consumer Group
A collection of the same type of Consumer, which sends the same type of messages with consistent logic. The Consumer Group makes load-balance and fault-tolerance super easy in terms of message consuming.
Warning: consumer instances of one consumer group must have exactly the same topic subscription(s).
RocketMQ supports two type of consumption mode:Clustering and Broadcasting.
## 11 Consumption Mode - Clustering
Under the Clustering mode, all messages from one topic will be delivered to all consumer instances averagely as much as possible. That is, one message can be consumed by only one consumer instance.
## 12 Consumption Mode - Broadcasting
Under the Broadcasting mode, each Consumer instance of the same Consumer Group receives every message that published to the corresponding topic.
## 13 Normal Ordered Message
Under the Normal Ordered Message mode, the messages received by consumers from the same ConsumeQueue are sequential, while the messages received from different message queues may be non-sequential.
## 14 Strictly Ordered Message
Under the Strictly Ordered Message mode, all messages received by the consumers from the same topic are sequential, as the order they were stored.
## 15 Message
The physical carrier of information transmitted by a messaging system, the smallest unit of production and consumption data, each message must belong to a topic.
Each Message in RocketMQ has a unique Message ID and can carry a key, generally used to store business-related value. The system provides the function to query messages by Message ID and Key.
## 16 Tag
Flags set for messages to distinguish different types of messages under the same topic, functioning as a "sub-topic". Messages from the same business unit can set different tags under the same topic in terms of different business purposes. The Tag can effectively maintain the clarity and consistency of the code and optimize the query system provided by RocketMQ. The Consumer can realize different "sub-topic" by using Tag, so as to achieve better expansibility.
## Client Configuration
Relative to RocketMQ's Broker cluster, producers and consumers are client. In this section, it mainly describes the common behavior configuration of producers and consumers.
### Client Addressing mode
```RocketMQ``` can let client find the ```Name Server```, and then find the ```Broker```by the ```Name Server```. Followings show a variety of configurations, and priority level from highly to lower, the highly priority configurations can override the lower priority configurations.
- Specified ```Name Server``` address in the code, and multiple ```Name Server``` addresses are separated by semicolons
```java
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
```
- Specified ```Name Server``` address in the Java setup parameters
```text
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
```
- Specified ```Name Server``` address in the envionment variables
```text
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
```
- HTTP static server addressing(default)
After client started, it will access a http static server address, as: <http://jmenv.tbsite.net:8080/rocketmq/nsaddr>, this URL return the following contents:
```text
192.168.0.1:9876;192.168.0.2:9876
```
By default, the client accesses the HTTP server every 2 minutes, and update the local Name Server address.The URL is hardcoded in the code, you can change the target server by updating ```/etc/hosts``` file, such as add following configuration at the ```/etc/hosts```:
```text
10.232.22.67 jmenv.taobao.net
```
HTTP static server addressing is recommended, because it is simple client deployment, and the Name Server cluster can be upgraded hot.
### Client Configuration
```DefaultMQProducer```,```TransactionMQProducer```,```DefaultMQPushConsumer```,```DefaultMQPullConsumer``` all extends the ```ClientConfig``` Class, ```ClientConfig``` as the client common configuration class. Client configuration style like getXXX,setXXX, each of the parameters can config by spring and also config their in the code. Such as the ```namesrvAddr``` parameter: ```producer.setNamesrvAddr("192.168.0.1:9876")```, same with the other parameters.
#### Client Common Configuration
| Pamater Name | Default Value | Description |
| ----------------------------- | ------- | ------------------------------------------------------------ |
| namesrvAddr | | Name Server address list, multiple NameServer addresses are separated by semicolons |
| clientIP | local IP | Client local ip address, some machines will fail to recognize the client IP address, which needs to be enforced in the code |
| instanceName | DEFAULT | Name of the client instance, Multiple producers and consumers created by the client actually share one internal instance (this instance contains network connection, thread resources, etc.). |
| clientCallbackExecutorThreads | 4 | Number of communication layer asynchronous callback threads |
| pollNameServerInteval | 30000 | Polling the Name Server interval in milliseconds |
| heartbeatBrokerInterval | 30000 | The heartbeat interval, in milliseconds, is sent to the Broker |
| persistConsumerOffsetInterval | 5000 | The persistent Consumer consumes the progress interval in milliseconds |
#### Producer Configuration
| Pamater Name | Default Value | Description |
| -------------------------------- | ---------------- | ------------------------------------------------------------ |
| producerGroup | DEFAULT_PRODUCER | The name of the Producer group. If multiple producers belong to one application and send the same message, they should be grouped into the same group |
| createTopicKey | TBW102 | When a message is sent, topics that do not exist on the server are automatically created and a Key is specified that can be used to configure the default route to the topic where the message is sent.|
| defaultTopicQueueNums | 4 | The number of default queue when sending messages and auto created topic which not exists the server|
| sendMsgTimeout | 10000 | Timeout time of sending message in milliseconds |
| compressMsgBodyOverHowmuch | 4096 | The message Body begins to compress beyond the size(the Consumer gets the message automatically unzipped.), unit of byte|
| retryAnotherBrokerWhenNotStoreOK | FALSE | If send message and return sendResult but sendStatus!=SEND_OK, Whether to resend |
| retryTimesWhenSendFailed | 2 | If send message failed, maximum number of retries, this parameter only works for synchronous send mode|
| maxMessageSize | 4MB | Client limit message size, over it may error. Server also limit so need to work with server |
| transactionCheckListener | | The transaction message looks back to the listener, if you want send transaction message, you must setup this
| checkThreadPoolMinSize | 1 | Minimum of thread in thread pool when Broker look back Producer transaction status |
| checkThreadPoolMaxSize | 1 | Maximum of thread in thread pool when Broker look back Producer transaction status |
| checkRequestHoldMax | 2000 | Producer local buffer request queue size when Broker look back Producer transaction status |
| RPCHook | null | This parameter is passed in when the Producer is creating, including the pre-processing before the message sending and the processing after the message response. The user can do some security control or other operations in the first interface.|
#### PushConsumer Configuration
| Pamater Name | Default Value | Description |
| ---------------------------- | ----------------------------- | ------------------------------------------------------------ |
| consumerGroup | DEFAULT_CONSUMER | Consumer group name. If multi Consumer belong to an application, subscribe the same message and consume logic as the same, they should be gathered together |
| messageModel | CLUSTERING | Message support two mode: cluster consumption and broadcast consumption |
| consumeFromWhere | CONSUME_FROM_LAST_OFFSET | After Consumer started, default consumption from last location, it include two situation: One is last consumption location is not expired, and consumption start at last location; The other is last location expired, start consumption at current queue's first message |
| consumeTimestamp | Half an hour ago | Only consumeFromWhere=CONSUME_FROM_TIMESTAMP, this can work |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Implements strategy of Rebalance algorithms |
| subscription | | subscription relation |
| messageListener | | message listener |
| offsetStore | | Consumption progress store |
| consumeThreadMin | 10 | Minimum of thread in consumption thread pool |
| consumeThreadMax | 20 | Maximum of thread in consumption thread pool |
| | | |
| consumeConcurrentlyMaxSpan | 2000 | Maximum span allowed for single queue parallel consumption |
| pullThresholdForQueue | 1000 | Pull message local queue cache maximum number of messages |
| pullInterval | 0 | Pull message interval, because long polling it is 0, but for flow control, you can set value which greater than 0 in milliseconds |
| consumeMessageBatchMaxSize | 1 | Batch consume message |
| pullBatchSize | 32 | Batch pull message |
#### PullConsumer Configuration
| Pamater Name | Default Value | Description |
| -------------------------------- | ----------------------------- | ------------------------------------------------------------ |
| consumerGroup | DEFAULT_CONSUMER | Consumer group name. If multi Consumer belong to an application, subscribe the same message and consume logic as the same, they should be gathered together |
| brokerSuspendMaxTimeMillis | 20000 | Long polling, Consumer pull message request suspended for the longest time in the Broker in milliseconds |
| consumerTimeoutMillisWhenSuspend | 30000 | Long polling, Consumer pull message request suspend in the Broker over this time value, client think timeout. Unit is milliseconds |
| consumerPullTimeoutMillis | 10000 | Not long polling, timeout time of pull message in milliseconds |
| messageModel | BROADCASTING | Message support two mode: cluster consumption and broadcast consumption |
| messageQueueListener | | Listening changing of queue |
| offsetStore | | Consumption schedule store |
| registerTopics | | Collection of registered topics |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Implements strategy about Rebalance algorithm |
#### Message Data Structure
| Field Name | Default Value | Description |
| -------------- | ------ | ------------------------------------------------------------ |
| Topic | null | Required, the name of the topic to which the message belongs |
| Body | null | Required, message body |
| Tags | null | Optional, message tag, convenient for server filtering. Currently only one tag per message is supported |
| Keys | null | Optional, represent this message's business keys, server create hash indexes based keys. After setting, you can find message by ```Topics```,```Keys``` in Console system. Because of hash indexes, please make key as unique as possible, such as order number, goods Id and so on.|
| Flag | 0 | Optional, it is entirely up to the application, and RocketMQ does not intervene |
| DelayTimeLevel | 0 | Optional, message delay level, 0 represent no delay, greater tan 0 can consume |
| WaitStoreMsgOK | TRUE | Optional, indicates whether the message is not answered until the server is down. |
# ** The system configuration** #
This section focuses on the configuration of the system (JVM/OS)
## **1 JVM Options** ##
The latest released version of JDK 1.8 is recommended. Set the same Xms and Xmx value to prevent the JVM from resizing the heap for better performance. A simple JVM configurations looks like this:
-server -Xms8g -Xmx8g -Xmn4g
If you don’t care about the boot time of RocketMQ broker, pre-touch the Java heap to make sure that every page will be allocated during JVM initialization is a better choice. Those who don’t care about the boot time can enable it:
-XX:+AlwaysPreTouch
Disable biased locking maybe reduce JVM pauses:
-XX:-UseBiasedLocking
As for garbage collection, G1 collector with JDK 1.8 is recommended:
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
These GC options looks a little aggressive, but it’s proved to have good performance in our production environment
Don’t set a too small value for -XX:MaxGCPauseMillis, otherwise JVM will use a small young generation to achieve this goal which will cause very frequent minor GC.So use rolling GC log file is recommended:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m
If write GC file will increase latency of broker, consider redirect GC log file to a memory file system:
-Xloggc:/dev/shm/mq_gc_%p.log123
## 2 Linux Kernel Parameters ##
There is a os.sh script that lists a lot of kernel parameters in folder bin which can be used for production use with minor changes. Below parameters need attention, and more details please refer to documentation for /proc/sys/vm/*.
- **vm.extra_free_kbytes**, tells the VM to keep extra free memory between the threshold where background reclaim (kswapd) kicks in, and the threshold where direct reclaim (by allocating processes) kicks in. RocketMQ uses this parameter to avoid high latency in memory allocation. (It is specific to the kernel version)
- **vm.min_free_kbytes**, if you set this to lower than 1024KB, your system will become subtly broken, and prone to deadlock under high loads.
- **vm.max_map_count**, limits the maximum number of memory map areas a process may have. RocketMQ will use mmap to load CommitLog and ConsumeQueue, so set a bigger value for this parameter is recommended.
- **vm.swappiness**, define how aggressive the kernel will swap memory pages. Higher values will increase agressiveness, lower values decrease the amount of swap. 10 is recommended for this value to avoid swap latency.
- **File descriptor limits**, RocketMQ needs open file descriptors for files(CommitLog and ConsumeQueue) and network connections. We recommend set 655350 for file descriptors.
- **Disk scheduler**, the deadline I/O scheduler is recommended for RocketMQ, which attempts to provide a guaranteed latency for requests.
# Message Filter
RocketMQ - a distributed message queue, is different with all other MQ middleware, on the way of filtering messages. It's do the filter when the messages are subscribed via consumer side.RocketMQ do it lies in the separate storage mechanism that Producer side writing messages and Consomer subscribe messages, Consumer side will get an index from a logical message queue ConsumeQueue when subscribing, then read message entity from CommitLog using the index. So in the end, it is still impossible to get around its storage structure.The storage structure of ConsumeQueue is as follows, and there is a 8-byte Message Tag hashcode, The message filter based on Tag value is just used this Message Tag hash-code.
![](images/rocketmq_design_7.png)
The RocketMQ has two mainly filter types:
* Tag filtering: Consumer can specify not only the message topic but also the message tag values, when subscribing. Multiple tag values need to be separated by '||'. When consumer subscribing a message, it builds the subscription request into a `SubscriptionData` object and sends a Pull message request to the Broker side. Before the Broker side reads data from the RocketMQ file storage layer - Store, it will construct a `MessageFilter` using the `SubscriptionData` object and then pass it to the Store. Store get a record from `ConsumeQueue`, and it will filter the message by the saved tag hashcode, it is unable to filter the messages exactly in the server side because of only the hashcode will be used when filtering, Therefore, after the Consumer pulls the message, it also needs to compare the original tag string of the message. If the original tag string is not same with the expected, the message will be ignored.
* SQL92 filtering: This filter behavior is almost same with the above Tag filtering method. The only difference is on the way how Store works. The rocketmq-filter module is responsible for the construction and execution of the real SQL expression. Executing an SQL expression every time a filter is executed affects efficiency, so RocketMQ uses BloomFilter to avoid doing it every time. The expression context of SQL92 is a property of the message.
## 4 Load Balancing
Load balancing in RocketMQ is accomplished on Client side. Specifically, it can be divided into load balancing at Producer side when sending messages and load balancing at Constumer side when subscribing messages.
### 4.1 Producer Load Balancing
When the Producer sends a message, it will first find the specified TopicPublishInfo according to Topic. After getting the routing information of TopicPublishInfo, the RocketMQ client will select a queue (MessageQueue) from the messageQueue List in TopicPublishInfo to send the message by default.Specific fault-tolerant strategies are defined in the MQFaultStrategy class.
Here is a sendLatencyFaultEnable switch variable, which, if turned on, filters out the Broker agent of not available on the basis of randomly gradually increasing modular arithmetic selection. The so-called "latencyFault Tolerance" refers to a certain period of time to avoid previous failures. For example, if the latency of the last request exceeds 550 Lms, it will evade 3000 Lms; if it exceeds 1000L, it will evade 60000 L; if it is closed, it will choose a queue (MessageQueue) to send messages by randomly gradually increasing modular arithmetic, and the latencyFault Tolerance mechanism is the key to achieve high availability of message sending.
### 4.2 Consumer Load Balancing
In RocketMQ, the two consumption modes (Push/Pull) on the Consumer side are both based on the pull mode to get the message, while in the Push mode it is only a kind of encapsulation of the pull mode, which is essentially implemented as the message pulling thread after pulling a batch of messages from the server. After submitting to the message consuming thread pool, it continues to try again to pull the message to the server. If the message is not pulled, the pull is delayed and continues. In both pull mode based consumption patterns (Push/Pull), the Consumer needs to know which message queue - queue from the Broker side to get the message. Therefore, it is necessary to do load balancing on the Consumer side, that is, which Consumer consumption is allocated to the same ConsumerGroup by more than one MessageQueue on the Broker side.
1, Heartbeat Packet Sending on Consumer side
After Consumer is started, it continuously sends heartbeat packets to all Broker instances in the RocketMQ cluster via timing task (which contains the message consumption group name, subscription relationship collection,Message communication mode and the value of the client id,etc). After receiving the heartbeat message from Consumer, Broker side maintains it in Consumer Manager's local caching variable—consumerTable, At the same time, the encapsulated client network channel information is stored in the local caching variable—channelInfoTable, which can provide metadata information for the later load balancing of Consumer.
2,Core Class for Load Balancing on Consumer side—RebalanceImpl
Starting the MQClientInstance instance in the startup process of the Consumer instance will complete the start of the load balancing service thread-RebalanceService (executed every 20 s). By looking at the source code, we can find that the run () method of the RebalanceService thread calls the rebalanceByTopic () method of the RebalanceImpl class, which is the core of the Consumer end load balancing. Here, rebalanceByTopic () method will do different logical processing depending on whether the consumer communication type is "broadcast mode" or "cluster mode". Here we mainly look at the main processing flow in cluster mode:
(1) Get the message consumption queue set (mqSet) under the Topic from the local cache variable—topicSubscribeInfoTable of the rebalanceImpl instance.
(2) Call mQClientFactory. findConsumerIdList () method to send a RPC communication request to Broker side to obtain the consumer Id list under the consumer group based on the parameters of topic and consumer group (consumer table constructed by Broker side based on the heartbeat data reported by the front consumer side responds and returns, business request code: GET_CONSUMER_LIST_BY_GROUP);
(3) First, the message consumption queue and the consumer Id under Topic are sorted, then the message queue to be pulled is calculated by using the message queue allocation strategy algorithm (default: the average allocation algorithm of the message queue). The average allocation algorithm here is similar to the paging algorithm. It ranks all MessageQueues like records. It ranks all consumers like pages. It calculates the average size of each page and the range of each page record. Finally, it traverses the whole range and calculates the records that the current consumer should allocate to (MessageQueue here).
![Image text](https://github.com/apache/rocketmq/raw/develop/docs/cn/image/rocketmq_design_8.png)
(4) Then, the updateProcessQueueTableInRebalance () method is invoked, which first compares the allocated message queue set (mqSet) with processQueueTable for filtering.
![Image text](https://github.com/apache/rocketmq/raw/develop/docs/cn/image/rocketmq_design_9.png)
- The red part of the processQueueTable annotation in the figure above
indicates that it is not included with the assigned message queue set
mqSet. Set the Dropped attribute to true for these queues, and then
check whether these queues can remove the processQueueTable cache
variable or not. The removeUnnecessaryMessageQueue () method is
executed here, that is, check every 1s to see if the locks of the
current consumption processing queue can be retrieved and return true
if they are retrieved. If the lock of the current consumer processing
queue is still not available after waiting for 1s, it returns false.
If true is returned, the corresponding Entry is removed from the
processQueueTable cache variable.
- The green section in processQueueTable above represents the
intersection with the assigned message queue set mqSet. Determine
whether the ProcessQueue has expired, regardless of Pull mode, if it
is Push mode, set the Dropped attribute to true, and call the
removeUnnecessaryMessageQueue () method to try to remove Entry as
above;
Finally, a ProcessQueue object is created for each MessageQueue in the filtered message queue set (mqSet) and stored in the processQueueTable queue of RebalanceImpl (where the computePullFromWhere (MessageQueue mq) method of the RebalanceImpl instance is invoked to obtain the next progress consumption value offset of the MessageQueue object, which is then populated into the attribute of pullRequest object to be created next time.), and create pull request object—pullRequest to add to pull list—pullRequestList, and finally execute dispatchPullRequest () method. PullRequest object of Pull message is put into the blocking queue pullRequestQueue of PullMessageService service thread in turn, and the request of Pull message is sent to Broker end after the service thread takes out. Among them, we can focus on the contrast, RebalancePushImpl and RebalancePullImpl two implementation classes dispatchPullRequest () method is different, the method in RebalancePullImpl class is empty, thus answering the last question in the previous article.
The core design idea of message consumption queue is that a message consumption queue can only be consumed by one consumer in the same consumer group at the same time, and a message consumer can consume multiple message queues at the same time.
# Message Queries
RocketMQ supports message queries by two dimensions, which are "Query Message by Message Id" and "Query Message by Message Key".
## 1. Query Message by Message Id
The MessageId in RocketMQ has a total length of 16 bytes, including the broker address (IP address and port) and CommitLog offset. In RocketMQ, the specific approach is that the Client resolves the Broker's address (IP address and port) and the CommitLog's offset address from the MessageId. Then both of them are encapsulated into an RPC request, and finally it will be sent through the communication layer (business request code: VIEW_MESSAGE_BY_ID). The Broker reads a message by using the CommitLog offset and size to find the real message in the CommitLog and then return, which is how QueryMessageProcessor works.
## 2. Query Message by Message Id
"Query Messages by Message Key" is mainly based on RocketMQ's IndexFile. The logical structure of the IndexFile is similar to the implementation of HashMap in JDK. The specific structure of the IndexFile is as follows:
![](images/rocketmq_design_message_query.png)
The IndexFile provides the user with the querying service by “Querying Messages by Message Key”. The IndexFile is stored in $HOME\store\index${fileName}, and the file name is named after the timestamp at the time of creation. The file size is fixed, which is 420,000,040 bytes (40+5million\*4+20million\*20). If the UNIQ_KEY is set in the properties of the message, then the "topic + ‘#’ + UNIQ_KEY" will be used as the index. Likewise, if the KEYS is set in the properties of the message (multiple KEYs should be separated by spaces), then the "topic + ‘#’ + KEY" will be used as the index.
The index data contains four fields, Key Hash, CommitLog offset, Timestamp and NextIndex offset, for a total of 20 Bytes. The NextIndex offset of the index data will point to the previous index data if the Key Hash of the index data is the same as that of the previous index data. If a hash conflict occurs, then the NextIndex offset can be used as the field to string all conflicting indexes in a linked list. What the Timestamp records is the time difference between two storeTimestamps, instead of a specific time. The structure of the entire IndexFile is shown in the graph. The Header is used to store some general statistics, which needs 40 bytes. The Slot Table of 4\*5million bytes does not save the real index data, but saves the header of the singly linked list corresponding to each slot. The Index Linked List of 20\*20million is the real index data, that is, an Index File can hold 20million indexes.
The specific method of "Query Message by Message Key" is that the topic and message key are used to find the record in the IndexFile, and then read the message from the file of CommitLog according to the CommitLog offset in this record.
\ No newline at end of file
## 2 Communication Mechanism
RocketMQ message queue cluster mainly includes four roles: NameServer, Broker (Master/Slave), Producer and Consumer. The basic communication process is as follows:
(1) After Broker start-up, it needs to complete one operation: register itself to NameServer, and then report Topic routing information to NameServer at regular intervals of 30 seconds.
(2) When message producer Producer sends a message as a client, it needs to obtain routing information from the local cache TopicPublishInfoTable according to the Topic of the message. If not, it will be retrieved from NameServer and update to local cache, at the same time, Producer will retrieve routing information from NameServer every 30 seconds by default.
(3) Message producer Producer chooses a queue to send the message according to the routing information obtained in 2); Broker receives the message and records it in disk as the receiver of the message.
(4) After message consumer Consumer get the routing information according to 2) and complete the load balancing of the client, then select one or several message queues to pull messages and consume them.
From 1) ~ 3 above, we can see that both Producer, Broker and NameServer communicate with each other(only part of MQ communication is mentioned here), so how to design a good network communication module is very important in MQ. It will determine the overall messaging capability and final performance of the RocketMQ cluster.
rocketmq-remoting module is the module responsible for network communication in RocketMQ message queue. It is relied on and referenced by almost all other modules (such as rocketmq-client,rocketmq-broker,rocketmq-namesrv) that need network communication. In order to realize the efficient data request and reception between the client and the server, the RocketMQ message queue defines the communication protocol and extends the communication module on the basis of Netty.
### 2.1 Remoting Communication Class Structure
![](https://github.com/apache/rocketmq/raw/develop/docs/cn/image/rocketmq_design_3.png)
### 2.2 Protocol Design and Codec
When a message is sent between Client and Server, a protocol convention is needed for the message sent, so it is necessary to customize the message protocol of RocketMQ. At the same time, in order to efficiently transmit messages and read the received messages, it is necessary to encode and decode the messages. In RocketMQ, the RemotingCommand class encapsulates all data content in the process of message transmission, which includes not only all data structures, but also encoding and decoding operations.
Header field | Type | Request desc | Response desc
--- | --- | --- | --- |
code |int | Request code. answering business processing is different according to different requests code | Response code. 0 means success, and non-zero means errors.
language | LanguageCode | Language implemented by the requester | Language implemented by the responder
version | int | Version of Request Equation | Version of Response Equation
opaque | int |Equivalent to reqeustId, the different request identification codes on the same connection correspond to those in the response message| The response returns directly without modification
flag | int | Sign, used to distinguish between ordinary RPC or oneway RPC | Sign, used to distinguish between ordinary RPC or oneway RPC
remark | String | Transfer custom text information | Transfer custom text information
extFields | HashMap<String, String> | Request custom extension information| Response custom extension information
![](https://github.com/apache/rocketmq/raw/develop/docs/cn/image/rocketmq_design_4.png)
From the above figure, the transport content can be divided into four parts:
(1) Message length: total length, four bytes of storage, occupying an int type;
(2) Serialization type header length: occupying an int type. The first byte represents the serialization type, and the last three bytes represent the header length;
(3) Header data: serialized header data;
(4) Message body data: binary byte data content of message body;
#### 2.3 Message Communication Mode and Procedure
There are three main ways to support communication in RocketMQ message queue: synchronous (sync), asynchronous (async), one-way (oneway). The "one-way" communication mode is relatively simple and is generally used in sending heartbeat packets without paying attention to its Response. Here, mainly introduce the asynchronous communication flow of RocketMQ.
![](https://github.com/apache/rocketmq/raw/develop/docs/cn/image/rocketmq_design_5.png)
#### 2.4 Reactor Multithread Design
The RPC communication of RocketMQ uses Netty component as the underlying communication library, and also follows the Reactor multithread model. At the same time, some extensions and optimizations are made on it.
![](https://github.com/apache/rocketmq/raw/develop/docs/cn/image/rocketmq_design_6.png)
Above block diagram can roughly understand the Reactor multi-thread model of NettyRemotingServer in RocketMQ. A Reactor main thread (eventLoopGroupBoss, is 1 above) is responsible for listening to TCP network connection requests, establishing connections, creating SocketChannel, and registering on selector. The source code of RocketMQ automatically selects NIO and Epoll according to the type of OS. Then listen to real network data. After you get the network data, you throw it to the Worker thread pool (eventLoopGroupSelector, is the "N" above, the default is 3 in the source code). You need to do SSL verification, codec, idle check, network connection management before you really execute the business logic. These tasks to defaultEventExecutorGroup (that is, "M1" above, the default set to 8 in the source code) to do. The processing business operations are executed in the business thread pool. According to the RomotingCommand business request code, the corresponding processor is found in the processorTable local cache variable and encapsulated into the task, and then submitted to the corresponding business processor processing thread pool for execution (sendMessageExecutor,). Take sending a message, for example, the "M2" above. The thread pool continues to increase in several steps from entry to business logic, which is related to the complexity of each step. The more complex the thread pool is, the wider the concurrent channel is required.
Number of thread | Name of thread | Desc of thread
--- | --- | ---
1 | NettyBoss_%d | Reactor Main thread
N | NettyServerEPOLLSelector_%d_%d | Reactor thread pool
M1 | NettyServerCodecThread_%d | Worker thread pool
M2 | RemotingExecutorThread_%d | bussiness processor thread pool
# Message Storage
![](images/rocketmq_storage_arch.png)
Message storage is the most complicated and important part of RocketMQ. This section will describe the three aspects of RocketMQ: message storage architecture, PageCache and memory mapping, and RocketMQ's two different disk flushing methods.
## 1 Message Storage Architecture
The message storage architecture diagram consists of 3 files related to message storage: `CommitLog` file, `ConsumeQueue` file, and `IndexFile`.
* `CommitLog`:The `CommitLog` file stores message body and metadata sent by producer, and the message content is not fixed length. The default size of one `CommitLog` file is 1G, the length of the file name is 20 digits, the left side is zero padded, and the remaining is the starting offset. For example, `00000000000000000000` represents the first file, the starting offset is 0, and the file size is 1G=1073741824, when the first `CommitLog` file is full, the second `CommitLog` file is `00000000001073741824`, the starting offset is 1073741824, and so on. The message is mainly appended to the log file sequentially. When one `CommitLog` file is full, the next will be written.
* `ConsumeQueue`: The `ConsumeQueue` is used to improve the performance of message consumption. Since RocketMQ uses topic-based subscription mode, message consumption is specific to the topic. Traversing the commitlog file to retrieve messages of one topic is very inefficient. The consumer can find the messages to be consumed according to the `ConsumeQueue`. The `ConsumeQueue`(logic consume queue) as an index of the consuming message stores the starting physical offset `offset` in `CommitLog` of the specified topic, the message size `size` and the hash code of the message tag. The `ConsumeQueue` file can be regarded as a topic-based `CommitLog` index file, so the consumequeue folder is organized as follows: `topic/queue/file` three-layer organization structure, the specific storage path is `$HOME/store/consumequeue/{topic}/{queueId }/{fileName}`. The consumequeue file uses a fixed-length design, each entry occupies 20 bytes, which is an 8-byte commitlog physical offset, a 4-byte message length, and an 8-byte tag hashcode. One consumequeue file consists of 30W entries, each entry can be randomly accessed like an array, each `ConsumeQueue` file's size is about 5.72M.
* `IndexFile`: The `IndexFile` provides a way to query messages by key or time interval. The path of the `IndexFile` is `$HOME/store/index/${fileName}`, the file name `fileName` is named after the timestamp when it was created. One IndexFile's size is about 400M, and it can store 2000W indexes. The underlying storage of `IndexFile` is designed to implement the `HashMap` structure in the file system, so RocketMQ's index file is a hash index.
From the above architecture of the RocketMQ message storage, we can see RocketMQ uses a hybrid storage structure, that is, all the queues in an instance of the broker share a single log file `CommitLog` to store messages. RocketMQ's hybrid storage structure(messages of multiple topics are stored in one CommitLog) uses a separate storage structure for the data and index parts for Producer and Consumer respectively. The Producer sends the message to the Broker, then the Broker persists the message to the CommitLog file synchronously or asynchronously. As long as the message is persisted to the CommitLog on the disk, the message sent by the Producer will not be lost. Because of this, Consumer will definitely have the opportunity to consume this message. When no message can be pulled, the consumer can wait for the next pull. And the server also supports the long polling mode: if a pull request pulls no messages, the Broker can wait for 30 seconds, as long as new message arrives in this interval, it will be returned directly to the consumer. Here, RocketMQ's specific approach is using Broker's background service thread `ReputMessageService` to continuously dispatch requests and asynchronously build ConsumeQueue (Logical Queue) and IndexFile data.
## 2 PageCache and memory map
PageCache is a cache of files by the operating system to speed up the reading and writing of files. In general, the speed of sequential read and write files is almost the same as the speed of read and write memory. The main reason is that the OS uses a portion of the memory as PageCache to optimize the performance of the read and write operations. For data writing, the OS will first write to the Cache, and then the `pdflush` kernel thread asynchronously flush the data in the Cache to the physical disk. For data reading, if it can not hit the page cache when reading a file at a time, the OS will read the file from the physical disk and prefetch the data files of other neighboring blocks sequentially.
In RocketMQ, the logic consumption queue `ConsumeQueue` stores less data and is read sequentially. With the help of prefetch of the page cache mechanism, the read performance of the `ConsumeQueue` file is almost close to the memory read, even in the case of message accumulation, it does not affect performance. But for the log data file `CommitLog`, it will generate many random access reads when reading the message content, which seriously affects the performance. If you choose the appropriate IO scheduling algorithm, such as setting the IO scheduling algorithm to "Deadline" (when the block storage uses SSD), the performance of random reads will also be improved.
In addition, RocketMQ mainly reads and writes files through `MappedByteBuffer`. `MappedByteBuffer` uses the `FileChannel` model in NIO to directly map the physical files on the disk to the memory address in user space (`Mmap` method reduces the performance overhead of traditional IO copying disk file data back and forth between the buffer in kernel space and the buffer in user space), it converts the file operation into direct memory address manipulation, which greatly improves the efficiency of reading and writing files (Because of the need to use the memory mapping mechanism, RocketMQ's file storage is fixed-length, making it easy to map the entire file to memory at a time).
## 3 Message Disk Flush
![](images/rocketmq_storage_flush.png)
* synchronous flush: As shown above, the RocketMQ's Broker will return a successful `ACK` response to the Producer after the message is truly persisted to disk. Synchronous flushing is a good guarantee for the reliability of MQ messages, but it will have a big impact on performance. Generally, it is suitable for financial business applications.
* asynchronous flush: Asynchronous flushing can take full advantage of the PageCache of the OS, as long as the message is written to the PageCache, the successful `ACK` can be returned to the Producer. The message flushing is performed by the background asynchronous thread, which reduces the read and write delay and improves the performance and throughput of the MQ.
# Transaction Message
## 1 Transaction Message
Apache RocketMQ supports distributed transaction message from version 4.3.0. RocketMQ implements transaction message by using the protocol of 2PC(two-phase commit), in addition adding a compensation logic to handle timeout-case or failure-case of commit-phase, as shown below.
![](../cn/image/rocketmq_design_10.png)
### 1.1 The Process of RocketMQ Transaction Message
The picture above shows the overall architecture of transaction message, including the sending of message(commit-request phase), the sending of commit/rollback(commit phase) and the compensation process.
1. The sending of message and Commit/Rollback.
(1) Sending the message(named Half message in RocketMQ)
(2) The server responds the writing result(success or failure) of Half message.
(3) Handle local transaction according to the result(local transaction won't be executed when the result is failure).
(4) Sending Commit/Rollback to broker according to the result of local transaction(Commit will generate message index and make the message visible to consumers).
2. Compensation process
(1) For a transaction message without a Commit/Rollback (means the message in the pending status), a "back-check" request is initiated from the broker.
(2) The Producer receives the "back-check" request and checks the status of the local transaction corresponding to the "back-check" message.
(3) Redo Commit or Rollback based on local transaction status.
The compensation phase is used to resolve the timeout or failure case of the message Commit or Rollback.
### 1.2 The design of RocketMQ Transaction Message
1. Transaction message is invisible to users in first phase(commit-request phase)
Upon on the main process of transaction message, the message of first phase is invisible to the user. This is also the biggest difference from normal message. So how do we write the message while making it invisible to the user? And below is the solution of RocketMQ: if the message is a Half message, the topic and queueId of the original message will be backed up, and then changes the topic to RMQ_SYS_TRANS_HALF_TOPIC. Since the consumer group does not subscribe to the topic, the consumer cannot consume the Half message. Then RocketMQ starts a timing task, pulls the message for RMQ_SYS_TRANS_HALF_TOPIC, obtains a channel according to producer group and sends a back-check to query local transaction status, and decide whether to submit or roll back the message according to the status.
In RocketMQ, the storage structure of the message in the broker is as follows. Each message has corresponding index information. The Consumer reads the content of the message through the secondary index of the ConsumeQueue. The flow is as follows:
![](../cn/image/rocketmq_design_11.png)
The specific implementation strategy of RocketMQ is: if the transaction message is written, topic and queueId of the message are replaced, and the original topic and queueId are stored in the properties of the message. Because the replace of the topic, the message will not be forwarded to the Consumer Queue of the original topic, and the consumer cannot perceive the existence of the message and will not consume it. In fact, changing the topic is the conventional method of RocketMQ(just recall the implementation mechanism of the delay message).
2. Commit/Rollback operation and introduction of Op message
After finishing writing a message that is invisible to the user in the first phase, here comes two cases in the second phase. One is Commit operation, after which the message needs to be visible to the user; the other one is Rollback operation, after which the first phase message(Half message) needs to be revoked. For the case of Rollback, since first-phase message itself is invisible to the user, there is no need to actually revoke the message (in fact, RocketMQ can't actually delete a message because it is a sequential-write file). But still some operation needs to be done to identity the final status of the message, to differ it from pending status message. To do this, the concept of "Op message" is introduced, which means the message has a certain status(Commit or Rollback). If a transaction message does not have a corresponding Op message, the status of the transaction is still undetermined (probably the second-phase failed). By introducing the Op message, the RocketMQ records an Op message for every Half message regardless it is Commit or Rollback. The only difference between Commit and Rollback is that when it comes to Commit, the index of the Half message is created before the Op message is written.
3. How Op message stored and the correspondence between Op message and Half message
RocketMQ writes the Op message to a specific system topic(RMQ_SYS_TRANS_OP_HALF_TOPIC) which will be created via the method - TransactionMessageUtil.buildOpTopic(); this topic is an internal Topic (like the topic of RMQ_SYS_TRANS_HALF_TOPIC) and will not be consumed by the user. The content of the Op message is the physical offset of the corresponding Half message. Through the Op message we can index to the Half message for subsequent check-back operation.
![](../cn/image/rocketmq_design_12.png)
4. Index construction of Half messages
When performing Commit operation of the second phase, the index of the Half message needs to be built. Since the Half message is written to a special topic(RMQ_SYS_TRANS_HALF_TOPIC) in the first phase of 2PC, so it needs to be read out from the special topic when building index, and replace the topic and queueId with the real target topic and queueId, and then write through a normal message that is visible to the user. Therefore, in conclusion, the second phase recovers a complete normal message using the content of the Half message stored in the first phase, and then goes through the message-writing process.
5. How to handle the message failed in the second phase?
If commit/rollback phase fails, for example, a network problem causes the Commit to fail when you do Commit. Then certain strategy is required to make sure the message finally commit. RocketMQ uses a compensation mechanism called "back-check". The broker initiates a back-check request for the message in pending status, and sends the request to the corresponding producer side (the same producer group as the producer group who sent the Half message). The producer checks the status of local transaction and redo Commit or Rollback. The broker performs the back-check by comparing the RMQ_SYS_TRANS_HALF_TOPIC messages and the RMQ_SYS_TRANS_OP_HALF_TOPIC messages and advances the checkpoint(recording those transaction messages that the status are certain).
RocketMQ does not back-check the status of transaction messages endlessly. The default time is 15. If the transaction status is still unknown after 15 times, RocketMQ will roll back the message by default.
# Schedule example # Schedule example
### 1Start consumer to wait for incoming subscribed messages ### 1.Start consumer to wait for incoming subscribed messages
```java ```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
...@@ -35,7 +35,7 @@ public class ScheduledMessageConsumer { ...@@ -35,7 +35,7 @@ public class ScheduledMessageConsumer {
} }
``` ```
### 2Send scheduled messages ### 2.Send scheduled messages
```java ```java
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
...@@ -64,15 +64,15 @@ public class ScheduledMessageProducer { ...@@ -64,15 +64,15 @@ public class ScheduledMessageProducer {
} }
``` ```
### 3Verification ### 3.Verification
You should see messages are consumed about 10 seconds later than their storing time. You should see messages are consumed about 10 seconds later than their storing time.
### 4Use scenarios for scheduled messages ### 4.Use scenarios for scheduled messages
For example, in e-commerce, if an order is submitted, a delay message can be sent, and the status of the order can be checked after 1 hour. If the order is still unpaid, the order can be cancelled and the inventory released. For example, in e-commerce, if an order is submitted, a delay message can be sent, and the status of the order can be checked after 1 hour. If the order is still unpaid, the order can be cancelled and the inventory released.
### 5Restrictions on the use of scheduled messages ### 5.Restrictions on the use of scheduled messages
```java ```java
// org/apache/rocketmq/store/config/MessageStoreConfig.java // org/apache/rocketmq/store/config/MessageStoreConfig.java
......
...@@ -28,7 +28,7 @@ SQL feature could do some calculation through the properties you put in when sen ...@@ -28,7 +28,7 @@ SQL feature could do some calculation through the properties you put in when sen
------------ ------------
``` ```
## 1. Grammars ## 1.Grammars
RocketMQ only defines some basic grammars to support this feature. You could also extend it easily. RocketMQ only defines some basic grammars to support this feature. You could also extend it easily.
- Numeric comparison, like **>**, **>=**, **<**, **<=**, **BETWEEN**, **=**; - Numeric comparison, like **>**, **>=**, **<**, **<=**, **BETWEEN**, **=**;
...@@ -43,13 +43,13 @@ Constant types are: ...@@ -43,13 +43,13 @@ Constant types are:
- **NULL**, special constant; - **NULL**, special constant;
- Boolean, **TRUE** or **FALSE**; - Boolean, **TRUE** or **FALSE**;
## 2. Usage constraints ## 2.Usage constraints
Only push consumer could select messages by SQL92. The interface is: Only push consumer could select messages by SQL92. The interface is:
``` ```
public void subscribe(finalString topic, final MessageSelector messageSelector) public void subscribe(finalString topic, final MessageSelector messageSelector)
``` ```
## 3. Producer example ## 3.Producer example
You can put properties in message through method putUserProperty when sending. You can put properties in message through method putUserProperty when sending.
```java ```java
...@@ -67,7 +67,7 @@ producer.shutdown(); ...@@ -67,7 +67,7 @@ producer.shutdown();
``` ```
## 4. Consumer example ## 4.Consumer example
Use `MessageSelector.bySql` to select messages through SQL when consuming. Use `MessageSelector.bySql` to select messages through SQL when consuming.
......
# Example for ordered messages
RocketMQ provides ordered messages using FIFO order. All related messages need to be sent into the same message queue in an orderly manner.
The following demonstrates ordered messages by ensuring order of create, pay, send and finish steps of sales order process.
## 1 produce ordered messages
```java
package org.apache.rocketmq.example.order2
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/*
* ordered messages producer
*/
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// sales orders list
List<OrderStep> orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// generate message timestamp
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //message queue is selected by #salesOrderID
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**
* each sales order step
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/**
* to generate ten OrderStep objects for three sales orders:
* #SalesOrder "15103111039L": create, pay, send, finish;
* #SalesOrder "15103111065L": create, pay, finish;
* #SalesOrder "15103117235L": create, pay, finish;
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
//create sales order with orderid="15103111039L"
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("create");
orderList.add(orderDemo);
//create sales order with orderid="15103111065L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("create");
orderList.add(orderDemo);
//pay sales order #"15103111039L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("pay");
orderList.add(orderDemo);
//create sales order with orderid="15103117235L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("create");
orderList.add(orderDemo);
//pay sales order #"15103111065L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("pay");
orderList.add(orderDemo);
//pay sales order #"15103117235L"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("pay");
orderList.add(orderDemo);
//mark sales order #"15103111065L" as "finish"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("finish");
orderList.add(orderDemo);
//mark mark sales order #"15103111039L" as "send"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("send");
orderList.add(orderDemo);
////mark sales order #"15103117235L" as "finish"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("finish");
orderList.add(orderDemo);
//mark sales order #"15103111039L" as "finish"
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("finish");
orderList.add(orderDemo);
return orderList;
}
}
```
## 2 Consume ordered messages
```java
package org.apache.rocketmq.example.order2;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* consume messages in order
*/
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**
* when the consumer is first run, the start point of message queue where it can get messages will be set.
* or if it is restarted, it will continue from the last place to get messages.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// one consumer for each message queue, and messages order are kept in a single message queue.
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
```
# Transaction message example # Transaction message example
## 1 Transaction message status ## 1 Transaction message status
There are three states for transactional message: There are three states for transaction message:
1. TransactionStatus.CommitTransaction: commit transaction,it means that allow consumers to consume this message. - TransactionStatus.CommitTransaction: commit transaction, it means that allow consumers to consume this message.
2. TransactionStatus.RollbackTransaction: rollback transaction,it means that the message will be deleted and not allowed to consume. - TransactionStatus.RollbackTransaction: rollback transaction, it means that the message will be deleted and not allowed to consume.
3. TransactionStatus.Unknown: intermediate state,it means that MQ is needed to check back to determine the status. - TransactionStatus.Unknown: intermediate state, it means that MQ is needed to check back to determine the status.
## 2 Send transactional message example ## 2 Send transactional message example
### 2.1 Create the transactional producer ### 2.1 Create the transactional producer
Use ```TransactionMQProducer```class to create producer client, and specify a unique ```ProducerGroup```, and you can set up a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution resultand the reply status is described in the above section. Use ```TransactionMQProducer```class to create producer client, and specify a unique ```ProducerGroup```, and you can set up a custom thread pool to process check requests. After executing the local transaction, you need to reply to MQ according to the execution result, and the reply status is described in the above section.
```java ```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
...@@ -88,7 +88,7 @@ public class TransactionListenerImpl implements TransactionListener { ...@@ -88,7 +88,7 @@ public class TransactionListenerImpl implements TransactionListener {
## 3 Usage Constraint ## 3 Usage Constraint
1. Messages of the transactional have no schedule and batch support. 1. Messages of the transactional have no schedule and batch support.
2. In order to avoid a single message being checked too many times and lead to half queue message accumulation, we limited the number of checks for a single message to 15 times by default, but users can change this limit by change the ```transactionCheckMax``` parameter in the configuration of the broker, if one message has been checked over ```transactionCheckMax``` times, broker will discard this message and print an error log at the same time by default. Users can change this behavior by override the ```AbstractTransactionCheckListener``` class. 2. In order to avoid a single message being checked too many times and lead to half queue message accumulation, we limited the number of checks for a single message to 15 times by default, but users can change this limit by change the ```transactionCheckMax``` parameter in the configuration of the broker, if one message has been checked over ```transactionCheckMax``` times, broker will discard this message and print an error log at the same time by default. Users can change this behavior by override the ```AbstractTransactionCheckListener``` class.
3. A transactional message will be checked after a certain period of time that determined by parameter ```transactionTimeout``` in the configuration of the broker. And users also can change this limit by set user property “CHECK_IMMUNITY_TIME_IN_SECONDS” when sending transactional message, this parameter takes precedence over the “transactionMsgTimeout” parameter. 3. A transactional message will be checked after a certain period of time that determined by parameter ```transactionTimeout``` in the configuration of the broker. And users also can change this limit by set user property “CHECK_IMMUNITY_TIME_IN_SECONDS” when sending transactional message, this parameter takes precedence over the “transactionMsgTimeout” parameter.
4. A transactional message maybe checked or consumed more than once. 4. A transactional message maybe checked or consumed more than once.
5. Committed message reput to the user’s target topic may fail. Currently, it depends on the log record. High availability is ensured by the high availability mechanism of RocketMQ itself. If you want to ensure that the transactional message isn’t lost and the transaction integrity is guaranteed, it is recommended to use synchronous double write. mechanism. 5. Committed message reput to the user’s target topic may fail. Currently, it depends on the log record. High availability is ensured by the high availability mechanism of RocketMQ itself. If you want to ensure that the transactional message isn’t lost and the transaction integrity is guaranteed, it is recommended to use synchronous double write. mechanism.
......
# Frequently Asked Questions
The following questions are frequently asked with regard to the RocketMQ project in general.
## General
### 1. Why did we create rocketmq project instead of selecting other products?
Please refer to [Why RocketMQ](http://rocketmq.apache.org/docs/motivation)
### 2. Do I have to install other softeware, such as zookeeper, to use RocketMQ?
No. RocketMQ can run independently.
## Usage
### 1. Where does the newly created Consumer ID start consuming messages?
1. If the topic sends a message within three days, then the consumer start consuming messages from the first message saved in the server.
2. If the topic sends a message three days ago, the consumer starts to consume messages from the latest message in the server, in other words, starting from the tail of message queue.
3. If such consumer is rebooted, then it starts to consume messages from the last consumption location.
### 2. How to reconsume message when consumption fails?
1. Cluster consumption pattern, The consumer business logic code returns Action.ReconsumerLater, NULL, or throws an exception, if a message failed to be consumed, it will retry for up to 16 times, after that, the message would be descarded.
2. Broadcast consumption patternThe broadcaset consumption still ensures that a message is consumered at least once, but no resend option is provided.
### 3. How to query the failed message if there is a consumption failure?
1. Using topic query by time, you can query messages within a period of time.
2. Using Topic and Message Id to accurately query the message.
3. Using Topic and Message Key accurately query a class of messages with the same Message Key.
### 4. Are messages delivered exactly once?
RocketMQ ensures that all messages are delivered at least once. In most cases, the messages are not repeated.
### 5. How to add a new broker?
1. Start up a new broker and register it to the same list of name servers.
2. By default, only internal system topics and consumer groups are created automatically. If you would like to have your business topic and consumer groups on the new node, please replicate them from the existing broker. Admin tool and command lines are provided to handle this.
## Configuration related
The following answers are all default values and can be modified by configuration.
### 1. How long are the messages saved on the server?
Stored messages will be saved for up to 3 days, and messages that are not consumed for more than 3 days will be deleted.
### 2. What is the size limit for message Body?
Generally 256KB.
### 3. How to set the number of consumer threads?
When you start Consumer, set a ConsumeThreadNums property, example is as follows:
```
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(20);
```
## Errors
### 1. If you start a producer or consumer failed and the error message is producer group or consumer repeat.
Reason:Using the same Producer /Consumer Group to launch multiple instances of Producer/Consumer in the same JVM may cause the client fail to start.
Solution: Make sure that a JVM corresponding to one Producer /Consumer Group starts only with one Producer/Consumer instance.
### 2. Consumer failed to start loading json file in broadcast mode.
Reason: Fastjson version is too low to allow the broadcast consumer to load local offsets.json, causing the consumer boot failure. Damaged fastjson file can also cause the same problem.
Solution: Fastjson version has to be upgraded to rocketmq client dependent version to ensure that the local offsets.json can be loaded. By default offsets.json file is in /home/{user}/.rocketmq_offsets. Or check the integrity of fastjson.
### 3. What is the impact of a broker crash.
1. Master crashes
Messages can no longer be sent to this broker set, but if you have another broker set available, messages can still be sent given the topic is present. Messages can still be consumed from slaves.
2. Some slave crash
As long as there is another working slave, there will be no impact on sending messages. There will also be no impact on consuming messages except when the consumer group is set to consume from this slave preferably. By default, comsumer group consumes from master.
3. All slaves crash
There will be no impact on sending messages to master, but, if the master is SYNC_MASTER, producer will get a SLAVE_NOT_AVAILABLE indicating that the message is not sent to any slaves. There will also be no impact on consuming messages except that if the consumer group is set to consume from slave preferably. By default, comsumer group consumes from master.
### 4. Producer complains “No Topic Route Info”, how to diagnose?
This happens when you are trying to send messages to a topic whose routing info is not available to the producer.
1. Make sure that the producer can connect to a name server and is capable of fetching routing meta info from it.
2. Make sure that name servers do contain routing meta info of the topic. You may query the routing meta info from name server through topicRoute using admin tools or web console.
3. Make sure that your brokers are sending heartbeats to the same list of name servers your producer is connecting to.
4. Make sure that the topic’s permssion is 6(rw-), or at least 2(-w-).
If you can’t find this topic, create it on a broker via admin tools command updateTopic or web console.
# Features
## Subscribe and Publish
Message publication refers to that a producer sends messages to a topic; Message subscription means a consumer follows a topic with certain tags and then consumes data from that topic.
## Message Ordering
Message ordering refers to that a group of messages can be consumed orderly as they are published. For example, an order generates three messages: order creation, order payment, and order completion. It only makes sense to consume them in their generated order, but orders can be consumed in parallel at the same time. RocketMQ can strictly guarantee these messages are in order.
Orderly message are divided into global orderly message and partitioned orderly message. Global order means that all messages under a certain topic must be in order, partitioned order only requires each group of messages are consumed orderly.
- Global message ordering:
For a given Topic, all messages are published and consumed in strict first-in-first-out (FIFO) order.
Applicable scenario: the performance requirement is not high, and all messages are published and consumed according to FIFO principle strictly.
- Partitioned message ordering:
For a given Topic, all messages are partitioned according to sharding key. Messages within the same partition are published and consumed in strict FIFO order. Sharding key is the key field to distinguish message's partition, which is a completely different concept from the key of ordinary messages.
Applicable scenario: high performance requirement, with sharding key as the partition field, messages within the same partition are published and consumed according to FIFO principle strictly.
## Message Filter
Consumers of RocketMQ can filter messages based on tags as well as support for user-defined attribute filtering. Message filter is currently implemented on the Broker side, with the advantage of reducing the network transmission of useless messages for Consumer and the disadvantage of increasing the burden on the Broker and relatively complex implementation.
## Message Reliability
RocketMQ supports high reliability of messages in several situations:
1) Broker shutdown normally
2) Broker abnormal crash
3) OS Crash
4) The machine is out of power, but it can be recovered immediately
5) The machine cannot be started up (the CPU, motherboard, memory and other key equipment may be damaged)
6) Disk equipment damaged
In the four cases of 1), 2), 3), and 4) where the hardware resource can be recovered immediately, RocketMQ guarantees that the message will not be lost or a small amount of data will be lost (depending on whether the flush disk type is synchronous or asynchronous)
5) and 6) are single point of failure and cannot be recovered. Once it happens, all messages on the single point will be lost. In both cases, RocketMQ ensures that 99% of the messages are not lost through asynchronous replication, but a very few number of messages may still be lost. Synchronous double write mode can completely avoid single point of failure, which will surely affect the performance and suitable for the occasion of high demand for message reliability, such as money related applications. Note: RocketMQ supports synchronous double writes since version 3.0.
## At Least Once
At least Once refers to that every message will be delivered at least once. RocketMQ supports this feature because the Consumer pulls the message locally and does not send an ack back to the server until it has consumed it.
## Backtracking Consumption
Backtracking consumption refers to that the Consumer has consumed the message successfully, but the business needs to consume again. To support this function, the message still needs to be retained after the Broker sends the message to the Consumer successfully. The re-consumption is normally based on time dimension. For example, after the recovery of the Consumer system failure, the data one hour ago needs to be re-consumed, then the Broker needs to provide a mechanism to reverse the consumption progress according to the time dimension. RocketMQ supports backtracking consumption by time trace, with the time dimension down to milliseconds.
## Transactional Message
RocketMQ transactional message refers to the fact that the application of a local transaction and the sending of a Message operation can be defined in a global transaction which means both succeed or fail simultaneously. RocketMQ transactional message provides distributed transaction functionality similar to X/Open XA, enabling the ultimate consistency of distributed transactions through transactional message.
## Scheduled Message
Scheduled message(delay queue) refers to that messages are not consumed immediately after they are sent to the broker, but waiting to be delivered to the real topic after a specific time.
The broker has a configuration item, messageDelayLevel, with default values “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, 18 levels. Users can configure a custom messageDelayLevel. Note that messageDelayLevel is a broker's property rather than a topic's. When sending a message, just set the delayLevel level: msg.setDelayLevel(level). There are three types of levels:
- level == 0, The message is not a delayed message
- 1<=level<=maxLevel, Message delay specific time, such as level==1, delay for 1s
- level > maxLevel, than level== maxLevel, such as level==20, delay for 2h
Scheduled messages are temporarily saved in a topic named SCHEDULE_TOPIC_XXXX, and saved in a specific queue according to delayTimeLevel, queueId = delayTimeLevel - 1, that is, only messages with the same delay are saved in a queue, ensuring that messages with the same sending delay can be consumed orderly. The broker consumes SCHEDULE_TOPIC_XXXX on schedule and writes messages to the real topic.
Note that Scheduled messages are counted both the first time they are written and the time they are scheduled to be written to the real topic, so both the send number and the TPS are increased.
## Message Retry
When the Consumer fails to consume the message, a retry mechanism is needed to make the message to be consumed again. Consumer's consume failure can usually be classified as follows:
- due to the reasons of the message itself, such as deserialization failure, the message data itself cannot be processed (for example, the phone number of the current message is cancelled and cannot be charged), etc. This kind of error usually requires skipping this message and consuming others since immediately retry would be failed 99%, so it is better to provide a timed retry mechanism that retries after 10 seconds.
- due to the reasons of dependent downstream application services are not available, such as db connection is not usable, perimeter network is not unreachable, etc. When this kind of error is encountered, consuming other messages will also result in an error even if the current failed message is skipped. In this case, it is recommended to sleep for 30s before consuming the next message, which will reduce the pressure on the broker to retry the message.
RocketMQ will set up a retry queue named “%RETRY%+consumerGroup” for each consumer group(Note that the retry queue for this topic is for consumer groups, not for each topic) to temporarily save messages cannot be consumed by customer due to all kinds of reasons. Considering that it takes some time for the exception to recover, multiple retry levels are set for the retry queue, and each retry level has a corresponding re-deliver delay. The more retries, the greater the deliver delay. RocketMQ first save retry messages to the delay queue which topic is named “SCHEDULE_TOPIC_XXXX”, then background schedule task will save the messages to “%RETRY%+consumerGroup” retry queue according to their corresponding delay.
## Message Resend
When a producer sends a message, the synchronous message will be resent if fails, the asynchronous message will retry and oneway message is without any guarantee. Message resend ensures that messages are sent successfully and without lost as much as possible, but it can lead to message duplication, which is an unavoidable problem in RocketMQ. Under normal circumstances, message duplication will not occur, but when there is a large number of messages and network jitter, message duplication will be a high-probability event. In addition, producer initiative messages resend and the consumer load changes will also result in duplicate messages. The message retry policy can be set as follows:
- retryTimesWhenSendFailed: Synchronous message retry times when send failed, default value is 2, so the producer will try to send retryTimesWhenSendFailed + 1 times at most. To ensure that the message is not lost, producer will try sending the message to another broker instead of selecting the broker that failed last time. An exception will be thrown if it reaches the retry limit, and the client should guarantee that the message will not be lost. Messages will resend when RemotingException, MQClientException, and partial MQBrokerException occur.
- retryTimesWhenSendAsyncFailed: Asynchronous message retry times when send failed, asynchronous retry sends message to the same broker instead of selecting another one and does not guarantee that the message wont lost.
- retryAnotherBrokerWhenNotStoreOK: Message flush disk (master or slave) timeout or slave not available (return status is not SEND_OK), whether to try to send to another broker, default value is false. Very important messages can set to true.
## Flow Control
Producer flow control, because broker processing capacity reaches a bottleneck; Consumer flow control, because the consumption capacity reaches a bottleneck.
Producer flow control:
- When commitLog file locked time exceeds osPageCacheBusyTimeOutMills, default value of osPageCacheBusyTimeOutMills is 1000 ms, then return flow control.
- If transientStorePoolEnable == true, and the broker is asynchronous flush disk type, and resources are insufficient in the transientStorePool, reject the current send request and return flow control.
- The broker checks the head request wait time of the send request queue every 10ms. If the wait time exceeds waitTimeMillsInSendQueue, which default value is 200ms, the current send request is rejected and the flow control is returned.
- The broker implements flow control by rejecting send requests.
Consumer flow control:
- When consumer local cache messages number exceeds pullThresholdForQueue, default value is 1000.
- When consumer local cache messages size exceeds pullThresholdSizeForQueue, default value is 100MB.
- When consumer local cache messages span exceeds consumeConcurrentlyMaxSpan, default value is 2000.
The result of consumer flow control is to reduce the pull frequency.
## Dead Letter Queue
Dead letter queue is used to deal messages that cannot be consumed normally. When a message is consumed failed at first time, the message queue will automatically resend the message. If the consumption still fails after the maximum number retry, it indicates that the consumer cannot properly consume the message under normal circumstances. At this time, the message queue will not immediately abandon the message, but send it to the special queue corresponding to the consumer.
RocketMQ defines the messages that could not be consumed under normal circumstances as Dead-Letter Messages, and the special queue in which the Dead-Letter Messages are saved as Dead-Letter Queues. In RocketMQ, the consumer instance can consume again by resending messages in the Dead-Letter Queue using console.
\ No newline at end of file
# 3 Broker
## 3.1 Broker Role
Broker Role is ASYNC_MASTER, SYNC_MASTER or SLAVE. If you cannot tolerate message missing, we suggest you deploy SYNC_MASTER and attach a SLAVE to it. If you feel ok about missing, but you want the Broker to be always available, you may deploy ASYNC_MASTER with SLAVE. If you just want to make it easy, you may only need a ASYNC_MASTER without SLAVE.
## 3.2 FlushDiskType
ASYNC_FLUSH is recommended, for SYNC_FLUSH is expensive and will cause too much performance loss. If you want reliability, we recommend you use SYNC_MASTER with SLAVE.
## 3.3 Broker Configuration
| Parameter name | Default | Description |
| -------------------------------- | ----------------------------- | ------------------------------------------------------------ |
| listenPort | 10911 | listen port for client |
| namesrvAddr | null | name server address |
| brokerIP1 | InetAddress for network interface | Should be configured if having multiple addresses |
| brokerIP2 | InetAddress for network interface | If configured for the Master broker in the Master/Slave cluster, slave broker will connect to this port for data synchronization |
| brokerName | null | broker name |
| brokerClusterName | DefaultCluster | this broker belongs to which cluster |
| brokerId | 0 | broker id, 0 means master, positive integers mean slave |
| storePathCommitLog | $HOME/store/commitlog/ | file path for commit log |
| storePathConsumerQueue | $HOME/store/consumequeue/ | file path for consume queue |
| mapedFileSizeCommitLog | 1024 * 1024 * 1024(1G) | mapped file size for commit log |​
| deleteWhen | 04 | When to delete the commitlog which is out of the reserve time |​
| fileReserverdTime | 72 | The number of hours to keep a commitlog before deleting it |​
| brokerRole | ASYNC_MASTER | SYNC_MASTER/ASYNC_MASTER/SLAVE |​
| flushDiskType | ASYNC_FLUSH | {SYNC_FLUSH/ASYNC_FLUSH}. Broker of SYNC_FLUSH mode flushes each message onto disk before acknowledging producer. Broker of ASYNC_FLUSH mode, on the other hand, takes advantage of group-committing, achieving better performance. |​
\ No newline at end of file
## Consumer
----
### 2.1 Consumption process idempotent
RocketMQ cannot avoid Exactly-Once, so if the business is very sensitive to consumption repetition, it is important to perform deduplication at the business level. Deduplication can be done with a relational database. First, you need to determine the unique key of the message, which can be either msgId or a unique identifier field in the message content, such as the order Id. Determine if a unique key exists in the relational database before consumption. If it does not exist, insert it and consume it, otherwise skip it. (The actual process should consider the atomic problem, determine whether there is an attempt to insert, if the primary key conflicts, the insertion fails, skip directly)
### 2.2 Slow message processing
#### 2.2.1 Increase consumption parallelism
Most messages consumption behaviors are IO-intensive, That is, it may be to operate the database, or call RPC. The consumption speed of such consumption behavior lies in the throughput of the back-end database or the external system. By increasing the consumption parallelism, the total consumption throughput can be increased, but the degree of parallelism is increased to a certain extent. Instead it will fall.Therefore, the application must set a reasonable degree of parallelism. There are several ways to modify the degree of parallelism of consumption as follows:
* Under the same ConsumerGroup, increase the degree of parallelism by increasing the number of Consumer instances (note that the Consumer instance that exceeds the number of subscription queues is invalid). Can be done by adding machines, or by starting multiple processes on an existing machine.
* Improve the consumption parallel thread of a single Consumer by modifying the parameters consumeThreadMin and consumeThreadMax.
#### 2.2.2 Batch mode consumption
Some business processes can increase consumption throughput to a large extent if they support batch mode consumption. For example, order deduction application, it takes 1s to process one order at a time, and it takes only 2s to process 10 orders at a time. In this way, the throughput of consumption can be greatly improved. By setting the consumer's consumeMessageBatchMaxSize to return a parameter, the default is 1, that is, only one message is consumed at a time, for example, set to N, then the number of messages consumed each time is less than or equal to N.
#### 2.2.3 Skip non-critical messages
When a message is accumulated, if the consumption speed cannot keep up with the transmission speed, if the service does not require high data, you can choose to discard the unimportant message. For example, when the number of messages in a queue is more than 100,000 , try to discard some or all of the messages, so that you can quickly catch up with the speed of sending messages. The sample code is as follows:
```java
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
long offest = msgs.get(0).getQueueOffset();
String maxOffset =
msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if(diff > 100000){
//TODO Special handling of message accumulation
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//TODO Normal consumption process
return ConcumeConcurrentlyStatus.CONSUME_SUCCESS;
}
```
#### 2.2.4 Optimize each message consumption process
For example, the consumption process of a message is as follows:
* Query from DB according to the message [data 1]
* Query from DB according to the message [data 2]
* Complex business calculations
* Insert [Data 3] into the DB
* Insert [Data 4] into the DB
There are 4 interactions with the DB in the consumption process of this message. If it is calculated by 5ms each time, it takes a total of 20ms. If the business calculation takes 5ms, then the total time is 25ms, So if you can optimize 4 DB interactions to 2 times, the total time can be optimized to 15ms, which means the overall performance is increased by 40%. Therefore, if the application is sensitive to delay, the DB can be deployed on the SSD hard disk. Compared with the SCSI disk, the former RT will be much smaller.
### 2.3 Print Log
If the amount of messages is small, it is recommended to print the message in the consumption entry method, consume time, etc., to facilitate subsequent troubleshooting.
```java
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
//TODO Normal consumption process
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
```
If you can print the time spent on each message, it will be more convenient when troubleshooting online problems such as slow consumption.
### 2.4 Other consumption suggestions
#### 2.4.1、Consumer Group and Subscriptions
The first thing you should be aware of is that different Consumer Group can consume the same topic independently, and each of them will have their own consuming offsets. Please make sure each Consumer within the same Group to subscribe the same topics.
#### 2.4.2、Orderly
The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.
#### 2.4.3、Concurrently
As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance. It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.
#### 2.4.4、Consume Status
For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later. Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message, but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.
#### 2.4.5、Blocking
It is not recommend to block the Listener, because it will block the thread pool, and eventually may stop the consuming process.
#### 2.4.6、Thread Number
The consumer use a ThreadPoolExecutor to process consuming internally, so you can change it by setting setConsumeThreadMin or setConsumeThreadMax.
#### 2.4.7、ConsumeFromWhere
When a new Consumer Group is established, it will need to decide whether it needs to consume the historical messages which had already existed in the Broker. CONSUME_FROM_LAST_OFFSET will ignore the historical messages, and consume anything produced after that. CONSUME_FROM_FIRST_OFFSET will consume every message existed in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after the specified timestamp.
### Producer
----
##### 1 Message Sending Tips
###### 1.1 The Use of Tags
One application instance should use one topic as much as possible and the subtype of messages can be marked by tags. Tag provides extra flexibility to users. In the consume subscribing process, the messages filtering can only be handled by using tags when the tags are specified in the message sending process: `message.setTags("TagA")`
###### 1.2 The Use of Keys
A business key can be set in one message and it will be easier to look up the message on a broker server to diagnose issues during development. Each message will be created index(hash index) by server, instance can query the content of this message by topic and key and who consumes the message.Because of the hash index, make sure that the key should be unique in order to avoid potential hash index conflict.
``` java
// Order Id
String orderId = "20034568923546";
message.setKeys(orderId);
```
###### 1.3 The Log Print
When sending a message,no matter success or fail, a message log must be printed which contains SendResult and Key. It is assumed that we will always get SEND_OK if no exception is thrown. Below is a list of descriptions about each status:
* SEND_OK
SEND_OK means sending message successfully. SEND_OK does not mean it is reliable. To make sure no message would be lost, you should also enable SYNC_MASTER or SYNC_FLUSH.
* FLUSH_DISK_TIMEOUT
FLUSH_DISK_TIMEOUT means sending message successfully but the Broker flushing the disk with timeout. In this kind of condition, the Broker has saved this message in memory, this message will be lost only if the Broker was down. The FlushDiskType and SyncFlushTimeout could be specified in MessageStoreConfig. If the Broker set MessageStoreConfig’s FlushDiskType=SYNC_FLUSH(default is ASYNC_FLUSH), and the Broker doesn’t finish flushing the disk within MessageStoreConfig’s syncFlushTimeout(default is 5 secs), you will get this status.
* FLUSH_SLAVE_TIMEOUT
FLUSH_SLAVE_TIMEOUT means sending messages successfully but the slave Broker does not finish synchronizing with the master. If the Broker’s role is SYNC_MASTER(default is ASYNC_MASTER), and the slave Broker doesn’t finish synchronizing with the master within the MessageStoreConfig’s syncFlushTimeout(default is 5 secs), you will get this status.
* SLAVE_NOT_AVAILABLE
SLAVE_NOT_AVAILABLE means sending messages successfully but no slave Broker configured. If the Broker’s role is SYNC_MASTER(default is ASYNC_MASTER), but no slave Broker is configured, you will get this status.
##### 2 Operations on Message Sending failed
The send method of Producer can be retried, the retry process is illustrated below:
* The method will retry at most 2 times(2 times in synchronous mode, 0 times in asynchronous mode).
* If sending failed, it will turn to the next Broker. This strategy will be executed when the total costing time is less then sendMsgTimeout(default is 10 seconds).
* The retry method will be terminated if timeout exception was thrown when sending messages to Broker.
The strategy above could make sure message sending successfully to a certain degree. Some more retry strategies, such as we could try to save the message to database if calling the send synchronous method failed and then retry by background thread's timed tasks, which will make sure the message is sent to Broker,could be improved if asking for high reliability business requirement.
The reasons why the retry strategy using database have not integrated by the RocketMQ client will be explained below: Firstly, the design mode of the RocketMQ client is stateless mode. It means that the client is designed to be horizontally scalable at each level and the consumption of the client to physical resources is only CPU, memory and network. Then, if a key-value memory module is integrated by the client itself, the Asyn-Saving strategy will be utilized in consideration of the high resource consumption of the Syn-Saving strategy. However, given that operations staff does not manage the client shutoff, some special commands, such as kill -9, may be used which will lead to the lost of message because of no saving in time. Furthermore, the physical resource running Producer is not appropriate to save some significant data because of low reliability. Above all, the retry process should be controlled by program itself.
##### 3 Send Messages in One-way Mode
The message sending is usually a process like below:
* Client sends request to sever.
* Sever handles request
* Sever returns response to client
The total costing time of sending one message is the sum of costing time of three steps above. Some situations demand that total costing time must be in a quite low level, however, do not take reliable performance into consideration, such as log collection. This kind of application could be called in one-way mode, which means client sends request but not wait for response. In this kind of mode, the cost of sending request is only a call of system operation which means one operation writing data to client socket buffer. Generally, the time cost of this process will be controlled n microseconds level.
\ No newline at end of file
# Message Trace
## Introduction
This document focuses on how to quickly deploy and use RocketMQ clusters that support message trace features
## 1. Key Attributes of Message Trace Data
| Producer | Consumer | Broker |
| ---------------- | ----------------- | ------------ |
| production instance information | consumption instance information | message Topic |
| send message time | post time, post round | message storage location |
| whether the message was sent successfully | Whether the message was successfully consumed | The Key of the message |
| Time spent sending | Time spent consuming | Tag of the message |
## 2. Support for Message Trace Cluster Deployment
### 2.1 Broker Configuration Fille
The properties profile content of the Broker side enabled message trace feature is pasted here:
```
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if msg tracing is open,the flag will be true
traceTopicEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876
```
### 2.2 Normal Mode
Each Broker node in the RocketMQ cluster is used to store message trace data collected and sent from the Client.Therefore, there are no requirements or restrictions on the number of Broker nodes in the RocketMQ cluster.
### 2.3 Physical IO Isolation Mode
For scenarios with large amount of trace message data , one of the Broker nodes in the RocketMQ cluster can be selected to store the trace message , so that the common message data of the user and the physical IO of the trace message data are completely isolated from each other.In this mode, there are at least two Broker nodes in the RockeMQ cluster, one of which is defined as the server on which message trace data is stored.
### 2.4 Start the Broker that Starts the MessageTrace
`nohup sh mqbroker -c ../conf/2m-noslave/broker-a.properties &`
## 3. Save the Topic Definition of Message Trace
RocketMQ's message trace feature supports two ways to store trace data:
### 3.1 System-level TraceTopic
By default, message track data is stored in the system-level TraceTopic(names:**RMQ_SYS_TRACE_TOPIC**)。This Topic is automatically created when the Broker node is started(As described above, the switch variable **traceTopicEnable** needs to be set to **true** in the Broker configuration file)。
### 3.2 Custom TraceTopic
If the user is not prepared to store the message track data in the system-level default TraceTopic, you can also define and create a user-level Topic to save the track (that is, to create a regular Topic to save the message track data)。The following section introduces how the Client interface supports the user-defined TraceTopic.
## 4. Client Practices that Support Message Trace
In order to reduce as much as possible the transformation work of RocketMQ message trace feature used in the user service system, the author added a switch parameter (**enableMsgTrace**) to the original interface in the design to realize whether the message trace is opened or not.
### 4.1 Opening the Message Trace when Sending the Message
```
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
producer.setNamesrvAddr("XX.XX.XX.XX1");
producer.start();
try {
{
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
```
### 4.2 Opening Message Trace whenSubscribing to a Message
```
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);
consumer.subscribe("TopicTest", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setConsumeTimestamp("20181109221800");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
```
### 4.3 Support for Custom Storage Message Trace Topic
The initialization of `DefaultMQProducer` and `DefaultMQPushConsumer` instances can be changed to support the custom storage message trace Topic as follows when sending and subscriving messages above.
```
##Where Topic_test11111 needs to be pre-created by the user to save the message trace;
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true,"Topic_test11111");
......
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true,"Topic_test11111");
......
```
\ No newline at end of file
# Operation FAQ
## 1. RocketMQ's mqadmin command error.
> Problem: Sometimes after deploying the RocketMQ cluster, when you try to execute some commands of "mqadmin", the following exception will appear:
>
> ```java
> org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed
> ```
Solution: Execute `export NAMESRV_ADDR=ip:9876` (ip refers to the address of NameServer deployed in the cluster) on the VM that deploys the RocketMQ cluster.Then you will execute commands of "mqadmin" successfully.
## 2. The inconsistent version of RocketMQ between the producer and consumer leads to the problem that message can't be consumed normally.
> Problem: The same producer sends a message, consumer A can consume, but consumer B can't consume, and the RocketMQ Console appears:
>
> ```java
> Not found the consumer group consume stats, because return offset table is empty, maybe the consumer not consume any message
> ```
Solution: The jar package of RocketMQ, such as rocketmq-client, should be the same version on the consumer and producer.
## 3. When adding a new topic consumer group, historical messages can't be consumed.
> Problem: When a new consumer group of the same topic is started, the consumed message is the current offset message, and the historical message is not obtained.
Solution: The default policy of rocketmq is to start from the end of the message queue and skip the historical message. If you want to consume historical message, you need to set:
```java
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#setConsumeFromWhere
```
There are three common configurations:
- By default, a new subscription group starts to consume from the end of the queue for the first time, and then restarts and continue to consume from the last consume position, that is, to skip the historical message.
```java
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
```
- A new subscription group starts to consume from the head of the queue for the first time, and then restarts and continue to consume from the last consume position, that is, to consume the historical message that is not expired on Broker.
```java
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
```
- A new subscription group starts to consume from the specified time point for the first time, and then restarts and continue to consume from the last consume position. It is used together with `consumer.setConsumeTimestamp()`. The default is half an hour ago.
```java
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
```
## 4. How to enable reading data from Slave
In some cases, the Consumer needs to reset the consume position to 1-2 days ago. At this time, on the Master Broker with limited memory, the CommitLog will carry a relatively heavy IO pressure, affecting the reading and writing of other messages on that Broker. You can enable `slaveReadEnable=true`. When Master Broker finds that the difference between the Consumer's consume position and the latest value of CommitLog exceeds the percentage of machine's memory (`accessMessageInMemoryMaxRatio=40%`), it will recommend Consumer to read from Slave Broker and relieve Master Broker's IO.
## 5. Performance
Asynchronous flush disk is recommended to use spin lock.
Synchronous flush disk is recommended to use reentrant lock. Adjust the Broker configuration item `useReentrantLockWhenPutMessage`, and the default value is false.
Asynchronous flush disk is recommended to open `TransientStorePoolEnable` and close `transferMsgByHeap` to improve the efficiency of pulling message;
Synchronous flush disk is recommended to increase the `sendMessageThreadPoolNums` appropriately. The specific configuration needs to be tested.
## 6. The meaning and difference between msgId and offsetMsgId in RocketMQ
After sending message with RocketMQ, you will usually see the following log:
```java
SendResult [sendStatus=SEND_OK, msgId=0A42333A0DC818B4AAC246C290FD0000, offsetMsgId=0A42333A00002A9F000000000134F1F5, messageQueue=MessageQueue [topic=topicTest1, BrokerName=mac.local, queueId=3], queueOffset=4]
```
- msgId,for the client, the msgId is generated by the producer instance. Specifically, the method `MessageClientIDSetter.createUniqIDBuffer()` is called to generate a unique Id.
- offsetMsgId, offsetMsgId is generated by the Broker server when writing a message ( string concating "IP address + port" and "CommitLog's physical offset address"), and offsetMsgId is the messageId used to query in the RocketMQ console.
# Access control list
## Overview
This document focuses on how to quickly deploy and use a RocketMQ cluster that supports the privilege control feature.
## 1. Access control features
Access Control (ACL) mainly provides Topic resource level user access control for RocketMQ.If you want to enable RocketMQ permission control, you can inject the AccessKey and SecretKey signatures through the RPCHook on the Client side.And then, the corresponding permission control attributes (including Topic access rights, IP whitelist and AccessKey and SecretKey signature) are set in the configuration file of distribution/conf/plain_acl.yml.The Broker side will check the permissions owned by the AccessKey, and if the verification fails, an exception is thrown;
The source code about ACL on the Client side can be find in **org.apache.rocketmq.example.simple.AclClient.java**
## 2. Access control definition and attribute values
### 2.1 Access control definition
The definition of Topic resource access control for RocketMQ is mainly as shown in the following table.
| Permission | explanation |
| --- | --- |
| DENY | permission deny |
| ANY | PUB or SUB permission |
| PUB | Publishing permission |
| SUB | Subscription permission |
### 2.2 Main properties
| key | value | explanation |
| --- | --- | --- |
| globalWhiteRemoteAddresses | string |Global IP whitelist,example:<br>\*; <br>192.168.\*.\*; <br>192.168.0.1 |
| accessKey | string | Access Key |
| secretKey | string | Secret Key |
| whiteRemoteAddress | string | User IP whitelist,example:<br>\*; <br>192.168.\*.\*; <br>192.168.0.1 |
| admin | true;false | Whether an administrator account |
| defaultTopicPerm | DENY;PUB;SUB;PUB\|SUB | Default Topic permission |
| defaultGroupPerm | DENY;PUB;SUB;PUB\|SUB | Default ConsumerGroup permission |
| topicPerms | topic=permission | Topic only permission |
| groupPerms | group=permission | ConsumerGroup only permission |
For details, please refer to the **distribution/conf/plain_acl.yml** configuration file.
## 3. Cluster deployment with permission control
After defining the permission attribute in the **distribution/conf/plain_acl.yml** configuration file as described above, open the **aclEnable** switch variable to enable the ACL feature of the RocketMQ cluster.The configuration file of the ACL feature enabled on the broker is as follows:
```properties
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/rootdir-a-m
storePathCommitLog=/data/rocketmq/commitlog-a-m
autoCreateSubscriptionGroup=true
## if acl is open,the flag will be true
aclEnable=true
listenPort=10911
brokerIP1=XX.XX.XX.XX1
namesrvAddr=XX.XX.XX.XX:9876
```
## 4. Main process of access control
The main ACL process is divided into two parts, including privilege resolution and privilege check.
### 4.1 Privilege resolution
The Broker side parses the client's RequestCommand request and obtains the attribute field that needs to be authenticated.
main attributes:
(1) AccessKey:Similar to the user name, on behalf of the user entity, the permission data corresponds to it;
(2) Signature:The client obtains the string according to the signature of the SecretKey, and the server uses the SecretKey to perform signature verification.
### 4.2 Privilege check
The check logic of the right side of the broker is mainly divided into the following steps:
(1) Check if the global IP whitelist is hit; if yes, the check passes; otherwise, go to step (2);
(2) Check if the user IP whitelist is hit; if yes, the check passes; otherwise, go to step (3);
(3) Check the signature, if the verification fails, throw an exception; if the verification passes, go to step (4);
(4) Check the permissions required by the user request and the permissions owned by the user; if not, throw an exception;
The verification of the required permissions of the user requires attention to the following points:
(1) Special requests such as UPDATE_AND_CREATE_TOPIC can only be operated by the admin account;
(2) For a resource, if there is explicit configuration permission, the configured permission is used; if there is no explicit configuration permission, the default permission is adopted;
## 5. Hot loading modified Access control
The default implementation of RocketrMQ's permission control store is based on the yml configuration file. Users can dynamically modify the properties defined by the permission control without restarting the Broker service node.
# Architecture design
## Technology Architecture
![](image/rocketmq_architecture_1.png)
The RocketMQ architecture is divided into four parts, as shown in the figure above:
- Producer:The role of message publishing supports distributed cluster mode deployment. Producer selects the corresponding Broker cluster queue for message delivery through MQ's load balancing module. The delivery process supports fast failure and low latency.
- Consumer:The role of message consumption supports distributed cluster deployment. Support push, pull two modes to consume messages. It also supports cluster mode and broadcast mode consumption, and it provides a real-time message subscription mechanism to meet the needs of most users.
- NameServer:NameServer is a very simple Topic routing registry with a role similar to ZooKeeper in Dubbo, which supports dynamic registration and discovery of Broker. It mainly includes two functions: Broker management, NameServer accepts the registration information of the Broker cluster and saves it as the basic data of the routing information. Then provide a heartbeat detection mechanism to check whether the broker is still alive; routing information management, each NameServer will save the entire routing information about the Broker cluster and the queue information for the client query. Then the Producer and Conumser can know the routing information of the entire Broker cluster through the NameServer, so as to deliver and consume the message. The NameServer is usually deployed in a cluster mode, and each instance does not communicate with each other. Broker registers its own routing information with each NameServer, so each NameServer instance stores a complete routing information. When a NameServer is offline for some reason, the Broker can still synchronize its routing information with other NameServers. The Producer and Consumer can still dynamically sense the information of the Broker's routing.
- BrokerServer:Broker is responsible for the storage, delivery and query of messages and high availability guarantees. In order to achieve these functions, Broker includes the following important sub-modules.
1. Remoting Module:The entire broker entity handles requests from the clients side.
2. Client Manager:Topic subscription information for managing the client (Producer/Consumer) and maintaining the Consumer
3. Store Service:Provides a convenient and simple API interface for handling message storage to physical hard disks and query functions.
4. HA Service:Highly available service that provides data synchronization between Master Broker and Slave Broker.
5. Index Service:The message delivered to the Broker is indexed according to a specific Message key to provide a quick query of the message.
![](image/rocketmq_architecture_2.png)
## Deployment architecture
![](image/rocketmq_architecture_3.png)
### RocketMQ Network deployment features
- NameServer is an almost stateless node that can be deployed in a cluster without any information synchronization between nodes.
- The broker deployment is relatively complex. The Broker is divided into the Master and the Slave. One Master can correspond to multiple Slaves. However, one Slave can only correspond to one Master. The correspondence between the Master and the Slave is defined by specifying the same BrokerName and different BrokerId. The BrokerId is 0. Indicates Master, non-zero means Slave. The Master can also deploy multiple. Each broker establishes a long connection with all nodes in the NameServer cluster, and periodically registers Topic information to all NameServers. Note: The current RocketMQ version supports a Master Multi Slave on the deployment architecture, but only the slave server with BrokerId=1 will participate in the read load of the message.
- The Producer establishes a long connection with one of the nodes in the NameServer cluster (randomly selected), periodically obtains Topic routing information from the NameServer, and establishes a long connection to the Master that provides the Topic service, and periodically sends a heartbeat to the Master. Producer is completely stateless and can be deployed in a cluster.
- The Consumer establishes a long connection with one of the nodes in the NameServer cluster (randomly selected), periodically obtains Topic routing information from the NameServer, and establishes a long connection to the Master and Slave that provides the Topic service, and periodically sends heartbeats to the Master and Slave. The Consumer can subscribe to the message from the Master or subscribe to the message from the Slave. When the consumer pulls the message to the Master, the Master server will generate a read according to the distance between the offset and the maximum offset. I/O), and whether the server is readable or not, the next time it is recommended to pull from the Master or Slave.
Describe the cluster workflow in conjunction with the deployment architecture diagram:
- Start the NameServer, listen to the port after the NameServer, and wait for the Broker, Producer, and Consumer to connect, which is equivalent to a routing control center.
- The Broker starts, keeps a long connection with all NameServers, and sends heartbeat packets periodically. The heartbeat packet contains the current broker information (IP+ port, etc.) and stores all Topic information. After the registration is successful, there is a mapping relationship between Topic and Broker in the NameServer cluster.
- Before sending and receiving a message, create a Topic. When creating a Topic, you need to specify which Brokers the Topic should be stored on, or you can automatically create a Topic when sending a message.
- Producer sends a message. When starting, it first establishes a long connection with one of the NameServer clusters, and obtains from the NameServer which Brokers are currently sent by the Topic. Polling selects a queue from the queue list and then establishes with the broker where the queue is located. Long connection to send a message to the broker.
- The Consumer is similar to the Producer. It establishes a long connection with one of the NameServers, obtains which Brokers the current Topic exists on, and then directly establishes a connection channel with the Broker to start consuming messages.
## Design
### 1 Message Store
![](../cn/image/rocketmq_design_1.png)
#### 1.1 The Architecure of Message Store
#### 1.2 PageCache and Memory-Map(Mmap)
#### 1.3 Message Flush
![](../cn/image/rocketmq_design_2.png)
### 2 Communication Mechanism
#### 2.1 The class diagram of Remoting module
![](../cn/image/rocketmq_design_3.png)
#### 2.2 The design of protocol and encode/decode
![](../cn/image/rocketmq_design_4.png)
#### 2.3 The three ways and process of message communication
![](../cn/image/rocketmq_design_5.png)
#### 2.4 The multi-thread design of Reactor
![](../cn/image/rocketmq_design_6.png)
### 3 Message Filter
![](../cn/image/rocketmq_design_7.png)
### 4 LoadBalancing
#### 4.1 The loadBalance of Producer
#### 4.2 The loadBalance of Consumer
![](../cn/image/rocketmq_design_8.png)
![](../cn/image/rocketmq_design_9.png)
### 5 Transactional Message
Apache RocketMQ supports distributed transactional message from version 4.3.0. RocketMQ implements transactional message by using the protocol of 2PC(two-phase commit), in addition adding a compensation logic to handle timeout-case or failure-case of commit-phase, as shown below.
![](../cn/image/rocketmq_design_10.png)
#### 5.1 The Process of RocketMQ Transactional Message
The picture above shows the overall architecture of transactional message, including the sending of message(commit-request phase), the sending of commit/rollback(commit phase) and the compensation process.
1. The sending of message and Commit/Rollback.
(1) Sending the message(named Half message in RocketMQ)
(2) The server responds the writing result(success or failure) of Half message.
(3) Handle local transaction according to the result(local transaction won't be executed when the result is failure).
(4) Sending Commit/Rollback to broker according to the result of local transaction(Commit will generate message index and make the message visible to consumers).
2. Compensation process
(1) For a transactional message without a Commit/Rollback (means the message in the pending status), a "back-check" request is initiated from the broker.
(2) The Producer receives the "back-check" request and checks the status of the local transaction corresponding to the "back-check" message.
(3) Redo Commit or Rollback based on local transaction status.
The compensation phase is used to resolve the timeout or failure case of the message Commit or Rollback.
#### 5.2 The design of RocketMQ Transactional Message
1. Transactional message is invisible to users in first phase(commit-request phase)
Upon on the main process of transactional message, the message of first phase is invisible to the user. This is also the biggest difference from normal message. So how do we write the message while making it invisible to the user? And below is the solution of RocketMQ: if the message is a Half message, the topic and queueId of the original message will be backed up, and then changes the topic to RMQ_SYS_TRANS_HALF_TOPIC. Since the consumer group does not subscribe to the topic, the consumer cannot consume the Half message. Then RocketMQ starts a timing task, pulls the message for RMQ_SYS_TRANS_HALF_TOPIC, obtains a channel according to producer group and sends a back-check to query local transaction status, and decide whether to submit or roll back the message according to the status.
In RocketMQ, the storage structure of the message in the broker is as follows. Each message has corresponding index information. The Consumer reads the content of the message through the secondary index of the ConsumeQueue. The flow is as follows:
![](../cn/image/rocketmq_design_11.png)
The specific implementation strategy of RocketMQ is: if the transactional message is written, topic and queueId of the message are replaced, and the original topic and queueId are stored in the properties of the message. Because the replace of the topic, the message will not be forwarded to the Consumer Queue of the original topic, and the consumer cannot perceive the existence of the message and will not consume it. In fact, changing the topic is the conventional method of RocketMQ(just recall the implementation mechanism of the delay message).
2. Commit/Rollback operation and introduction of Op message
After finishing writing a message that is invisible to the user in the first phase, here comes two cases in the second phase. One is Commit operation, after which the message needs to be visible to the user; the other one is Rollback operation, after which the first phase message(Half message) needs to be revoked. For the case of Rollback, since first-phase message itself is invisible to the user, there is no need to actually revoke the message (in fact, RocketMQ can't actually delete a message because it is a sequential-write file). But still some operation needs to be done to identity the final status of the message, to differ it from pending status message. To do this, the concept of "Op message" is introduced, which means the message has a certain status(Commit or Rollback). If a transactional message does not have a corresponding Op message, the status of the transaction is still undetermined (probably the second-phase failed). By introducing the Op message, the RocketMQ records an Op message for every Half message regardless it is Commit or Rollback. The only difference between Commit and Rollback is that when it comes to Commit, the index of the Half message is created before the Op message is written.
3. How Op message stored and the correspondence between Op message and Half message
RocketMQ writes the Op message to a specific system topic(RMQ_SYS_TRANS_OP_HALF_TOPIC) which will be created via the method - TransactionalMessageUtil.buildOpTopic(); this topic is an internal Topic (like the topic of RMQ_SYS_TRANS_HALF_TOPIC) and will not be consumed by the user. The content of the Op message is the physical offset of the corresponding Half message. Through the Op message we can index to the Half message for subsequent check-back operation.
![](../cn/image/rocketmq_design_12.png)
4. Index construction of Half messages
When performing Commit operation of the second phase, the index of the Half message needs to be built. Since the Half message is written to a special topic(RMQ_SYS_TRANS_HALF_TOPIC) in the first phase of 2PC, so it needs to be read out from the special topic when building index, and replace the topic and queueId with the real target topic and queueId, and then write through a normal message that is visible to the user. Therefore, in conclusion, the second phase recovers a complete normal message using the content of the Half message stored in the first phase, and then goes through the message-writing process.
5. How to handle the message failed in the second phase?
If commit/rollback phase fails, for example, a network problem causes the Commit to fail when you do Commit. Then certain strategy is required to make sure the message finally commit. RocketMQ uses a compensation mechanism called "back-check". The broker initiates a back-check request for the message in pending status, and sends the request to the corresponding producer side (the same producer group as the producer group who sent the Half message). The producer checks the status of local transaction and redo Commit or Rollback. The broker performs the back-check by comparing the RMQ_SYS_TRANS_HALF_TOPIC messages and the RMQ_SYS_TRANS_OP_HALF_TOPIC messages and advances the checkpoint(recording those transactional messages that the status are certain).
RocketMQ does not back-check the status of transactional messages endlessly. The default time is 15. If the transaction status is still unknown after 15 times, RocketMQ will roll back the message by default.
### 6 Message Query
#### 6.1 Query messages by messageId
#### 6.2 Query messages by message key
![](../cn/image/rocketmq_design_13.png)
...@@ -78,7 +78,7 @@ public class BatchPutMessageTest { ...@@ -78,7 +78,7 @@ public class BatchPutMessageTest {
} }
@Test @Test
public void testPutMessages() { public void testPutMessages() throws Exception {
List<Message> messages = new ArrayList<>(); List<Message> messages = new ArrayList<>();
String topic = "batch-write-topic"; String topic = "batch-write-topic";
int queue = 0; int queue = 0;
...@@ -111,6 +111,8 @@ public class BatchPutMessageTest { ...@@ -111,6 +111,8 @@ public class BatchPutMessageTest {
PutMessageResult putMessageResult = messageStore.putMessages(messageExtBatch); PutMessageResult putMessageResult = messageStore.putMessages(messageExtBatch);
assertThat(putMessageResult.isOk()).isTrue(); assertThat(putMessageResult.isOk()).isTrue();
Thread.sleep(3 * 1000);
for (long i = 0; i < 10; i++) { for (long i = 0; i < 10; i++) {
MessageExt messageExt = messageStore.lookMessageByOffset(msgLengthArr[(int) i]); MessageExt messageExt = messageStore.lookMessageByOffset(msgLengthArr[(int) i]);
......
...@@ -212,6 +212,7 @@ public class ConsumeQueueTest { ...@@ -212,6 +212,7 @@ public class ConsumeQueueTest {
try { try {
try { try {
putMsg(master); putMsg(master);
Thread.sleep(3000L);//wait ConsumeQueue create success.
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
assertThat(Boolean.FALSE).isTrue(); assertThat(Boolean.FALSE).isTrue();
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.tools.command.topic; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.tools.command.topic;
import java.util.Set; import java.util.Set;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.sysflag.TopicSysFlag; import org.apache.rocketmq.common.sysflag.TopicSysFlag;
...@@ -43,13 +44,16 @@ public class UpdateTopicSubCommand implements SubCommand { ...@@ -43,13 +44,16 @@ public class UpdateTopicSubCommand implements SubCommand {
@Override @Override
public Options buildCommandlineOptions(Options options) { public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();
Option opt = new Option("b", "brokerAddr", true, "create topic to which broker"); Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
opt.setRequired(false); optionGroup.addOption(opt);
options.addOption(opt);
opt = new Option("c", "clusterName", true, "create topic to which cluster"); opt = new Option("c", "clusterName", true, "create topic to which cluster");
opt.setRequired(false); optionGroup.addOption(opt);
options.addOption(opt);
optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);
opt = new Option("t", "topic", true, "topic name"); opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true); opt.setRequired(true);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册