From 2988a1e6caa8d21413ef6ff60cc04a8cfb3dae20 Mon Sep 17 00:00:00 2001 From: "shutian.lzh" Date: Sun, 15 Apr 2018 17:00:13 +0800 Subject: [PATCH] Make unit tests compilable --- .../example/openmessaging/SimpleProducer.java | 4 +- .../consumer/PullConsumerImplTest.java | 19 ++-- .../consumer/PushConsumerImplTest.java | 13 ++- .../rocketmq/producer/ProducerImplTest.java | 14 +-- .../producer/SequenceProducerImplTest.java | 86 ------------------- .../rocketmq/promise/DefaultPromiseTest.java | 38 +++----- 6 files changed, 33 insertions(+), 141 deletions(-) delete mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index 2884797d..c7855048 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -27,8 +27,8 @@ import java.nio.charset.Charset; public class SimpleProducer { public static void main(String[] args) { - final MessagingAccessPoint messagingAccessPoint = OMS - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + final MessagingAccessPoint messagingAccessPoint = + OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); final Producer producer = messagingAccessPoint.createProducer(); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java index 843ddb78..7e81b40b 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -18,12 +18,9 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.BytesMessage; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; -import io.openmessaging.PropertyKeys; -import io.openmessaging.PullConsumer; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.lang.reflect.Field; @@ -50,11 +47,11 @@ public class PullConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); - consumer = messagingAccessPoint.createPullConsumer(queueName, - OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + consumer.attachQueue(queueName); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); field.setAccessible(true); @@ -83,18 +80,18 @@ public class PullConsumerImplTest { when(localMessageCache.poll()).thenReturn(consumedMsg); - Message message = consumer.poll(); - assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + Message message = consumer.receive(); + assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); } @Test public void testPoll_WithTimeout() { //There is a default timeout value, @see ClientConfig#omsOperationTimeout. - Message message = consumer.poll(); + Message message = consumer.receive(); assertThat(message).isNull(); - message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100)); + message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100)); assertThat(message).isNull(); } } \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java index 882e57ea..5caa2b69 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -18,13 +18,10 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.BytesMessage; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; -import io.openmessaging.MessageListener; +import io.openmessaging.consumer.MessageListener; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; -import io.openmessaging.PushConsumer; -import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.lang.reflect.Field; import java.util.Collections; @@ -49,7 +46,7 @@ public class PushConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); consumer = messagingAccessPoint.createPushConsumer( OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); @@ -75,8 +72,8 @@ public class PushConsumerImplTest { consumedMsg.setTopic("HELLO_QUEUE"); consumer.attachQueue("HELLO_QUEUE", new MessageListener() { @Override - public void onMessage(final Message message, final ReceivedMessageContext context) { - assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + public void onReceived(Message message, Context context) { + assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); context.ack(); } diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java index 1db80c3e..7b361792 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java @@ -17,9 +17,9 @@ package io.openmessaging.rocketmq.producer; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; -import io.openmessaging.Producer; +import io.openmessaging.OMS; import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.producer.Producer; import java.lang.reflect.Field; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -49,7 +49,7 @@ public class ProducerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); producer = messagingAccessPoint.createProducer(); @@ -67,8 +67,8 @@ public class ProducerImplTest { sendResult.setMsgId("TestMsgID"); sendResult.setSendStatus(SendStatus.SEND_OK); when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); - io.openmessaging.SendResult omsResult = - producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + io.openmessaging.producer.SendResult omsResult = + producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); assertThat(omsResult.messageId()).isEqualTo("TestMsgID"); } @@ -80,7 +80,7 @@ public class ProducerImplTest { when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); try { - producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); failBecauseExceptionWasNotThrown(OMSRuntimeException.class); } catch (Exception e) { assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); @@ -91,7 +91,7 @@ public class ProducerImplTest { public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class); try { - producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); failBecauseExceptionWasNotThrown(OMSRuntimeException.class); } catch (Exception e) { assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java deleted file mode 100644 index 823fe015..00000000 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.openmessaging.rocketmq.producer; - -import io.openmessaging.BytesMessage; -import io.openmessaging.MessageHeader; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; -import io.openmessaging.SequenceProducer; -import java.lang.reflect.Field; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class SequenceProducerImplTest { - - private SequenceProducer producer; - - @Mock - private DefaultMQProducer rocketmqProducer; - - @Before - public void init() throws NoSuchFieldException, IllegalAccessException { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); - producer = messagingAccessPoint.createSequenceProducer(); - - Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); - field.setAccessible(true); - field.set(producer, rocketmqProducer); - - messagingAccessPoint.startup(); - producer.startup(); - } - - @Test - public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - SendResult sendResult = new SendResult(); - sendResult.setMsgId("TestMsgID"); - sendResult.setSendStatus(SendStatus.SEND_OK); - when(rocketmqProducer.send(ArgumentMatchers.anyList())).thenReturn(sendResult); - when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); - final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); - producer.send(message); - producer.commit(); - assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID"); - } - - @Test - public void testRollback() { - when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); - final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); - producer.send(message); - producer.rollback(); - producer.commit(); //Commit nothing. - assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null); - } -} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java index 2240ff2d..f226edef 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java @@ -16,8 +16,9 @@ */ package io.openmessaging.rocketmq.promise; +import io.openmessaging.Future; +import io.openmessaging.FutureListener; import io.openmessaging.Promise; -import io.openmessaging.PromiseListener; import io.openmessaging.exception.OMSRuntimeException; import org.junit.Before; import org.junit.Test; @@ -63,14 +64,10 @@ public class DefaultPromiseTest { @Test public void testAddListener() throws Exception { - promise.addListener(new PromiseListener() { + promise.addListener(new FutureListener() { @Override - public void operationCompleted(final Promise promise) { + public void operationComplete(Future future) { assertThat(promise.get()).isEqualTo("Done"); - } - - @Override - public void operationFailed(final Promise promise) { } }); @@ -80,15 +77,10 @@ public class DefaultPromiseTest { @Test public void testAddListener_ListenerAfterSet() throws Exception { promise.set("Done"); - promise.addListener(new PromiseListener() { - @Override - public void operationCompleted(final Promise promise) { - assertThat(promise.get()).isEqualTo("Done"); - } - + promise.addListener(new FutureListener() { @Override - public void operationFailed(final Promise promise) { - + public void operationComplete(Future future) { + assertThat(future.get()).isEqualTo("Done"); } }); } @@ -97,13 +89,9 @@ public class DefaultPromiseTest { public void testAddListener_WithException_ListenerAfterSet() throws Exception { final Throwable exception = new OMSRuntimeException("-1", "Test Error"); promise.setFailure(exception); - promise.addListener(new PromiseListener() { - @Override - public void operationCompleted(final Promise promise) { - } - + promise.addListener(new FutureListener() { @Override - public void operationFailed(final Promise promise) { + public void operationComplete(Future future) { assertThat(promise.getThrowable()).isEqualTo(exception); } }); @@ -112,13 +100,9 @@ public class DefaultPromiseTest { @Test public void testAddListener_WithException() throws Exception { final Throwable exception = new OMSRuntimeException("-1", "Test Error"); - promise.addListener(new PromiseListener() { - @Override - public void operationCompleted(final Promise promise) { - } - + promise.addListener(new FutureListener() { @Override - public void operationFailed(final Promise promise) { + public void operationComplete(Future future) { assertThat(promise.getThrowable()).isEqualTo(exception); } }); -- GitLab