提交 f7de85d3 编写于 作者: K kavin

Merge remote-tracking branch 'upstream/develop' into develop

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.body;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumerConnectionTest {
@Test
public void testFromJson() {
ConsumerConnection consumerConnection = new ConsumerConnection();
HashSet<Connection> connections = new HashSet<Connection>();
Connection conn = new Connection();
connections.add(conn);
ConcurrentHashMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();
SubscriptionData subscriptionData = new SubscriptionData();
subscriptionTable.put("topicA", subscriptionData);
ConsumeType consumeType = ConsumeType.CONSUME_ACTIVELY;
MessageModel messageModel = MessageModel.CLUSTERING;
ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET;
consumerConnection.setConnectionSet(connections);
consumerConnection.setSubscriptionTable(subscriptionTable);
consumerConnection.setConsumeType(consumeType);
consumerConnection.setMessageModel(messageModel);
consumerConnection.setConsumeFromWhere(consumeFromWhere);
String json = RemotingSerializable.toJson(consumerConnection, true);
ConsumerConnection fromJson = RemotingSerializable.fromJson(json, ConsumerConnection.class);
assertThat(fromJson.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
assertThat(fromJson.getMessageModel()).isEqualTo(MessageModel.CLUSTERING);
HashSet<Connection> connectionSet = fromJson.getConnectionSet();
assertThat(connectionSet).isInstanceOf(Set.class);
SubscriptionData data = fromJson.getSubscriptionTable().get("topicA");
assertThat(data).isExactlyInstanceOf(SubscriptionData.class);
}
@Test
public void testComputeMinVersion() {
ConsumerConnection consumerConnection = new ConsumerConnection();
HashSet<Connection> connections = new HashSet<Connection>();
Connection conn1 = new Connection();
conn1.setVersion(1);
connections.add(conn1);
Connection conn2 = new Connection();
conn2.setVersion(10);
connections.add(conn2);
consumerConnection.setConnectionSet(connections);
int version = consumerConnection.computeMinVersion();
assertThat(version).isEqualTo(1);
}
}
......@@ -4,7 +4,7 @@ RocketMQ provides ordered messages using FIFO order. All related messages need t
The following demonstrates ordered messages by ensuring order of create, pay, send and finish steps of sales order process.
## 2.1 produce ordered messages
## 1 produce ordered messages
```
package org.apache.rocketmq.example.order2
......
......@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.schedule;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
......@@ -45,17 +46,17 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ScheduleMessageServiceTest {
/**t
/**
* t
* defaultMessageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
*/
String testMessageDelayLevel = "5s 10s";
String testMessageDelayLevel = "2s 3s";
/**
* choose delay level
* 1 = 5s
*/
int delayLevel = 1;
int delayLevel = 2;
private static final String storePath = System.getProperty("user.home") + File.separator + "schedule_test"+ UUID.randomUUID();
private static final String storePath = System.getProperty("user.home") + File.separator + "schedule_test#" + UUID.randomUUID();
private static final int commitLogFileSize = 1024;
private static final int cqFileSize = 10;
private static final int cqExtFileSize = 10 * (ConsumeQueueExt.CqExtUnit.MIN_EXT_UNIT_SIZE + 64);
......@@ -67,7 +68,9 @@ public class ScheduleMessageServiceTest {
BrokerConfig brokerConfig;
ScheduleMessageService scheduleMessageService;
static String sendMessage = " ------- schedule message test -------";
static String sendMessage = " ------- schedule message test -------";
static String topic = "schedule_topic_test";
static String messageGroup = "delayGroupTest";
static {
......@@ -107,67 +110,39 @@ public class ScheduleMessageServiceTest {
}
@Test
public void buildRunningStatsTest() throws InterruptedException {
MessageExtBrokerInner msg = buildMessage();
msg.setDelayTimeLevel(delayLevel);
messageStore.putMessage(msg);
// wait offsetTable
TimeUnit.SECONDS.sleep(1);
scheduleMessageService.buildRunningStats(new HashMap<String, String>() );
}
@Test
public void computeDeliverTimestampTest() {
// testMessageDelayLevel just "5s 10s"
long storeTime = System.currentTimeMillis();
long time1 = scheduleMessageService.computeDeliverTimestamp(1, storeTime);
assertThat(time1).isEqualTo(storeTime + 5 * 1000);
long time2 = scheduleMessageService.computeDeliverTimestamp(2, storeTime);
assertThat(time2).isEqualTo(storeTime + 10 * 1000);
}
@Test
public void delayLevel2QueueIdTest() {
int queueId = ScheduleMessageService.delayLevel2QueueId(delayLevel);
assertThat(queueId).isEqualTo(delayLevel - 1);
queueId = ScheduleMessageService.queueId2DelayLevel(delayLevel);
assertThat(queueId).isEqualTo(delayLevel + 1);
}
@Test
public void deliverDelayedMessageTimerTaskTest() throws InterruptedException {
MessageExtBrokerInner msg = buildMessage();
int realQueueId = msg.getQueueId();
// set delayLevel,and send delay message
msg.setDelayTimeLevel(delayLevel);
PutMessageResult result = messageStore.putMessage(msg);
assertThat(result.isOk()).isTrue();
// consumer message
int delayQueueId = ScheduleMessageService.delayLevel2QueueId(delayLevel);
assertThat(delayQueueId).isEqualTo(delayLevel - 1);
Long offset = result.getAppendMessageResult().getLogicsOffset();
String messageGroup = "delayGroupTest";
GetMessageResult messageResult = messageStore.getMessage(messageGroup,msg.getTopic(),
msg.getQueueId(),offset,1,null);
// now, no message in queue,must wait > 5 seconds
// now, no message in queue,must wait > delayTime
GetMessageResult messageResult = getMessage(realQueueId, offset);
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
// timer run maybe delay, then consumer message again
// and wait offsetTable
TimeUnit.SECONDS.sleep(3);
scheduleMessageService.buildRunningStats(new HashMap<String, String>());
TimeUnit.SECONDS.sleep(6);
messageResult = messageStore.getMessage(messageGroup,msg.getTopic(),
msg.getQueueId(),offset,1,null);
messageResult = getMessage(realQueueId, offset);
// now,found the message
assertThat(messageResult.getStatus()).isEqualTo(GetMessageStatus.FOUND);
// get the message body
ByteBuffer byteBuffer = ByteBuffer.allocate(messageResult.getBufferTotalSize());
List<ByteBuffer> byteBufferList = messageResult.getMessageBufferList();
List<ByteBuffer> byteBufferList = messageResult.getMessageBufferList();
for (ByteBuffer bb : byteBufferList) {
byteBuffer.put(bb);
}
......@@ -178,22 +153,42 @@ public class ScheduleMessageServiceTest {
String retryMsg = new String(msgList.get(0).getBody());
assertThat(sendMessage).isEqualTo(retryMsg);
// method will wait 10s,so I run it by myself
scheduleMessageService.persist();
// add mapFile release
messageResult.release();
}
/**
* add some [error/no use] code test
*/
@Test
public void persist(){
// because of the method will wait 10s
scheduleMessageService.persist();
public void otherTest() {
// the method no use ,why need ?
int queueId = ScheduleMessageService.queueId2DelayLevel(delayLevel);
assertThat(queueId).isEqualTo(delayLevel + 1);
// error delayLevelTest
Long time = scheduleMessageService.computeDeliverTimestamp(999, 0);
assertThat(time).isEqualTo(1000);
// just decode
scheduleMessageService.decode(new DelayOffsetSerializeWrapper().toJson());
}
private GetMessageResult getMessage(int queueId, Long offset) {
return messageStore.getMessage(messageGroup, topic,
queueId, offset, 1, null);
}
@After
public void shutdown() throws InterruptedException {
TimeUnit.SECONDS.sleep(1);
scheduleMessageService.shutdown();
messageStore.shutdown();
messageStore.destroy();
File file = new File(messageStoreConfig.getStorePathRootDir());
......@@ -205,11 +200,10 @@ public class ScheduleMessageServiceTest {
byte[] msgBody = sendMessage.getBytes();
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("schedule_topic_test");
msg.setTopic(topic);
msg.setTags("schedule_tag");
msg.setKeys("schedule_key");
msg.setBody(msgBody);
msg.setQueueId(0);
msg.setSysFlag(0);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(storeHost);
......@@ -226,6 +220,4 @@ public class ScheduleMessageServiceTest {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册