提交 a28a8eb6 编写于 作者: K kavin

Merge remote-tracking branch 'upstream/develop' into develop

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl.consumer;
import org.apache.rocketmq.client.consumer.*;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.common.stats.StatsItem;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ConsumeMessageConcurrentlyServiceTest {
private String consumerGroup;
private String topic = "FooBar";
private String brokerName = "BrokerA";
private MQClientInstance mQClientFactory;
@Mock
private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalancePushImpl rebalancePushImpl;
private DefaultMQPushConsumer pushConsumer;
@Before
public void init() throws Exception {
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.setPullInterval(60 * 1000);
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl();
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10912", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
}
@Test
public void testPullMessage_ConsumeSuccess() throws InterruptedException, RemotingException, MQBrokerException, NoSuchFieldException,Exception {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final MessageExt[] messageExts = new MessageExt[1];
ConsumeMessageConcurrentlyService normalServie = new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
messageExts[0] = msgs.get(0);
countDownLatch.countDown();
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(normalServie);
PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest());
countDownLatch.await();
Thread.sleep(1000);
org.apache.rocketmq.common.protocol.body.ConsumeStatus stats = normalServie.getConsumerStatsManager().consumeStatus(pushConsumer.getDefaultMQPushConsumerImpl().groupName(),topic);
ConsumerStatsManager mgr = normalServie.getConsumerStatsManager();
Field statItmeSetField = mgr.getClass().getDeclaredField("topicAndGroupConsumeOKTPS");
statItmeSetField.setAccessible(true);
StatsItemSet itemSet = (StatsItemSet)statItmeSetField.get(mgr);
StatsItem item = itemSet.getAndCreateStatsItem(topic + "@" + pushConsumer.getDefaultMQPushConsumerImpl().groupName());
assertThat(item.getValue().get()).isGreaterThan(0L);
assertThat(messageExts[0].getTopic()).isEqualTo(topic);
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
@After
public void terminate() {
pushConsumer.shutdown();
}
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(1024);
MessageQueue messageQueue = new MessageQueue();
messageQueue.setBrokerName(brokerName);
messageQueue.setQueueId(0);
messageQueue.setTopic(topic);
pullRequest.setMessageQueue(messageQueue);
ProcessQueue processQueue = new ProcessQueue();
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
pullRequest.setProcessQueue(processQueue);
return pullRequest;
}
private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus,
List<MessageExt> messageExtList) throws Exception {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
for (MessageExt messageExt : messageExtList) {
outputStream.write(MessageDecoder.encode(messageExt, false));
}
return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray());
}
}
...@@ -79,10 +79,10 @@ public class MomentStatsItemSet { ...@@ -79,10 +79,10 @@ public class MomentStatsItemSet {
if (null == statsItem) { if (null == statsItem) {
statsItem = statsItem =
new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); new MomentStatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
MomentStatsItem prev = this.statsItemTable.put(statsKey, statsItem); MomentStatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
if (null == prev) {
if (null != prev) {
statsItem = prev;
// statsItem.init(); // statsItem.init();
} }
} }
......
...@@ -162,10 +162,10 @@ public class StatsItemSet { ...@@ -162,10 +162,10 @@ public class StatsItemSet {
StatsItem statsItem = this.statsItemTable.get(statsKey); StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) { if (null == statsItem) {
statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
StatsItem prev = this.statsItemTable.put(statsKey, statsItem); StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
if (null == prev) {
if (null != prev) {
statsItem = prev;
// statsItem.init(); // statsItem.init();
} }
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
public class QueryCorrectionOffsetBodyTest {
@Test
public void testFromJson() throws Exception {
QueryCorrectionOffsetBody qcob = new QueryCorrectionOffsetBody();
Map<Integer, Long> offsetMap = new HashMap<Integer, Long>();
offsetMap.put(1, 100L);
offsetMap.put(2, 200L);
qcob.setCorrectionOffsets(offsetMap);
String json = RemotingSerializable.toJson(qcob, true);
QueryCorrectionOffsetBody fromJson = RemotingSerializable.fromJson(json, QueryCorrectionOffsetBody.class);
assertThat(fromJson.getCorrectionOffsets().get(1)).isEqualTo(100L);
assertThat(fromJson.getCorrectionOffsets().get(2)).isEqualTo(200L);
assertThat(fromJson.getCorrectionOffsets().size()).isEqualTo(2);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
public class ResetOffsetBodyTest {
@Test
public void testFromJson() throws Exception {
ResetOffsetBody rob = new ResetOffsetBody();
Map<MessageQueue, Long> offsetMap = new HashMap<MessageQueue, Long>();
MessageQueue queue = new MessageQueue();
queue.setQueueId(1);
queue.setBrokerName("brokerName");
queue.setTopic("topic");
offsetMap.put(queue, 100L);
rob.setOffsetTable(offsetMap);
String json = RemotingSerializable.toJson(rob, true);
ResetOffsetBody fromJson = RemotingSerializable.fromJson(json, ResetOffsetBody.class);
assertThat(fromJson.getOffsetTable().get(queue)).isEqualTo(100L);
assertThat(fromJson.getOffsetTable().size()).isEqualTo(1);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat;
public class SubscriptionGroupWrapperTest {
@Test
public void testFromJson(){
SubscriptionGroupWrapper subscriptionGroupWrapper = new SubscriptionGroupWrapper();
ConcurrentHashMap<String, SubscriptionGroupConfig> subscriptions = new ConcurrentHashMap<String, SubscriptionGroupConfig>();
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
subscriptionGroupConfig.setConsumeBroadcastEnable(true);
subscriptionGroupConfig.setBrokerId(1234);
subscriptionGroupConfig.setGroupName("Consumer-group-one");
subscriptions.put("Consumer-group-one", subscriptionGroupConfig);
subscriptionGroupWrapper.setSubscriptionGroupTable(subscriptions);
DataVersion dataVersion = new DataVersion();
dataVersion.nextVersion();
subscriptionGroupWrapper.setDataVersion(dataVersion);
String json = RemotingSerializable.toJson(subscriptionGroupWrapper, true);
SubscriptionGroupWrapper fromJson = RemotingSerializable.fromJson(json, SubscriptionGroupWrapper.class);
assertThat(fromJson.getSubscriptionGroupTable()).containsKey("Consumer-group-one");
assertThat(fromJson.getSubscriptionGroupTable().get("Consumer-group-one").getGroupName()).isEqualTo("Consumer-group-one");
assertThat(fromJson.getSubscriptionGroupTable().get("Consumer-group-one").getBrokerId()).isEqualTo(1234);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.stats;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class StatsItemSetTest {
private ThreadPoolExecutor executor;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Test
public void test_getAndCreateStatsItem_multiThread() throws InterruptedException {
for (int i = 0; i < 50; i++) {
assertEquals(20000L, test_unit().longValue());
}
}
@Test
public void test_getAndCreateMomentStatsItem_multiThread() throws InterruptedException {
for (int i = 0; i < 50; i++) {
assertEquals(10, test_unit_moment().longValue());
}
}
private AtomicLong test_unit() throws InterruptedException {
final StatsItemSet statsItemSet = new StatsItemSet("topicTest", scheduler, null);
executor = new ThreadPoolExecutor(100, 200, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10000; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
statsItemSet.addValue("topicTest", 2, 1);
}
});
}
while (true) {
if (executor.getCompletedTaskCount() == 10000) {
break;
}
Thread.sleep(1000);
}
return statsItemSet.getStatsItem("topicTest").getValue();
}
private AtomicLong test_unit_moment() throws InterruptedException {
final MomentStatsItemSet statsItemSet = new MomentStatsItemSet("topicTest", scheduler, null);
executor = new ThreadPoolExecutor(100, 200, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10000), new ThreadFactoryImpl("testMultiThread"));
for (int i = 0; i < 10000; i++) {
executor.submit(new Runnable() {
@Override
public void run() {
statsItemSet.setValue("test", 10);
}
});
}
while (true) {
if (executor.getCompletedTaskCount() == 10000) {
break;
}
Thread.sleep(1000);
}
return statsItemSet.getAndCreateStatsItem("test").getValue();
}
@After
public void shutdown() {
executor.shutdown();
}
}
\ No newline at end of file
...@@ -25,7 +25,7 @@ export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876 ...@@ -25,7 +25,7 @@ export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
``` ```
- HTTP static server addressing(default) - HTTP static server addressing(default)
After client startedit will access a http static server address, as: <http://jmenv.tbsite.net:8080/rocketmq/nsaddr>, this URL return the following contents: After client started, it will access a http static server address, as: <http://jmenv.tbsite.net:8080/rocketmq/nsaddr>, this URL return the following contents:
```text ```text
192.168.0.1:9876;192.168.0.2:9876 192.168.0.1:9876;192.168.0.2:9876
...@@ -38,7 +38,7 @@ HTTP static server addressing is recommended, because it is simple client deploy ...@@ -38,7 +38,7 @@ HTTP static server addressing is recommended, because it is simple client deploy
### Client Configuration ### Client Configuration
```DefaultMQProducer```、```TransactionMQProducer```、```DefaultMQPushConsumer```、```DefaultMQPullConsumer``` all extends the ```ClientConfig``` Class,```ClientConfig``` as the client common configuration class。Client configuration style like getXXX、setXXX, each of the parameters can config by spring and also config their in the code. Such as the ```namesrvAddr``` parameter: ```producer.setNamesrvAddr("192.168.0.1:9876")```, same with the other parameters. ```DefaultMQProducer```,```TransactionMQProducer```,```DefaultMQPushConsumer```,```DefaultMQPullConsumer``` all extends the ```ClientConfig``` Class, ```ClientConfig``` as the client common configuration class. Client configuration style like getXXX,setXXX, each of the parameters can config by spring and also config their in the code. Such as the ```namesrvAddr``` parameter: ```producer.setNamesrvAddr("192.168.0.1:9876")```, same with the other parameters.
#### Client Common Configuration #### Client Common Configuration
...@@ -96,8 +96,8 @@ HTTP static server addressing is recommended, because it is simple client deploy ...@@ -96,8 +96,8 @@ HTTP static server addressing is recommended, because it is simple client deploy
| Pamater Name | Default Value | Description | | Pamater Name | Default Value | Description |
| -------------------------------- | ----------------------------- | ------------------------------------------------------------ | | -------------------------------- | ----------------------------- | ------------------------------------------------------------ |
| consumerGroup | DEFAULT_CONSUMER | Consumer group name. If multi Consumer belong to an application, subscribe the same message and consume logic as the same, they should be gathered together | | consumerGroup | DEFAULT_CONSUMER | Consumer group name. If multi Consumer belong to an application, subscribe the same message and consume logic as the same, they should be gathered together |
| brokerSuspendMaxTimeMillis | 20000 | Long pollingConsumer pull message request suspended for the longest time in the Broker in milliseconds | | brokerSuspendMaxTimeMillis | 20000 | Long polling, Consumer pull message request suspended for the longest time in the Broker in milliseconds |
| consumerTimeoutMillisWhenSuspend | 30000 | Long pollingConsumer pull message request suspend in the Broker over this time value, client think timeout. Unit is milliseconds | | consumerTimeoutMillisWhenSuspend | 30000 | Long polling, Consumer pull message request suspend in the Broker over this time value, client think timeout. Unit is milliseconds |
| consumerPullTimeoutMillis | 10000 | Not long polling, timeout time of pull message in milliseconds | | consumerPullTimeoutMillis | 10000 | Not long polling, timeout time of pull message in milliseconds |
| messageModel | BROADCASTING | Message support two mode: cluster consumption and broadcast consumption | | messageModel | BROADCASTING | Message support two mode: cluster consumption and broadcast consumption |
| messageQueueListener | | Listening changing of queue | | messageQueueListener | | Listening changing of queue |
...@@ -112,7 +112,7 @@ HTTP static server addressing is recommended, because it is simple client deploy ...@@ -112,7 +112,7 @@ HTTP static server addressing is recommended, because it is simple client deploy
| Topic | null | Required, the name of the topic to which the message belongs | | Topic | null | Required, the name of the topic to which the message belongs |
| Body | null | Required, message body | | Body | null | Required, message body |
| Tags | null | Optional, message tag, convenient for server filtering. Currently only one tag per message is supported | | Tags | null | Optional, message tag, convenient for server filtering. Currently only one tag per message is supported |
| Keys | null | Optional, represent this message's business keys, server create hash indexes based keys. After setting, you can find message by ```Topics``````Keys``` in Console system. Because of hash indexes, please make key as unique as possible, such as order number, goods Id and so on.| | Keys | null | Optional, represent this message's business keys, server create hash indexes based keys. After setting, you can find message by ```Topics```,```Keys``` in Console system. Because of hash indexes, please make key as unique as possible, such as order number, goods Id and so on.|
| Flag | 0 | Optional, it is entirely up to the application, and RocketMQ does not intervene | | Flag | 0 | Optional, it is entirely up to the application, and RocketMQ does not intervene |
| DelayTimeLevel | 0 | Optional, message delay level, 0 represent no delay, greater tan 0 can consume | | DelayTimeLevel | 0 | Optional, message delay level, 0 represent no delay, greater tan 0 can consume |
| WaitStoreMsgOK | TRUE | Optional, indicates whether the message is not answered until the server is down. | | WaitStoreMsgOK | TRUE | Optional, indicates whether the message is not answered until the server is down. |
......
# Message Queries
RocketMQ supports message queries by two dimensions, which are "Query Message by Message Id" and "Query Message by Message Key".
## 1. Query Message by Message Id
The MessageId in RocketMQ has a total length of 16 bytes, including the broker address (IP address and port) and CommitLog offset. In RocketMQ, the specific approach is that the Client resolves the Broker's address (IP address and port) and the CommitLog's offset address from the MessageId. Then both of them are encapsulated into an RPC request, and finally it will be sent through the communication layer (business request code: VIEW_MESSAGE_BY_ID). The Broker reads a message by using the CommitLog offset and size to find the real message in the CommitLog and then return, which is how QueryMessageProcessor works.
## 2. Query Message by Message Id
"Query Messages by Message Key" is mainly based on RocketMQ's IndexFile. The logical structure of the IndexFile is similar to the implementation of HashMap in JDK. The specific structure of the IndexFile is as follows:
![](images/rocketmq_design_message_query.png)
The IndexFile provides the user with the querying service by “Querying Messages by Message Key”. The IndexFile is stored in $HOME\store\index${fileName}, and the file name is named after the timestamp at the time of creation. The file size is fixed, which is 420,000,040 bytes (40+5million\*4+20million\*20). If the UNIQ_KEY is set in the properties of the message, then the "topic + ‘#’ + UNIQ_KEY" will be used as the index. Likewise, if the KEYS is set in the properties of the message (multiple KEYs should be separated by spaces), then the "topic + ‘#’ + KEY" will be used as the index.
The index data contains four fields, Key Hash, CommitLog offset, Timestamp and NextIndex offset, for a total of 20 Bytes. The NextIndex offset of the index data will point to the previous index data if the Key Hash of the index data is the same as that of the previous index data. If a hash conflict occurs, then the NextIndex offset can be used as the field to string all conflicting indexes in a linked list. What the Timestamp records is the time difference between two storeTimestamps, instead of a specific time. The structure of the entire IndexFile is shown in the graph. The Header is used to store some general statistics, which needs 40 bytes. The Slot Table of 4\*5million bytes does not save the real index data, but saves the header of the singly linked list corresponding to each slot. The Index Linked List of 20\*20million is the real index data, that is, an Index File can hold 20million indexes.
The specific method of "Query Message by Message Key" is that the topic and message key are used to find the record in the IndexFile, and then read the message from the file of CommitLog according to the CommitLog offset in this record.
\ No newline at end of file
# Transactional Message
## 1 Transactional Message
Apache RocketMQ supports distributed transactional message from version 4.3.0. RocketMQ implements transactional message by using the protocol of 2PC(two-phase commit), in addition adding a compensation logic to handle timeout-case or failure-case of commit-phase, as shown below.
![](../cn/image/rocketmq_design_10.png)
### 1.1 The Process of RocketMQ Transactional Message
The picture above shows the overall architecture of transactional message, including the sending of message(commit-request phase), the sending of commit/rollback(commit phase) and the compensation process.
1. The sending of message and Commit/Rollback.
(1) Sending the message(named Half message in RocketMQ)
(2) The server responds the writing result(success or failure) of Half message.
(3) Handle local transaction according to the result(local transaction won't be executed when the result is failure).
(4) Sending Commit/Rollback to broker according to the result of local transaction(Commit will generate message index and make the message visible to consumers).
2. Compensation process
(1) For a transactional message without a Commit/Rollback (means the message in the pending status), a "back-check" request is initiated from the broker.
(2) The Producer receives the "back-check" request and checks the status of the local transaction corresponding to the "back-check" message.
(3) Redo Commit or Rollback based on local transaction status.
The compensation phase is used to resolve the timeout or failure case of the message Commit or Rollback.
### 1.2 The design of RocketMQ Transactional Message
1. Transactional message is invisible to users in first phase(commit-request phase)
Upon on the main process of transactional message, the message of first phase is invisible to the user. This is also the biggest difference from normal message. So how do we write the message while making it invisible to the user? And below is the solution of RocketMQ: if the message is a Half message, the topic and queueId of the original message will be backed up, and then changes the topic to RMQ_SYS_TRANS_HALF_TOPIC. Since the consumer group does not subscribe to the topic, the consumer cannot consume the Half message. Then RocketMQ starts a timing task, pulls the message for RMQ_SYS_TRANS_HALF_TOPIC, obtains a channel according to producer group and sends a back-check to query local transaction status, and decide whether to submit or roll back the message according to the status.
In RocketMQ, the storage structure of the message in the broker is as follows. Each message has corresponding index information. The Consumer reads the content of the message through the secondary index of the ConsumeQueue. The flow is as follows:
![](../cn/image/rocketmq_design_11.png)
The specific implementation strategy of RocketMQ is: if the transactional message is written, topic and queueId of the message are replaced, and the original topic and queueId are stored in the properties of the message. Because the replace of the topic, the message will not be forwarded to the Consumer Queue of the original topic, and the consumer cannot perceive the existence of the message and will not consume it. In fact, changing the topic is the conventional method of RocketMQ(just recall the implementation mechanism of the delay message).
2. Commit/Rollback operation and introduction of Op message
After finishing writing a message that is invisible to the user in the first phase, here comes two cases in the second phase. One is Commit operation, after which the message needs to be visible to the user; the other one is Rollback operation, after which the first phase message(Half message) needs to be revoked. For the case of Rollback, since first-phase message itself is invisible to the user, there is no need to actually revoke the message (in fact, RocketMQ can't actually delete a message because it is a sequential-write file). But still some operation needs to be done to identity the final status of the message, to differ it from pending status message. To do this, the concept of "Op message" is introduced, which means the message has a certain status(Commit or Rollback). If a transactional message does not have a corresponding Op message, the status of the transaction is still undetermined (probably the second-phase failed). By introducing the Op message, the RocketMQ records an Op message for every Half message regardless it is Commit or Rollback. The only difference between Commit and Rollback is that when it comes to Commit, the index of the Half message is created before the Op message is written.
3. How Op message stored and the correspondence between Op message and Half message
RocketMQ writes the Op message to a specific system topic(RMQ_SYS_TRANS_OP_HALF_TOPIC) which will be created via the method - TransactionalMessageUtil.buildOpTopic(); this topic is an internal Topic (like the topic of RMQ_SYS_TRANS_HALF_TOPIC) and will not be consumed by the user. The content of the Op message is the physical offset of the corresponding Half message. Through the Op message we can index to the Half message for subsequent check-back operation.
![](../cn/image/rocketmq_design_12.png)
4. Index construction of Half messages
When performing Commit operation of the second phase, the index of the Half message needs to be built. Since the Half message is written to a special topic(RMQ_SYS_TRANS_HALF_TOPIC) in the first phase of 2PC, so it needs to be read out from the special topic when building index, and replace the topic and queueId with the real target topic and queueId, and then write through a normal message that is visible to the user. Therefore, in conclusion, the second phase recovers a complete normal message using the content of the Half message stored in the first phase, and then goes through the message-writing process.
5. How to handle the message failed in the second phase?
If commit/rollback phase fails, for example, a network problem causes the Commit to fail when you do Commit. Then certain strategy is required to make sure the message finally commit. RocketMQ uses a compensation mechanism called "back-check". The broker initiates a back-check request for the message in pending status, and sends the request to the corresponding producer side (the same producer group as the producer group who sent the Half message). The producer checks the status of local transaction and redo Commit or Rollback. The broker performs the back-check by comparing the RMQ_SYS_TRANS_HALF_TOPIC messages and the RMQ_SYS_TRANS_OP_HALF_TOPIC messages and advances the checkpoint(recording those transactional messages that the status are certain).
RocketMQ does not back-check the status of transactional messages endlessly. The default time is 15. If the transaction status is still unknown after 15 times, RocketMQ will roll back the message by default.
# Features
## Subscribe and Publish
Message publication refers to that a producer sends messages to a topic; Message subscription means a consumer follows a topic with certain tags and then consumes data from that topic.
## Message Ordering
Message ordering refers to that a group of messages can be consumed orderly as they are published. For example, an order generates three messages: order creation, order payment, and order completion. It only makes sense to consume them in their generated order, but orders can be consumed in parallel at the same time. RocketMQ can strictly guarantee these messages are in order.
Orderly message are divided into global orderly message and partitioned orderly message. Global order means that all messages under a certain topic must be in order, partitioned order only requires each group of messages are consumed orderly.
- Global message ordering:
For a given Topic, all messages are published and consumed in strict first-in-first-out (FIFO) order.
Applicable scenario: the performance requirement is not high, and all messages are published and consumed according to FIFO principle strictly.
- Partitioned message ordering:
For a given Topic, all messages are partitioned according to sharding key. Messages within the same partition are published and consumed in strict FIFO order. Sharding key is the key field to distinguish message's partition, which is a completely different concept from the key of ordinary messages.
Applicable scenario: high performance requirement, with sharding key as the partition field, messages within the same partition are published and consumed according to FIFO principle strictly.
## Message Filter
Consumers of RocketMQ can filter messages based on tags as well as support for user-defined attribute filtering. Message filter is currently implemented on the Broker side, with the advantage of reducing the network transmission of useless messages for Consumer and the disadvantage of increasing the burden on the Broker and relatively complex implementation.
## Message Reliability
RocketMQ supports high reliability of messages in several situations:
1) Broker shutdown normally
2) Broker abnormal crash
3) OS Crash
4) The machine is out of power, but it can be recovered immediately
5) The machine cannot be started up (the CPU, motherboard, memory and other key equipment may be damaged)
6) Disk equipment damaged
In the four cases of 1), 2), 3), and 4) where the hardware resource can be recovered immediately, RocketMQ guarantees that the message will not be lost or a small amount of data will be lost (depending on whether the flush disk type is synchronous or asynchronous)
5) and 6) are single point of failure and cannot be recovered. Once it happens, all messages on the single point will be lost. In both cases, RocketMQ ensures that 99% of the messages are not lost through asynchronous replication, but a very few number of messages may still be lost. Synchronous double write mode can completely avoid single point of failure, which will surely affect the performance and suitable for the occasion of high demand for message reliability, such as money related applications. Note: RocketMQ supports synchronous double writes since version 3.0.
## At Least Once
At least Once refers to that every message will be delivered at least once. RocketMQ supports this feature because the Consumer pulls the message locally and does not send an ack back to the server until it has consumed it.
## Backtracking Consumption
Backtracking consumption refers to that the Consumer has consumed the message successfully, but the business needs to consume again. To support this function, the message still needs to be retained after the Broker sends the message to the Consumer successfully. The re-consumption is normally based on time dimension. For example, after the recovery of the Consumer system failure, the data one hour ago needs to be re-consumed, then the Broker needs to provide a mechanism to reverse the consumption progress according to the time dimension. RocketMQ supports backtracking consumption by time trace, with the time dimension down to milliseconds.
## Transactional Message
RocketMQ transactional message refers to the fact that the application of a local transaction and the sending of a Message operation can be defined in a global transaction which means both succeed or fail simultaneously. RocketMQ transactional message provides distributed transaction functionality similar to X/Open XA, enabling the ultimate consistency of distributed transactions through transactional message.
## Scheduled Message
Scheduled message(delay queue) refers to that messages are not consumed immediately after they are sent to the broker, but waiting to be delivered to the real topic after a specific time.
The broker has a configuration item, messageDelayLevel, with default values “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”, 18 levels. Users can configure a custom messageDelayLevel. Note that messageDelayLevel is a broker's property rather than a topic's. When sending a message, just set the delayLevel level: msg.setDelayLevel(level). There are three types of levels:
- level == 0, The message is not a delayed message
- 1<=level<=maxLevel, Message delay specific time, such as level==1, delay for 1s
- level > maxLevel, than level== maxLevel, such as level==20, delay for 2h
Scheduled messages are temporarily saved in a topic named SCHEDULE_TOPIC_XXXX, and saved in a specific queue according to delayTimeLevel, queueId = delayTimeLevel - 1, that is, only messages with the same delay are saved in a queue, ensuring that messages with the same sending delay can be consumed orderly. The broker consumes SCHEDULE_TOPIC_XXXX on schedule and writes messages to the real topic.
Note that Scheduled messages are counted both the first time they are written and the time they are scheduled to be written to the real topic, so both the send number and the TPS are increased.
## Message Retry
When the Consumer fails to consume the message, a retry mechanism is needed to make the message to be consumed again. Consumer's consume failure can usually be classified as follows:
- due to the reasons of the message itself, such as deserialization failure, the message data itself cannot be processed (for example, the phone number of the current message is cancelled and cannot be charged), etc. This kind of error usually requires skipping this message and consuming others since immediately retry would be failed 99%, so it is better to provide a timed retry mechanism that retries after 10 seconds.
- due to the reasons of dependent downstream application services are not available, such as db connection is not usable, perimeter network is not unreachable, etc. When this kind of error is encountered, consuming other messages will also result in an error even if the current failed message is skipped. In this case, it is recommended to sleep for 30s before consuming the next message, which will reduce the pressure on the broker to retry the message.
RocketMQ will set up a retry queue named “%RETRY%+consumerGroup” for each consumer group(Note that the retry queue for this topic is for consumer groups, not for each topic) to temporarily save messages cannot be consumed by customer due to all kinds of reasons. Considering that it takes some time for the exception to recover, multiple retry levels are set for the retry queue, and each retry level has a corresponding re-deliver delay. The more retries, the greater the deliver delay. RocketMQ first save retry messages to the delay queue which topic is named “SCHEDULE_TOPIC_XXXX”, then background schedule task will save the messages to “%RETRY%+consumerGroup” retry queue according to their corresponding delay.
## Message Resend
When a producer sends a message, the synchronous message will be resent if fails, the asynchronous message will retry and oneway message is without any guarantee. Message resend ensures that messages are sent successfully and without lost as much as possible, but it can lead to message duplication, which is an unavoidable problem in RocketMQ. Under normal circumstances, message duplication will not occur, but when there is a large number of messages and network jitter, message duplication will be a high-probability event. In addition, producer initiative messages resend and the consumer load changes will also result in duplicate messages. The message retry policy can be set as follows:
- retryTimesWhenSendFailed: Synchronous message retry times when send failed, default value is 2, so the producer will try to send retryTimesWhenSendFailed + 1 times at most. To ensure that the message is not lost, producer will try sending the message to another broker instead of selecting the broker that failed last time. An exception will be thrown if it reaches the retry limit, and the client should guarantee that the message will not be lost. Messages will resend when RemotingException, MQClientException, and partial MQBrokerException occur.
- retryTimesWhenSendAsyncFailed: Asynchronous message retry times when send failed, asynchronous retry sends message to the same broker instead of selecting another one and does not guarantee that the message wont lost.
- retryAnotherBrokerWhenNotStoreOK: Message flush disk (master or slave) timeout or slave not available (return status is not SEND_OK), whether to try to send to another broker, default value is false. Very important messages can set to true.
## Flow Control
Producer flow control, because broker processing capacity reaches a bottleneck; Consumer flow control, because the consumption capacity reaches a bottleneck.
Producer flow control:
- When commitLog file locked time exceeds osPageCacheBusyTimeOutMills, default value of osPageCacheBusyTimeOutMills is 1000 ms, then return flow control.
- If transientStorePoolEnable == true, and the broker is asynchronous flush disk type, and resources are insufficient in the transientStorePool, reject the current send request and return flow control.
- The broker checks the head request wait time of the send request queue every 10ms. If the wait time exceeds waitTimeMillsInSendQueue, which default value is 200ms, the current send request is rejected and the flow control is returned.
- The broker implements flow control by rejecting send requests.
Consumer flow control:
- When consumer local cache messages number exceeds pullThresholdForQueue, default value is 1000.
- When consumer local cache messages size exceeds pullThresholdSizeForQueue, default value is 100MB.
- When consumer local cache messages span exceeds consumeConcurrentlyMaxSpan, default value is 2000.
The result of consumer flow control is to reduce the pull frequency.
## Dead Letter Queue
Dead letter queue is used to deal messages that cannot be consumed normally. When a message is consumed failed at first time, the message queue will automatically resend the message. If the consumption still fails after the maximum number retry, it indicates that the consumer cannot properly consume the message under normal circumstances. At this time, the message queue will not immediately abandon the message, but send it to the special queue corresponding to the consumer.
RocketMQ defines the messages that could not be consumed under normal circumstances as Dead-Letter Messages, and the special queue in which the Dead-Letter Messages are saved as Dead-Letter Queues. In RocketMQ, the consumer instance can consume again by resending messages in the Dead-Letter Queue using console.
\ No newline at end of file
## Consumer
----
### 2.1 Consumption process idempotent
RocketMQ cannot avoid Exactly-Once, so if the business is very sensitive to consumption repetition, it is important to perform deduplication at the business level. Deduplication can be done with a relational database. First, you need to determine the unique key of the message, which can be either msgId or a unique identifier field in the message content, such as the order Id. Determine if a unique key exists in the relational database before consumption. If it does not exist, insert it and consume it, otherwise skip it. (The actual process should consider the atomic problem, determine whether there is an attempt to insert, if the primary key conflicts, the insertion fails, skip directly)
### 2.2 Slow message processing
#### 2.2.1 Increase consumption parallelism
Most messages consumption behaviors are IO-intensive, That is, it may be to operate the database, or call RPC. The consumption speed of such consumption behavior lies in the throughput of the back-end database or the external system. By increasing the consumption parallelism, the total consumption throughput can be increased, but the degree of parallelism is increased to a certain extent. Instead it will fall.Therefore, the application must set a reasonable degree of parallelism. There are several ways to modify the degree of parallelism of consumption as follows:
* Under the same ConsumerGroup, increase the degree of parallelism by increasing the number of Consumer instances (note that the Consumer instance that exceeds the number of subscription queues is invalid). Can be done by adding machines, or by starting multiple processes on an existing machine.
* Improve the consumption parallel thread of a single Consumer by modifying the parameters consumeThreadMin and consumeThreadMax.
#### 2.2.2 Batch mode consumption
Some business processes can increase consumption throughput to a large extent if they support batch mode consumption. For example, order deduction application, it takes 1s to process one order at a time, and it takes only 2s to process 10 orders at a time. In this way, the throughput of consumption can be greatly improved. By setting the consumer's consumeMessageBatchMaxSize to return a parameter, the default is 1, that is, only one message is consumed at a time, for example, set to N, then the number of messages consumed each time is less than or equal to N.
#### 2.2.3 Skip non-critical messages
When a message is accumulated, if the consumption speed cannot keep up with the transmission speed, if the service does not require high data, you can choose to discard the unimportant message. For example, when the number of messages in a queue is more than 100,000 , try to discard some or all of the messages, so that you can quickly catch up with the speed of sending messages. The sample code is as follows:
```java
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
long offest = msgs.get(0).getQueueOffset();
String maxOffset =
msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if(diff > 100000){
//TODO Special handling of message accumulation
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//TODO Normal consumption process
return ConcumeConcurrentlyStatus.CONSUME_SUCCESS;
}
```
#### 2.2.4 Optimize each message consumption process
For example, the consumption process of a message is as follows:
* Query from DB according to the message [data 1]
* Query from DB according to the message [data 2]
* Complex business calculations
* Insert [Data 3] into the DB
* Insert [Data 4] into the DB
There are 4 interactions with the DB in the consumption process of this message. If it is calculated by 5ms each time, it takes a total of 20ms. If the business calculation takes 5ms, then the total time is 25ms, So if you can optimize 4 DB interactions to 2 times, the total time can be optimized to 15ms, which means the overall performance is increased by 40%. Therefore, if the application is sensitive to delay, the DB can be deployed on the SSD hard disk. Compared with the SCSI disk, the former RT will be much smaller.
### 2.3 Print Log
If the amount of messages is small, it is recommended to print the message in the consumption entry method, consume time, etc., to facilitate subsequent troubleshooting.
```java
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context){
log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
//TODO Normal consumption process
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
```
If you can print the time spent on each message, it will be more convenient when troubleshooting online problems such as slow consumption.
### 2.4 Other consumption suggestions
#### 2.4.1、Consumer Group and Subscriptions
The first thing you should be aware of is that different Consumer Group can consume the same topic independently, and each of them will have their own consuming offsets. Please make sure each Consumer within the same Group to subscribe the same topics.
#### 2.4.2、Orderly
The Consumer will lock each MessageQueue to make sure it is consumed one by one in order. This will cause a performance loss, but it is useful when you care about the order of the messages. It is not recommended to throw exceptions, you can return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead.
#### 2.4.3、Concurrently
As the name tells, the Consumer will consume the messages concurrently. It is recommended to use this for good performance. It is not recommended to throw exceptions, you can return ConsumeConcurrentlyStatus.RECONSUME_LATER instead.
#### 2.4.4、Consume Status
For MessageListenerConcurrently, you can return RECONSUME_LATER to tell the consumer that you can not consume it right now and want to reconsume it later. Then you can continue to consume other messages. For MessageListenerOrderly, because you care about the order, you can not jump over the message, but you can return SUSPEND_CURRENT_QUEUE_A_MOMENT to tell the consumer to wait for a moment.
#### 2.4.5、Blocking
It is not recommend to block the Listener, because it will block the thread pool, and eventually may stop the consuming process.
#### 2.4.6、Thread Number
The consumer use a ThreadPoolExecutor to process consuming internally, so you can change it by setting setConsumeThreadMin or setConsumeThreadMax.
#### 2.4.7、ConsumeFromWhere
When a new Consumer Group is established, it will need to decide whether it needs to consume the historical messages which had already existed in the Broker. CONSUME_FROM_LAST_OFFSET will ignore the historical messages, and consume anything produced after that. CONSUME_FROM_FIRST_OFFSET will consume every message existed in the Broker. You can also use CONSUME_FROM_TIMESTAMP to consume messages produced after the specified timestamp.
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.index.IndexFile;
import org.apache.rocketmq.store.index.IndexService;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
public class StoreTestUtil {
private static final InternalLogger log = InternalLoggerFactory.getLogger(StoreTestUtil.class);
public static boolean isCommitLogAvailable(DefaultMessageStore store) {
try {
Field serviceField = store.getClass().getDeclaredField("reputMessageService");
serviceField.setAccessible(true);
DefaultMessageStore.ReputMessageService reputService =
(DefaultMessageStore.ReputMessageService) serviceField.get(store);
Method method = DefaultMessageStore.ReputMessageService.class.getDeclaredMethod("isCommitLogAvailable");
method.setAccessible(true);
return (boolean) method.invoke(reputService);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | NoSuchFieldException e) {
throw new RuntimeException(e);
}
}
public static void flushConsumeQueue(DefaultMessageStore store) throws Exception {
Field field = store.getClass().getDeclaredField("flushConsumeQueueService");
field.setAccessible(true);
DefaultMessageStore.FlushConsumeQueueService flushService = (DefaultMessageStore.FlushConsumeQueueService) field.get(store);
final int RETRY_TIMES_OVER = 3;
Method method = DefaultMessageStore.FlushConsumeQueueService.class.getDeclaredMethod("doFlush", int.class);
method.setAccessible(true);
method.invoke(flushService, RETRY_TIMES_OVER);
}
public static void waitCommitLogReput(DefaultMessageStore store) {
for (int i = 0; i < 500 && isCommitLogAvailable(store); i++) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}
if (isCommitLogAvailable(store)) {
log.warn("isCommitLogAvailable expected false ,but true");
}
}
public static void flushConsumeIndex(DefaultMessageStore store) throws NoSuchFieldException, Exception {
Field field = store.getClass().getDeclaredField("indexService");
field.setAccessible(true);
IndexService indexService = (IndexService) field.get(store);
Field field2 = indexService.getClass().getDeclaredField("indexFileList");
field2.setAccessible(true);
ArrayList<IndexFile> indexFileList = (ArrayList<IndexFile>) field2.get(indexService);
for (IndexFile f : indexFileList) {
indexService.flush(f);
}
}
}
...@@ -50,7 +50,7 @@ public class ScheduleMessageServiceTest { ...@@ -50,7 +50,7 @@ public class ScheduleMessageServiceTest {
* t * t
* defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" * defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
*/ */
String testMessageDelayLevel = "2s 3s"; String testMessageDelayLevel = "5s 8s";
/** /**
* choose delay level * choose delay level
*/ */
...@@ -111,7 +111,7 @@ public class ScheduleMessageServiceTest { ...@@ -111,7 +111,7 @@ public class ScheduleMessageServiceTest {
@Test @Test
public void deliverDelayedMessageTimerTaskTest() throws InterruptedException { public void deliverDelayedMessageTimerTaskTest() throws Exception {
MessageExtBrokerInner msg = buildMessage(); MessageExtBrokerInner msg = buildMessage();
int realQueueId = msg.getQueueId(); int realQueueId = msg.getQueueId();
// set delayLevel,and send delay message // set delayLevel,and send delay message
...@@ -119,6 +119,8 @@ public class ScheduleMessageServiceTest { ...@@ -119,6 +119,8 @@ public class ScheduleMessageServiceTest {
PutMessageResult result = messageStore.putMessage(msg); PutMessageResult result = messageStore.putMessage(msg);
assertThat(result.isOk()).isTrue(); assertThat(result.isOk()).isTrue();
// make sure consumerQueue offset = commitLog offset
StoreTestUtil.waitCommitLogReput(messageStore);
// consumer message // consumer message
int delayQueueId = ScheduleMessageService.delayLevel2QueueId(delayLevel); int delayQueueId = ScheduleMessageService.delayLevel2QueueId(delayLevel);
...@@ -132,7 +134,7 @@ public class ScheduleMessageServiceTest { ...@@ -132,7 +134,7 @@ public class ScheduleMessageServiceTest {
// timer run maybe delay, then consumer message again // timer run maybe delay, then consumer message again
// and wait offsetTable // and wait offsetTable
TimeUnit.SECONDS.sleep(3); TimeUnit.SECONDS.sleep(10);
scheduleMessageService.buildRunningStats(new HashMap<String, String>()); scheduleMessageService.buildRunningStats(new HashMap<String, String>());
messageResult = getMessage(realQueueId, offset); messageResult = getMessage(realQueueId, offset);
...@@ -188,7 +190,6 @@ public class ScheduleMessageServiceTest { ...@@ -188,7 +190,6 @@ public class ScheduleMessageServiceTest {
@After @After
public void shutdown() throws InterruptedException { public void shutdown() throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
messageStore.shutdown(); messageStore.shutdown();
messageStore.destroy(); messageStore.destroy();
File file = new File(messageStoreConfig.getStorePathRootDir()); File file = new File(messageStoreConfig.getStorePathRootDir());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.namesrv;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.MQClientManager;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import org.apache.rocketmq.tools.command.SubCommandException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.lang.reflect.Field;
import static org.mockito.Mockito.mock;
public class UpdateKvConfigCommandTest {
private static DefaultMQAdminExt defaultMQAdminExt;
private static DefaultMQAdminExtImpl defaultMQAdminExtImpl;
private static MQClientInstance mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(new ClientConfig());
private static MQClientAPIImpl mQClientAPIImpl;
@BeforeClass
public static void init() throws NoSuchFieldException, IllegalAccessException {
mQClientAPIImpl = mock(MQClientAPIImpl.class);
defaultMQAdminExt = new DefaultMQAdminExt();
defaultMQAdminExtImpl = new DefaultMQAdminExtImpl(defaultMQAdminExt, 1000);
Field field = DefaultMQAdminExtImpl.class.getDeclaredField("mqClientInstance");
field.setAccessible(true);
field.set(defaultMQAdminExtImpl, mqClientInstance);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mqClientInstance, mQClientAPIImpl);
field = DefaultMQAdminExt.class.getDeclaredField("defaultMQAdminExtImpl");
field.setAccessible(true);
field.set(defaultMQAdminExt, defaultMQAdminExtImpl);
}
@AfterClass
public static void terminate() {
defaultMQAdminExt.shutdown();
}
@Ignore
@Test
public void testExecute() throws SubCommandException {
UpdateKvConfigCommand cmd = new UpdateKvConfigCommand();
Options options = ServerUtil.buildCommandlineOptions(new Options());
String[] subargs = new String[]{"-s namespace", "-k topicname", "-v unit_test"};
final CommandLine commandLine =
ServerUtil.parseCmdLine("mqadmin " + cmd.commandName() + cmd.commandDesc(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, null);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册