提交 07a8862c 编写于 作者: Y yukon

ROCKETMQ-18 Delete unused unit tests.

上级 de6f9416
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* @author yubao.fyb@taoboa.com
* @version $id$
*/
package org.apache.rocketmq.client.consumer.loadbalance;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
/**
* @author yubao.fyb@alibaba-inc.com created on 2013-07-03 16:24
*/
public class AllocateMessageQueueAveragelyTest {
private AllocateMessageQueueStrategy allocateMessageQueueAveragely;
private String currentCID;
private String topic;
private List<MessageQueue> messageQueueList;
private List<String> consumerIdList;
@Before
public void init() {
allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
topic = "topic_test";
}
@Test
public void testConsumer1() {
currentCID = "0";
createConsumerIdList(1);
createMessageQueueList(5);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testConsumer1");
Assert.assertEquals(result.size(), 5);
Assert.assertEquals(result.containsAll(getMessageQueueList()), true);
}
public void createConsumerIdList(int size) {
consumerIdList = new ArrayList<String>(size);
for (int i = 0; i < size; i++) {
consumerIdList.add(String.valueOf(i));
}
}
public void createMessageQueueList(int size) {
messageQueueList = new ArrayList<MessageQueue>(size);
for (int i = 0; i < size; i++) {
MessageQueue mq = new MessageQueue(topic, "brokerName", i);
messageQueueList.add(mq);
}
}
public void printMessageQueue(List<MessageQueue> messageQueueList, String name) {
if (messageQueueList == null || messageQueueList.size() < 1)
return;
System.out.println(name + ".......................................start");
for (MessageQueue messageQueue : messageQueueList) {
System.out.println(messageQueue);
}
System.out.println(name + ".......................................end");
}
public List<MessageQueue> getMessageQueueList() {
return messageQueueList;
}
public void setMessageQueueList(List<MessageQueue> messageQueueList) {
this.messageQueueList = messageQueueList;
}
@Test
public void testConsumer2() {
currentCID = "1";
createConsumerIdList(2);
createMessageQueueList(5);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testConsumer2");
Assert.assertEquals(result.size(), 3);
Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true);
}
@Test
public void testConsumer3CurrentCID0() {
currentCID = "0";
createConsumerIdList(3);
createMessageQueueList(5);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testConsumer3CurrentCID0");
Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.containsAll(getMessageQueueList().subList(0, 1)), true);
}
@Test
public void testConsumer3CurrentCID1() {
currentCID = "1";
createConsumerIdList(3);
createMessageQueueList(5);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testConsumer3CurrentCID1");
Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
}
@Test
public void testConsumer3CurrentCID2() {
currentCID = "2";
createConsumerIdList(3);
createMessageQueueList(5);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testConsumer3CurrentCID2");
Assert.assertEquals(result.size(), 3);
Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true);
}
@Test
public void testConsumer4() {
currentCID = "1";
createConsumerIdList(4);
createMessageQueueList(5);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testConsumer4");
Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
}
@Test
public void testConsumer5() {
currentCID = "1";
createConsumerIdList(5);
createMessageQueueList(5);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testConsumer5");
Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true);
}
@Test
public void testConsumer6() {
currentCID = "1";
createConsumerIdList(2);
createMessageQueueList(6);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testConsumer");
Assert.assertEquals(result.size(), 3);
Assert.assertEquals(result.containsAll(getMessageQueueList().subList(3, 6)), true);
}
@Test
public void testCurrentCIDNotExists() {
currentCID = String.valueOf(Integer.MAX_VALUE);
createConsumerIdList(2);
createMessageQueueList(6);
List<MessageQueue> result =
allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList);
printMessageQueue(result, "testCurrentCIDNotExists");
Assert.assertEquals(result.size(), 0);
}
@Test(expected = IllegalArgumentException.class)
public void testCurrentCIDIllegalArgument() {
createConsumerIdList(2);
createMessageQueueList(6);
allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList());
}
public List<String> getConsumerIdList() {
return consumerIdList;
}
public void setConsumerIdList(List<String> consumerIdList) {
this.consumerIdList = consumerIdList;
}
@Test(expected = IllegalArgumentException.class)
public void testMessageQueueIllegalArgument() {
currentCID = "0";
createConsumerIdList(2);
allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList());
}
@Test(expected = IllegalArgumentException.class)
public void testConsumerIdIllegalArgument() {
currentCID = "0";
createMessageQueueList(6);
allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null);
}
@Test
public void testAllocate() {
AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely();
String topic = "topic_test";
String currentCID = "CID";
int queueSize = 19;
int consumerSize = 10;
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
for (int i = 0; i < queueSize; i++) {
MessageQueue mq = new MessageQueue(topic, "brokerName", i);
mqAll.add(mq);
}
List<String> cidAll = new ArrayList<String>();
for (int j = 0; j < consumerSize; j++) {
cidAll.add("CID" + j);
}
System.out.println(mqAll.toString());
System.out.println(cidAll.toString());
for (int i = 0; i < consumerSize; i++) {
List<MessageQueue> rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll);
System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
}
}
@Test
public void testAllocateByCircle() {
AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle();
String topic = "topic_test";
String currentCID = "CID";
int consumerSize = 3;
int queueSize = 13;
List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
for (int i = 0; i < queueSize; i++) {
MessageQueue mq = new MessageQueue(topic, "brokerName", i);
mqAll.add(mq);
}
List<String> cidAll = new ArrayList<String>();
for (int j = 0; j < consumerSize; j++) {
cidAll.add("CID" + j);
}
System.out.println(mqAll.toString());
System.out.println(cidAll.toString());
for (int i = 0; i < consumerSize; i++) {
List<MessageQueue> rs = circle.allocate("", currentCID + i, mqAll, cidAll);
System.out.println("rs[" + currentCID + i + "]:" + rs.toString());
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.filter;
import org.apache.rocketmq.common.filter.impl.Op;
import org.apache.rocketmq.common.filter.impl.PolishExpr;
import junit.framework.Assert;
import org.junit.Test;
import java.util.List;
/**
* @author lansheng.zj
*/
public class PolishExprTest {
private String expression = "tag1||(tag2&&tag3)&&tag4||tag5&&(tag6 && tag7)|| tag8 && tag9";
private PolishExpr polishExpr;
public void init() {
polishExpr = new PolishExpr();
}
@Test
public void testReversePolish() {
List<Op> antiPolishExpression = polishExpr.reversePolish(expression);
System.out.println(antiPolishExpression);
}
@Test
public void testReversePolish_Performance() {
// prepare
for (int i = 0; i < 100000; i++) {
polishExpr.reversePolish(expression);
}
long start = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
polishExpr.reversePolish(expression);
}
long cost = System.currentTimeMillis() - start;
System.out.println(cost);
// System.out.println(cost / 100000F);
Assert.assertTrue(cost < 500);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol;
/**
* @author shijia.wxr
*/
public class MQProtosHelperTest {
}
...@@ -233,16 +233,6 @@ ...@@ -233,16 +233,6 @@
<includes> <includes>
<include>**/*Test.java</include> <include>**/*Test.java</include>
</includes> </includes>
<excludes>
<exclude>org/apache/rocketmq/remoting/ExceptionTest.java</exclude>
<exclude>org/apache/rocketmq/remoting/SyncInvokeTest.java</exclude>
<exclude>org/apache/rocketmq/remoting/NettyIdleTest.java</exclude>
<exclude>org/apache/rocketmq/remoting/NettyConnectionTest.java</exclude>
<exclude>org/apache/rocketmq/common/filter/PolishExprTest.java</exclude>
<exclude>org/apache/rocketmq/common/protocol/MQProtosHelperTest.java</exclude>
<exclude>org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java</exclude>
<exclude>org/apache/rocketmq/store/RecoverTest.java</exclude>
</excludes>
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>
......
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: ExceptionTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
*/
package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.*;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelHandlerContext;
import org.junit.Test;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertTrue;
/**
* @author shijia.wxr
*/
public class ExceptionTest {
private static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
RemotingServer client = new NettyRemotingServer(config);
client.registerProcessor(0, new NettyRequestProcessor() {
private int i = 0;
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
System.out.println("processRequest=" + request + " " + (i++));
request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
return request;
}
@Override
public boolean rejectRequest() {
return false;
}
}, Executors.newCachedThreadPool());
client.start();
return client;
}
@Test
public void test_CONNECT_EXCEPTION() {
RemotingClient client = createRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
RemotingCommand response = null;
try {
response = client.invokeSync("localhost:8888", request, 1000 * 3);
} catch (RemotingConnectException e) {
e.printStackTrace();
} catch (RemotingSendRequestException e) {
e.printStackTrace();
} catch (RemotingTimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("invoke result = " + response);
assertTrue(null == response);
client.shutdown();
System.out.println("-----------------------------------------------------------------");
}
private static RemotingClient createRemotingClient() {
NettyClientConfig config = new NettyClientConfig();
RemotingClient client = new NettyRemotingClient(config);
client.start();
return client;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author shijia.wxr
*
*/
public class NettyConnectionTest {
@Test
public void test_connect_timeout() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RemotingClient client = createRemotingClient();
for (int i = 0; i < 100; i++) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
} catch (Exception e) {
e.printStackTrace();
}
}
client.shutdown();
System.out.println("-----------------------------------------------------------------");
}
@Test
public void test_async_timeout() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RemotingClient client = createRemotingClient();
final AtomicInteger ai = new AtomicInteger(0);
final CountDownLatch latch = new CountDownLatch(100);
for (int i = 0; i < 100; i++) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout
@Override
public void operationComplete(ResponseFuture responseFuture) {
if (responseFuture.isTimeout()) {
if (ai.getAndIncrement() == 4) {
try {
System.out.println("First try timeout, blocking 10s" + Thread.currentThread().getName());
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("Timeout callback execute,very short." + Thread.currentThread().getName());
}
} else {
System.out.println("Success." + Thread.currentThread().getName());
}
latch.countDown();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
latch.await(1000, TimeUnit.MILLISECONDS);
Assert.assertEquals(1, latch.getCount());//only one should be blocked
client.shutdown();
System.out.println("-----------------------------------------------------------------");
}
public static RemotingClient createRemotingClient() {
NettyClientConfig config = new NettyClientConfig();
config.setClientChannelMaxIdleTimeSeconds(15);
RemotingClient client = new NettyRemotingClient(config);
client.start();
return client;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.*;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.Executors;
import static org.junit.Assert.assertTrue;
/**
* @author shijia.wxr
*
*/
public class NettyIdleTest {
// @Test
public void test_idle_event() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
RemotingServer server = createRemotingServer();
RemotingClient client = createRemotingClient();
for (int i = 0; i < 10; i++) {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
System.out.println(i + " invoke result = " + response);
assertTrue(response != null);
Thread.sleep(1000 * 10);
}
Thread.sleep(1000 * 60);
client.shutdown();
server.shutdown();
System.out.println("-----------------------------------------------------------------");
}
public static RemotingServer createRemotingServer() throws InterruptedException {
NettyServerConfig config = new NettyServerConfig();
config.setServerChannelMaxIdleTimeSeconds(30);
RemotingServer remotingServer = new NettyRemotingServer(config);
remotingServer.registerProcessor(0, new NettyRequestProcessor() {
private int i = 0;
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
System.out.println("processRequest=" + request + " " + (i++));
request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
return request;
}
@Override
public boolean rejectRequest() {
return false;
}
}, Executors.newCachedThreadPool());
remotingServer.start();
return remotingServer;
}
public static RemotingClient createRemotingClient() {
NettyClientConfig config = new NettyClientConfig();
config.setClientChannelMaxIdleTimeSeconds(15);
RemotingClient client = new NettyRemotingClient(config);
client.start();
return client;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: SyncInvokeTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
*/
package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
/**
* @author shijia.wxr
*/
public class SyncInvokeTest {
@Test
public void test_RPC_Sync() throws Exception {
RemotingServer server = NettyRPCTest.createRemotingServer();
RemotingClient client = NettyRPCTest.createRemotingClient();
for (int i = 0; i < 100; i++) {
try {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3);
System.out.println(i + "\t" + "invoke result = " + response);
assertTrue(response != null);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
client.shutdown();
server.shutdown();
System.out.println("-----------------------------------------------------------------");
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: RecoverTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $
*/
package org.apache.rocketmq.store;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
@Ignore("This test need to be fixed!")
public class RecoverTest {
private static final String StoreMessage = "Once, there was a chance for me!aaaaaaaaaaaaaaaaaaaaaaaa";
private static int QUEUE_TOTAL = 10;
private static AtomicInteger QueueId = new AtomicInteger(0);
private static SocketAddress BornHost;
private static SocketAddress StoreHost;
private static byte[] MessageBody;
private MessageStore storeWrite1;
private MessageStore storeWrite2;
private MessageStore storeRead;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
}
@Test
public void test_recover_normally() throws Exception {
this.writeMessage(true, true);
Thread.sleep(1000 * 3);
this.readMessage(1000);
this.destroy();
}
public void writeMessage(boolean normal, boolean first) throws Exception {
System.out.println("================================================================");
long totalMsgs = 100;
QUEUE_TOTAL = 3;
MessageBody = StoreMessage.getBytes();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20);
messageStoreConfig.setMessageIndexEnable(false);
MessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null);
if (first) {
this.storeWrite1 = messageStore;
} else {
this.storeWrite2 = messageStore;
}
boolean loadResult = messageStore.load();
assertTrue(loadResult);
messageStore.start();
for (long i = 0; i < totalMsgs; i++) {
PutMessageResult result = messageStore.putMessage(buildMessage());
System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId());
}
if (normal) {
messageStore.shutdown();
}
System.out.println("========================writeMessage OK========================================");
}
public void readMessage(final long msgCnt) throws Exception {
System.out.println("================================================================");
QUEUE_TOTAL = 3;
MessageBody = StoreMessage.getBytes();
MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32);
messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20);
messageStoreConfig.setMessageIndexEnable(false);
storeRead = new DefaultMessageStore(messageStoreConfig, null, null, null);
boolean loadResult = storeRead.load();
assertTrue(loadResult);
storeRead.start();
long readCnt = 0;
for (int queueId = 0; queueId < QUEUE_TOTAL; queueId++) {
for (long offset = 0; ; ) {
GetMessageResult result = storeRead.getMessage("GROUP_A", "TOPIC_A", queueId, offset, 1024 * 1024, null);
if (result.getStatus() == GetMessageStatus.FOUND) {
System.out.println(queueId + "\t" + result.getMessageCount());
this.veryReadMessage(queueId, offset, result.getMessageBufferList());
offset += result.getMessageCount();
readCnt += result.getMessageCount();
result.release();
} else {
break;
}
}
}
System.out.println("readCnt = " + readCnt);
assertTrue(readCnt == msgCnt);
System.out.println("========================readMessage OK========================================");
}
private void destroy() {
if (storeWrite1 != null) {
storeWrite1.shutdown();
storeWrite1.destroy();
}
if (storeWrite2 != null) {
storeWrite2.shutdown();
storeWrite2.destroy();
}
if (storeRead != null) {
storeRead.shutdown();
storeRead.destroy();
}
}
public MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic("TOPIC_A");
msg.setTags("TAG1");
msg.setKeys("Hello");
msg.setBody(MessageBody);
msg.setKeys(String.valueOf(System.currentTimeMillis()));
msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL);
msg.setSysFlag(4);
msg.setBornTimestamp(System.currentTimeMillis());
msg.setStoreHost(StoreHost);
msg.setBornHost(BornHost);
return msg;
}
private void veryReadMessage(int queueId, long queueOffset, List<ByteBuffer> byteBuffers) {
for (ByteBuffer byteBuffer : byteBuffers) {
MessageExt msg = MessageDecoder.decode(byteBuffer);
System.out.println("request queueId " + queueId + ", request queueOffset " + queueOffset + " msg queue offset "
+ msg.getQueueOffset());
assertTrue(msg.getQueueOffset() == queueOffset);
queueOffset++;
}
}
@Test
public void test_recover_normally_write() throws Exception {
this.writeMessage(true, true);
Thread.sleep(1000 * 3);
this.writeMessage(true, false);
Thread.sleep(1000 * 3);
this.readMessage(2000);
this.destroy();
}
@Test
public void test_recover_abnormally() throws Exception {
this.writeMessage(false, true);
Thread.sleep(1000 * 3);
this.readMessage(1000);
this.destroy();
}
@Test
public void test_recover_abnormally_write() throws Exception {
this.writeMessage(false, true);
Thread.sleep(1000 * 3);
this.writeMessage(false, false);
Thread.sleep(1000 * 3);
this.readMessage(2000);
this.destroy();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册