diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java deleted file mode 100644 index 7b568c59c3c5403e26a6197f892067e96eaff317..0000000000000000000000000000000000000000 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java +++ /dev/null @@ -1,272 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/* - * @author yubao.fyb@taoboa.com - * @version $id$ - */ -package org.apache.rocketmq.client.consumer.loadbalance; - -import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy; -import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; -import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle; -import org.apache.rocketmq.common.message.MessageQueue; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - - -/** - * @author yubao.fyb@alibaba-inc.com created on 2013-07-03 16:24 - */ -public class AllocateMessageQueueAveragelyTest { - private AllocateMessageQueueStrategy allocateMessageQueueAveragely; - private String currentCID; - private String topic; - private List messageQueueList; - private List consumerIdList; - - @Before - public void init() { - allocateMessageQueueAveragely = new AllocateMessageQueueAveragely(); - topic = "topic_test"; - } - - @Test - public void testConsumer1() { - currentCID = "0"; - createConsumerIdList(1); - createMessageQueueList(5); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer1"); - Assert.assertEquals(result.size(), 5); - Assert.assertEquals(result.containsAll(getMessageQueueList()), true); - } - - public void createConsumerIdList(int size) { - consumerIdList = new ArrayList(size); - for (int i = 0; i < size; i++) { - consumerIdList.add(String.valueOf(i)); - } - } - - public void createMessageQueueList(int size) { - messageQueueList = new ArrayList(size); - for (int i = 0; i < size; i++) { - MessageQueue mq = new MessageQueue(topic, "brokerName", i); - messageQueueList.add(mq); - } - } - - public void printMessageQueue(List messageQueueList, String name) { - if (messageQueueList == null || messageQueueList.size() < 1) - return; - System.out.println(name + ".......................................start"); - for (MessageQueue messageQueue : messageQueueList) { - System.out.println(messageQueue); - } - System.out.println(name + ".......................................end"); - } - - public List getMessageQueueList() { - return messageQueueList; - } - - public void setMessageQueueList(List messageQueueList) { - this.messageQueueList = messageQueueList; - } - - @Test - public void testConsumer2() { - currentCID = "1"; - createConsumerIdList(2); - createMessageQueueList(5); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer2"); - Assert.assertEquals(result.size(), 3); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true); - - } - - @Test - public void testConsumer3CurrentCID0() { - currentCID = "0"; - createConsumerIdList(3); - createMessageQueueList(5); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer3CurrentCID0"); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(0, 1)), true); - } - - @Test - public void testConsumer3CurrentCID1() { - currentCID = "1"; - createConsumerIdList(3); - createMessageQueueList(5); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer3CurrentCID1"); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); - } - - @Test - public void testConsumer3CurrentCID2() { - currentCID = "2"; - createConsumerIdList(3); - createMessageQueueList(5); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer3CurrentCID2"); - Assert.assertEquals(result.size(), 3); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(2, 5)), true); - } - - @Test - public void testConsumer4() { - currentCID = "1"; - createConsumerIdList(4); - createMessageQueueList(5); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer4"); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); - } - - @Test - public void testConsumer5() { - currentCID = "1"; - createConsumerIdList(5); - createMessageQueueList(5); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer5"); - Assert.assertEquals(result.size(), 1); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(1, 2)), true); - } - - @Test - public void testConsumer6() { - currentCID = "1"; - createConsumerIdList(2); - createMessageQueueList(6); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testConsumer"); - Assert.assertEquals(result.size(), 3); - Assert.assertEquals(result.containsAll(getMessageQueueList().subList(3, 6)), true); - } - - @Test - public void testCurrentCIDNotExists() { - currentCID = String.valueOf(Integer.MAX_VALUE); - createConsumerIdList(2); - createMessageQueueList(6); - List result = - allocateMessageQueueAveragely.allocate("", currentCID, messageQueueList, consumerIdList); - printMessageQueue(result, "testCurrentCIDNotExists"); - Assert.assertEquals(result.size(), 0); - } - - @Test(expected = IllegalArgumentException.class) - public void testCurrentCIDIllegalArgument() { - createConsumerIdList(2); - createMessageQueueList(6); - allocateMessageQueueAveragely.allocate("", "", getMessageQueueList(), getConsumerIdList()); - } - - public List getConsumerIdList() { - return consumerIdList; - } - - public void setConsumerIdList(List consumerIdList) { - this.consumerIdList = consumerIdList; - } - - @Test(expected = IllegalArgumentException.class) - public void testMessageQueueIllegalArgument() { - currentCID = "0"; - createConsumerIdList(2); - allocateMessageQueueAveragely.allocate("", currentCID, null, getConsumerIdList()); - } - - @Test(expected = IllegalArgumentException.class) - public void testConsumerIdIllegalArgument() { - currentCID = "0"; - createMessageQueueList(6); - allocateMessageQueueAveragely.allocate("", currentCID, getMessageQueueList(), null); - } - - @Test - public void testAllocate() { - AllocateMessageQueueAveragely allocateMessageQueueAveragely = new AllocateMessageQueueAveragely(); - String topic = "topic_test"; - String currentCID = "CID"; - int queueSize = 19; - int consumerSize = 10; - List mqAll = new ArrayList(); - for (int i = 0; i < queueSize; i++) { - MessageQueue mq = new MessageQueue(topic, "brokerName", i); - mqAll.add(mq); - } - - List cidAll = new ArrayList(); - for (int j = 0; j < consumerSize; j++) { - cidAll.add("CID" + j); - } - System.out.println(mqAll.toString()); - System.out.println(cidAll.toString()); - for (int i = 0; i < consumerSize; i++) { - List rs = allocateMessageQueueAveragely.allocate("", currentCID + i, mqAll, cidAll); - System.out.println("rs[" + currentCID + i + "]:" + rs.toString()); - } - } - - - @Test - public void testAllocateByCircle() { - AllocateMessageQueueAveragelyByCircle circle = new AllocateMessageQueueAveragelyByCircle(); - String topic = "topic_test"; - String currentCID = "CID"; - int consumerSize = 3; - int queueSize = 13; - List mqAll = new ArrayList(); - for (int i = 0; i < queueSize; i++) { - MessageQueue mq = new MessageQueue(topic, "brokerName", i); - mqAll.add(mq); - } - - List cidAll = new ArrayList(); - for (int j = 0; j < consumerSize; j++) { - cidAll.add("CID" + j); - } - System.out.println(mqAll.toString()); - System.out.println(cidAll.toString()); - for (int i = 0; i < consumerSize; i++) { - List rs = circle.allocate("", currentCID + i, mqAll, cidAll); - System.out.println("rs[" + currentCID + i + "]:" + rs.toString()); - } - } -} diff --git a/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java b/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java deleted file mode 100644 index f2ba2a3a58762b93eeadc8c934ad94e1fa63c432..0000000000000000000000000000000000000000 --- a/common/src/test/java/org/apache/rocketmq/common/filter/PolishExprTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.common.filter; - -import org.apache.rocketmq.common.filter.impl.Op; -import org.apache.rocketmq.common.filter.impl.PolishExpr; -import junit.framework.Assert; -import org.junit.Test; - -import java.util.List; - - -/** - * @author lansheng.zj - */ -public class PolishExprTest { - - private String expression = "tag1||(tag2&&tag3)&&tag4||tag5&&(tag6 && tag7)|| tag8 && tag9"; - private PolishExpr polishExpr; - - - public void init() { - polishExpr = new PolishExpr(); - } - - - @Test - public void testReversePolish() { - List antiPolishExpression = polishExpr.reversePolish(expression); - System.out.println(antiPolishExpression); - } - - - @Test - public void testReversePolish_Performance() { - // prepare - for (int i = 0; i < 100000; i++) { - polishExpr.reversePolish(expression); - } - - long start = System.currentTimeMillis(); - for (int i = 0; i < 100000; i++) { - polishExpr.reversePolish(expression); - } - long cost = System.currentTimeMillis() - start; - System.out.println(cost); - // System.out.println(cost / 100000F); - - Assert.assertTrue(cost < 500); - } - -} diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java deleted file mode 100644 index cd56627fb4b84b766610d82f9e97d5c777701349..0000000000000000000000000000000000000000 --- a/common/src/test/java/org/apache/rocketmq/common/protocol/MQProtosHelperTest.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.common.protocol; - -/** - * @author shijia.wxr - */ -public class MQProtosHelperTest { - -} diff --git a/pom.xml b/pom.xml index 88122134366dc8ff0bb0519447972c78cddb68b6..39ea763e37fd9c4dd4fe91717c5d68ccbdeda9e9 100644 --- a/pom.xml +++ b/pom.xml @@ -233,16 +233,6 @@ **/*Test.java - - org/apache/rocketmq/remoting/ExceptionTest.java - org/apache/rocketmq/remoting/SyncInvokeTest.java - org/apache/rocketmq/remoting/NettyIdleTest.java - org/apache/rocketmq/remoting/NettyConnectionTest.java - org/apache/rocketmq/common/filter/PolishExprTest.java - org/apache/rocketmq/common/protocol/MQProtosHelperTest.java - org/apache/rocketmq/client/consumer/loadbalance/AllocateMessageQueueAveragelyTest.java - org/apache/rocketmq/store/RecoverTest.java - diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/ExceptionTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/ExceptionTest.java deleted file mode 100644 index d5c1f3764b41b99c9e9c41490d7e4af02081eaaf..0000000000000000000000000000000000000000 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/ExceptionTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: ExceptionTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ - */ -package org.apache.rocketmq.remoting; - -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.netty.*; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; -import org.junit.Test; - -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertTrue; - - -/** - * @author shijia.wxr - */ -public class ExceptionTest { - private static RemotingServer createRemotingServer() throws InterruptedException { - NettyServerConfig config = new NettyServerConfig(); - RemotingServer client = new NettyRemotingServer(config); - client.registerProcessor(0, new NettyRequestProcessor() { - private int i = 0; - - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { - System.out.println("processRequest=" + request + " " + (i++)); - request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress()); - return request; - } - - @Override - public boolean rejectRequest() { - return false; - } - }, Executors.newCachedThreadPool()); - client.start(); - return client; - } - - @Test - public void test_CONNECT_EXCEPTION() { - RemotingClient client = createRemotingClient(); - - RemotingCommand request = RemotingCommand.createRequestCommand(0, null); - RemotingCommand response = null; - try { - response = client.invokeSync("localhost:8888", request, 1000 * 3); - } catch (RemotingConnectException e) { - e.printStackTrace(); - } catch (RemotingSendRequestException e) { - e.printStackTrace(); - } catch (RemotingTimeoutException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - System.out.println("invoke result = " + response); - assertTrue(null == response); - - client.shutdown(); - System.out.println("-----------------------------------------------------------------"); - } - - private static RemotingClient createRemotingClient() { - NettyClientConfig config = new NettyClientConfig(); - RemotingClient client = new NettyRemotingClient(config); - client.start(); - return client; - } - -} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyConnectionTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyConnectionTest.java deleted file mode 100644 index 3ceba50174cd31e2edb855d1f39c71e6b5227d27..0000000000000000000000000000000000000000 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyConnectionTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting; - -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.netty.NettyClientConfig; -import org.apache.rocketmq.remoting.netty.NettyRemotingClient; -import org.apache.rocketmq.remoting.netty.ResponseFuture; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.junit.Assert; -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - - -/** - - * - * @author shijia.wxr - * - */ -public class NettyConnectionTest { - @Test - public void test_connect_timeout() throws InterruptedException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException { - RemotingClient client = createRemotingClient(); - - for (int i = 0; i < 100; i++) { - try { - RemotingCommand request = RemotingCommand.createRequestCommand(0, null); - RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3); - } catch (Exception e) { - e.printStackTrace(); - } - } - - client.shutdown(); - System.out.println("-----------------------------------------------------------------"); - } - - @Test - public void test_async_timeout() throws InterruptedException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException { - RemotingClient client = createRemotingClient(); - final AtomicInteger ai = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(100); - for (int i = 0; i < 100; i++) { - try { - RemotingCommand request = RemotingCommand.createRequestCommand(0, null); - client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout - @Override - public void operationComplete(ResponseFuture responseFuture) { - if (responseFuture.isTimeout()) { - if (ai.getAndIncrement() == 4) { - try { - System.out.println("First try timeout, blocking 10s" + Thread.currentThread().getName()); - Thread.sleep(10 * 1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } else { - System.out.println("Timeout callback execute,very short." + Thread.currentThread().getName()); - } - } else { - System.out.println("Success." + Thread.currentThread().getName()); - } - latch.countDown(); - - } - }); - } catch (Exception e) { - e.printStackTrace(); - } - } - - - latch.await(1000, TimeUnit.MILLISECONDS); - Assert.assertEquals(1, latch.getCount());//only one should be blocked - client.shutdown(); - System.out.println("-----------------------------------------------------------------"); - } - - public static RemotingClient createRemotingClient() { - NettyClientConfig config = new NettyClientConfig(); - config.setClientChannelMaxIdleTimeSeconds(15); - RemotingClient client = new NettyRemotingClient(config); - client.start(); - return client; - } -} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyIdleTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyIdleTest.java deleted file mode 100644 index 741dc14136b2f127996bfb5b4335fa155292d3cc..0000000000000000000000000000000000000000 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyIdleTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting; - -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; -import org.apache.rocketmq.remoting.netty.*; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import io.netty.channel.ChannelHandlerContext; - -import java.util.concurrent.Executors; - -import static org.junit.Assert.assertTrue; - - -/** - * @author shijia.wxr - * - */ -public class NettyIdleTest { - // @Test - public void test_idle_event() throws InterruptedException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException { - RemotingServer server = createRemotingServer(); - RemotingClient client = createRemotingClient(); - - for (int i = 0; i < 10; i++) { - RemotingCommand request = RemotingCommand.createRequestCommand(0, null); - RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3); - System.out.println(i + " invoke result = " + response); - assertTrue(response != null); - - Thread.sleep(1000 * 10); - } - - Thread.sleep(1000 * 60); - - client.shutdown(); - server.shutdown(); - System.out.println("-----------------------------------------------------------------"); - } - - public static RemotingServer createRemotingServer() throws InterruptedException { - NettyServerConfig config = new NettyServerConfig(); - config.setServerChannelMaxIdleTimeSeconds(30); - RemotingServer remotingServer = new NettyRemotingServer(config); - remotingServer.registerProcessor(0, new NettyRequestProcessor() { - private int i = 0; - - - @Override - public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { - System.out.println("processRequest=" + request + " " + (i++)); - request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress()); - return request; - } - - @Override - public boolean rejectRequest() { - return false; - } - }, Executors.newCachedThreadPool()); - remotingServer.start(); - return remotingServer; - } - - public static RemotingClient createRemotingClient() { - NettyClientConfig config = new NettyClientConfig(); - config.setClientChannelMaxIdleTimeSeconds(15); - RemotingClient client = new NettyRemotingClient(config); - client.start(); - return client; - } - -} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/SyncInvokeTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/SyncInvokeTest.java deleted file mode 100644 index 6d454f40d51afcc58c4f30d3a5cefee8dc789567..0000000000000000000000000000000000000000 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/SyncInvokeTest.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: SyncInvokeTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ - */ -package org.apache.rocketmq.remoting; - -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.junit.Test; - -import static org.junit.Assert.assertTrue; - - -/** - * @author shijia.wxr - */ -public class SyncInvokeTest { - @Test - public void test_RPC_Sync() throws Exception { - RemotingServer server = NettyRPCTest.createRemotingServer(); - RemotingClient client = NettyRPCTest.createRemotingClient(); - - for (int i = 0; i < 100; i++) { - try { - RemotingCommand request = RemotingCommand.createRequestCommand(0, null); - RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3); - System.out.println(i + "\t" + "invoke result = " + response); - assertTrue(response != null); - } catch (Exception e) { - e.printStackTrace(); - throw e; - } - } - - client.shutdown(); - server.shutdown(); - System.out.println("-----------------------------------------------------------------"); - } -} diff --git a/store/src/test/java/org/apache/rocketmq/store/RecoverTest.java b/store/src/test/java/org/apache/rocketmq/store/RecoverTest.java deleted file mode 100644 index 699422c6eaee4573b8347a6115634839d18fac02..0000000000000000000000000000000000000000 --- a/store/src/test/java/org/apache/rocketmq/store/RecoverTest.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * $Id: RecoverTest.java 1831 2013-05-16 01:39:51Z shijia.wxr $ - */ -package org.apache.rocketmq.store; - -import org.apache.rocketmq.common.message.MessageDecoder; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.store.config.MessageStoreConfig; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertTrue; - -@Ignore("This test need to be fixed!") -public class RecoverTest { - private static final String StoreMessage = "Once, there was a chance for me!aaaaaaaaaaaaaaaaaaaaaaaa"; - - private static int QUEUE_TOTAL = 10; - - private static AtomicInteger QueueId = new AtomicInteger(0); - - private static SocketAddress BornHost; - - private static SocketAddress StoreHost; - - private static byte[] MessageBody; - private MessageStore storeWrite1; - private MessageStore storeWrite2; - private MessageStore storeRead; - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); - BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - } - - @Test - public void test_recover_normally() throws Exception { - this.writeMessage(true, true); - Thread.sleep(1000 * 3); - this.readMessage(1000); - this.destroy(); - } - - public void writeMessage(boolean normal, boolean first) throws Exception { - System.out.println("================================================================"); - long totalMsgs = 100; - QUEUE_TOTAL = 3; - - MessageBody = StoreMessage.getBytes(); - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32); - messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20); - messageStoreConfig.setMessageIndexEnable(false); - - MessageStore messageStore = new DefaultMessageStore(messageStoreConfig, null, null, null); - if (first) { - this.storeWrite1 = messageStore; - } else { - this.storeWrite2 = messageStore; - } - - boolean loadResult = messageStore.load(); - assertTrue(loadResult); - messageStore.start(); - for (long i = 0; i < totalMsgs; i++) { - PutMessageResult result = messageStore.putMessage(buildMessage()); - System.out.println(i + "\t" + result.getAppendMessageResult().getMsgId()); - } - - if (normal) { - messageStore.shutdown(); - } - System.out.println("========================writeMessage OK========================================"); - } - - public void readMessage(final long msgCnt) throws Exception { - System.out.println("================================================================"); - QUEUE_TOTAL = 3; - MessageBody = StoreMessage.getBytes(); - MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); - messageStoreConfig.setMapedFileSizeCommitLog(1024 * 32); - messageStoreConfig.setMapedFileSizeConsumeQueue(100 * 20); - messageStoreConfig.setMessageIndexEnable(false); - storeRead = new DefaultMessageStore(messageStoreConfig, null, null, null); - boolean loadResult = storeRead.load(); - assertTrue(loadResult); - storeRead.start(); - - long readCnt = 0; - for (int queueId = 0; queueId < QUEUE_TOTAL; queueId++) { - for (long offset = 0; ; ) { - GetMessageResult result = storeRead.getMessage("GROUP_A", "TOPIC_A", queueId, offset, 1024 * 1024, null); - if (result.getStatus() == GetMessageStatus.FOUND) { - System.out.println(queueId + "\t" + result.getMessageCount()); - this.veryReadMessage(queueId, offset, result.getMessageBufferList()); - offset += result.getMessageCount(); - readCnt += result.getMessageCount(); - result.release(); - } else { - break; - } - } - } - - System.out.println("readCnt = " + readCnt); - assertTrue(readCnt == msgCnt); - System.out.println("========================readMessage OK========================================"); - } - - private void destroy() { - if (storeWrite1 != null) { - storeWrite1.shutdown(); - storeWrite1.destroy(); - } - - if (storeWrite2 != null) { - storeWrite2.shutdown(); - storeWrite2.destroy(); - } - - if (storeRead != null) { - storeRead.shutdown(); - storeRead.destroy(); - } - } - - public MessageExtBrokerInner buildMessage() { - MessageExtBrokerInner msg = new MessageExtBrokerInner(); - msg.setTopic("TOPIC_A"); - msg.setTags("TAG1"); - msg.setKeys("Hello"); - msg.setBody(MessageBody); - msg.setKeys(String.valueOf(System.currentTimeMillis())); - msg.setQueueId(Math.abs(QueueId.getAndIncrement()) % QUEUE_TOTAL); - msg.setSysFlag(4); - msg.setBornTimestamp(System.currentTimeMillis()); - msg.setStoreHost(StoreHost); - msg.setBornHost(BornHost); - - return msg; - } - - private void veryReadMessage(int queueId, long queueOffset, List byteBuffers) { - for (ByteBuffer byteBuffer : byteBuffers) { - MessageExt msg = MessageDecoder.decode(byteBuffer); - System.out.println("request queueId " + queueId + ", request queueOffset " + queueOffset + " msg queue offset " - + msg.getQueueOffset()); - - assertTrue(msg.getQueueOffset() == queueOffset); - - queueOffset++; - } - } - - @Test - public void test_recover_normally_write() throws Exception { - this.writeMessage(true, true); - Thread.sleep(1000 * 3); - this.writeMessage(true, false); - Thread.sleep(1000 * 3); - this.readMessage(2000); - this.destroy(); - } - - @Test - public void test_recover_abnormally() throws Exception { - this.writeMessage(false, true); - Thread.sleep(1000 * 3); - this.readMessage(1000); - this.destroy(); - } - - @Test - public void test_recover_abnormally_write() throws Exception { - this.writeMessage(false, true); - Thread.sleep(1000 * 3); - this.writeMessage(false, false); - Thread.sleep(1000 * 3); - this.readMessage(2000); - this.destroy(); - } -}