提交 881aef5d 编写于 作者: Y yukon

[ROCKETMQ-56] Polish unit tests for rocketmq-store

上级 e9a87ea9
......@@ -24,42 +24,28 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;
public class DefaultMessageStoreTest {
private static final Logger logger = LoggerFactory.getLogger(DefaultMessageStoreTest.class);
private static final String StoreMessage = "Once, there was a chance for me!";
private static int QUEUE_TOTAL = 100;
private static AtomicInteger QueueId = new AtomicInteger(0);
private static SocketAddress BornHost;
private static SocketAddress StoreHost;
private static byte[] MessageBody;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
private final String StoreMessage = "Once, there was a chance for me!";
private int QUEUE_TOTAL = 100;
private AtomicInteger QueueId = new AtomicInteger(0);
private SocketAddress BornHost;
private SocketAddress StoreHost;
private byte[] MessageBody;
@Before
public void init() throws Exception {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Test
public void test_write_read() throws Exception {
logger.debug("================================================================");
public void testWriteAndRead() throws Exception {
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
......@@ -77,29 +63,23 @@ public class DefaultMessageStoreTest {
master.start();
try {
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
master.putMessage(buildMessage());
}
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
logger.debug("result == null " + i);
}
assertTrue(result != null);
assertThat(result).isNotNull();
result.release();
logger.debug("read " + i + " OK");
}
} finally {
master.shutdown();
master.destroy();
}
logger.debug("================================================================");
}
public MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("AAA");
msg.setTopic("FooBar");
msg.setTags("TAG1");
msg.setKeys("Hello");
msg.setBody(MessageBody);
......@@ -113,8 +93,7 @@ public class DefaultMessageStoreTest {
}
@Test
public void test_group_commit() throws Exception {
logger.debug("================================================================");
public void testGroupCommit() throws Exception {
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
......@@ -128,32 +107,24 @@ public class DefaultMessageStoreTest {
master.start();
try {
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = master.putMessage(buildMessage());
logger.debug(i + "\t" + result.getAppendMessageResult().getMsgId());
master.putMessage(buildMessage());
}
for (long i = 0; i < totalMsgs; i++) {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
logger.debug("result == null " + i);
}
assertTrue(result != null);
assertThat(result).isNotNull();
result.release();
logger.debug("read " + i + " OK");
}
} finally {
master.shutdown();
master.destroy();
}
logger.debug("================================================================");
}
private class MyMessageArrivingListener implements MessageArrivingListener {
@Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode) {
// Do nothing here
}
}
}
......@@ -15,70 +15,30 @@
* limitations under the License.
*/
/**
* $Id: MappedFileQueueTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.store;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
public class MappedFileQueueTest {
private static final Logger logger = LoggerFactory.getLogger(MappedFileQueueTest.class);
// private static final String StoreMessage =
// "Once, there was a chance for me! but I did not treasure it. if";
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
}
@Test
public void test_getLastMappedFile() {
public void testGetLastMappedFile() {
final String fixedMsg = "0123456789abcdef";
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/a/", 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
if (!result) {
logger.debug("appendMessage " + i);
}
assertTrue(result);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue();
}
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.getLastMappedFile() OK");
}
@Test
......@@ -86,162 +46,134 @@ public class MappedFileQueueTest {
// four-byte string.
final String fixedMsg = "abcd";
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/b/", 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
assertTrue(result);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue();
}
assertEquals(fixedMsg.getBytes().length * 1024, mappedFileQueue.getMappedMemorySize());
assertThat(mappedFileQueue.getMappedMemorySize()).isEqualTo(fixedMsg.getBytes().length * 1024);
MappedFile mappedFile = mappedFileQueue.findMappedFileByOffset(0);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 0);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.getFileFromOffset()).isEqualTo(0);
mappedFile = mappedFileQueue.findMappedFileByOffset(100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 0);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.getFileFromOffset()).isEqualTo(0);
mappedFile = mappedFileQueue.findMappedFileByOffset(1024);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.getFileFromOffset()).isEqualTo(1024);
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 + 100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.getFileFromOffset()).isEqualTo(1024);
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.getFileFromOffset()).isEqualTo(1024 * 2);
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 2 + 100);
assertTrue(mappedFile != null);
assertEquals(mappedFile.getFileFromOffset(), 1024 * 2);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.getFileFromOffset()).isEqualTo(1024 * 2);
// over mapped memory size.
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4);
assertTrue(mappedFile == null);
assertThat(mappedFile).isNull();
mappedFile = mappedFileQueue.findMappedFileByOffset(1024 * 4 + 100);
assertTrue(mappedFile == null);
assertThat(mappedFile).isNull();
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.findMappedFileByOffset() OK");
}
@Test
public void test_commit() {
public void testAppendMessage() {
final String fixedMsg = "0123456789abcdef";
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/c/", 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
assertTrue(result);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue();
}
boolean result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 1, mappedFileQueue.getFlushedWhere());
assertThat(mappedFileQueue.flush(0)).isFalse();
assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024);
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 2, mappedFileQueue.getFlushedWhere());
assertThat(mappedFileQueue.flush(0)).isFalse();
assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 2);
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 3, mappedFileQueue.getFlushedWhere());
assertThat(mappedFileQueue.flush(0)).isFalse();
assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 3);
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 4, mappedFileQueue.getFlushedWhere());
assertThat(mappedFileQueue.flush(0)).isFalse();
assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 4);
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 5, mappedFileQueue.getFlushedWhere());
assertThat(mappedFileQueue.flush(0)).isFalse();
assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 5);
result = mappedFileQueue.flush(0);
assertFalse(result);
assertEquals(1024 * 6, mappedFileQueue.getFlushedWhere());
assertThat(mappedFileQueue.flush(0)).isFalse();
assertThat(mappedFileQueue.getFlushedWhere()).isEqualTo(1024 * 6);
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.flush() OK");
}
@Test
public void test_getMappedMemorySize() {
public void testGetMappedMemorySize() {
final String fixedMsg = "abcd";
logger.debug("================================================================");
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/d/", 1024, null);
for (int i = 0; i < 1024; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertTrue(mappedFile != null);
boolean result = mappedFile.appendMessage(fixedMsg.getBytes());
assertTrue(result);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue();
}
assertEquals(fixedMsg.length() * 1024, mappedFileQueue.getMappedMemorySize());
assertThat(mappedFileQueue.getMappedMemorySize()).isEqualTo(fixedMsg.length() * 1024);
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.getMappedMemorySize() OK");
}
@Test
public void test_deleteExpiredFileByOffset() {
logger.debug("================================================================");
public void testDeleteExpiredFileByOffset() {
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/e", 5120, null);
for (int i = 0; i < 2048; i++) {
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertNotNull(mappedFile);
assertThat(mappedFile).isNotNull();
ByteBuffer byteBuffer = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
byteBuffer.putLong(i);
byte[] padding = new byte[12];
Arrays.fill(padding, (byte)'0');
Arrays.fill(padding, (byte) '0');
byteBuffer.put(padding);
byteBuffer.flip();
boolean result = mappedFile.appendMessage(byteBuffer.array());
assertTrue(result);
assertThat(mappedFile.appendMessage(byteBuffer.array())).isTrue();
}
MappedFile first = mappedFileQueue.getFirstMappedFile();
first.hold();
int count = mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE);
assertEquals(0, count);
assertThat(mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE)).isEqualTo(0);
first.release();
count = mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE);
assertTrue(count > 0);
assertThat(mappedFileQueue.deleteExpiredFileByOffset(20480, ConsumeQueue.CQ_STORE_UNIT_SIZE)).isGreaterThan(0);
first = mappedFileQueue.getFirstMappedFile();
assertTrue(first.getFileFromOffset() > 0);
assertThat(first.getFileFromOffset()).isGreaterThan(0);
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
logger.debug("MappedFileQueue.deleteExpiredFileByOffset() OK");
}
}
......@@ -21,66 +21,30 @@
package org.apache.rocketmq.store;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
public class MappedFileTest {
private static final Logger logger = LoggerFactory.getLogger(MappedFileTest.class);
private static final String StoreMessage = "Once, there was a chance for me!";
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
private final String storeMessage = "Once, there was a chance for me!";
@Test
public void test_write_read() throws IOException {
public void testSelectMappedBuffer() throws IOException {
MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
assertTrue(result);
logger.debug("write OK");
boolean result = mappedFile.appendMessage(storeMessage.getBytes());
assertThat(result).isTrue();
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
byte[] data = new byte[StoreMessage.length()];
byte[] data = new byte[storeMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
logger.debug("Read: " + readString);
assertTrue(readString.equals(StoreMessage));
assertThat(readString).isEqualTo(storeMessage);
mappedFile.shutdown(1000);
assertTrue(!mappedFile.isAvailable());
selectMappedBufferResult.release();
assertTrue(mappedFile.isCleanupOver());
assertTrue(mappedFile.destroy(1000));
}
@Ignore
public void test_jvm_crashed() throws IOException {
MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/10086", 1024 * 64);
boolean result = mappedFile.appendMessage(StoreMessage.getBytes());
assertTrue(result);
logger.debug("write OK");
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
assertThat(mappedFile.isAvailable()).isFalse();
selectMappedBufferResult.release();
mappedFile.shutdown(1000);
byte[] data = new byte[StoreMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
logger.debug(readString);
assertThat(mappedFile.isCleanupOver()).isTrue();
assertThat(mappedFile.destroy(1000)).isTrue();
}
}
......@@ -21,24 +21,13 @@
package org.apache.rocketmq.store;
import java.io.IOException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
public class StoreCheckpointTest {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Test
public void test_write_read() throws IOException {
public void testWriteAndRead() throws IOException {
StoreCheckpoint storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
long physicMsgTimestamp = 0xAABB;
long logicsMsgTimestamp = 0xCCDD;
......@@ -47,10 +36,10 @@ public class StoreCheckpointTest {
storeCheckpoint.flush();
long diff = physicMsgTimestamp - storeCheckpoint.getMinTimestamp();
assertTrue(diff == 3000);
assertThat(diff).isEqualTo(3000);
storeCheckpoint.shutdown();
storeCheckpoint = new StoreCheckpoint("target/checkpoint_test/0000");
assertTrue(physicMsgTimestamp == storeCheckpoint.getPhysicMsgTimestamp());
assertTrue(logicsMsgTimestamp == storeCheckpoint.getLogicsMsgTimestamp());
assertThat(storeCheckpoint.getPhysicMsgTimestamp()).isEqualTo(physicMsgTimestamp);
assertThat(storeCheckpoint.getLogicsMsgTimestamp()).isEqualTo(logicsMsgTimestamp);
}
}
......@@ -24,47 +24,43 @@ import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.assertj.core.api.Assertions.assertThat;
public class IndexFileTest {
private static final int HASH_SLOT_NUM = 100;
private static final int INDEX_NUM = 400;
private final int HASH_SLOT_NUM = 100;
private final int INDEX_NUM = 400;
@Test
public void test_put_index() throws Exception {
public void testPutKey() throws Exception {
IndexFile indexFile = new IndexFile("100", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
for (long i = 0; i < (INDEX_NUM - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
assertThat(putResult).isTrue();
}
// put over index file capacity.
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
assertThat(putResult).isFalse();
indexFile.destroy(0);
}
@Test
public void test_put_get_index() throws Exception {
public void testSelectPhyOffset() throws Exception {
IndexFile indexFile = new IndexFile("200", HASH_SLOT_NUM, INDEX_NUM, 0, 0);
for (long i = 0; i < (INDEX_NUM - 1); i++) {
boolean putResult = indexFile.putKey(Long.toString(i), i, System.currentTimeMillis());
assertTrue(putResult);
assertThat(putResult).isTrue();
}
// put over index file capacity.
boolean putResult = indexFile.putKey(Long.toString(400), 400, System.currentTimeMillis());
assertFalse(putResult);
assertThat(putResult).isFalse();
final List<Long> phyOffsets = new ArrayList<Long>();
indexFile.selectPhyOffset(phyOffsets, "60", 10, 0, Long.MAX_VALUE, true);
assertFalse(phyOffsets.isEmpty());
assertEquals(1, phyOffsets.size());
assertThat(phyOffsets).isNotEmpty();
assertThat(phyOffsets.size()).isEqualTo(1);
indexFile.destroy(0);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: ScheduleMessageTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.store.schedule;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
@Ignore
public class ScheduleMessageTest {
private static final String StoreMessage = "Once, there was a chance for me!";
private static int QUEUE_TOTAL = 100;
private static AtomicInteger QueueId = new AtomicInteger(0);
private static SocketAddress BornHost;
private static SocketAddress StoreHost;
private static byte[] MessageBody;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Test
public void test_delay_message() throws Exception {
System.out.println("================================================================");
long totalMsgs = 10000;
QUEUE_TOTAL = 32;
MessageBody = StoreMessage.getBytes();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 16);
messageStoreConfig.setMaxHashSlotNum(100);
messageStoreConfig.setMaxIndexNum(1000 * 10);
MessageStore master = new DefaultMessageStore(messageStoreConfig, null, null, null);
boolean load = master.load();
assertTrue(load);
master.start();
for (int i = 0; i < totalMsgs; i++) {
MessageExtBrokerInner msg = buildMessage();
msg.setDelayTimeLevel(i % 4);
PutMessageResult result = master.putMessage(msg);
System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
}
System.out.println("write message over, wait time up");
Thread.sleep(1000 * 20);
for (long i = 0; i < totalMsgs; i++) {
try {
GetMessageResult result = master.getMessage("GROUP_A", "TOPIC_A", 0, i, 1024 * 1024, null);
if (result == null) {
System.out.println("result == null " + i);
}
assertTrue(result != null);
result.release();
System.out.println("read " + i + " OK");
} catch (Exception e) {
e.printStackTrace();
}
}
Thread.sleep(1000 * 15);
master.shutdown();
master.destroy();
System.out.println("================================================================");
}
public MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("AAA");
msg.setTags("TAG1");
msg.setKeys("Hello");
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(4);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
return msg;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册