提交 2988a1e6 编写于 作者: S shutian.lzh

Make unit tests compilable

上级 b1e77b4c
......@@ -27,8 +27,8 @@ import java.nio.charset.Charset;
public class SimpleProducer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final Producer producer = messagingAccessPoint.createProducer();
......
......@@ -18,12 +18,9 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
......@@ -50,11 +47,11 @@ public class PullConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPullConsumer(queueName,
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
consumer.attachQueue(queueName);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true);
......@@ -83,18 +80,18 @@ public class PullConsumerImplTest {
when(localMessageCache.poll()).thenReturn(consumedMsg);
Message message = consumer.poll();
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
Message message = consumer.receive();
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
}
@Test
public void testPoll_WithTimeout() {
//There is a default timeout value, @see ClientConfig#omsOperationTimeout.
Message message = consumer.poll();
Message message = consumer.receive();
assertThat(message).isNull();
message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
assertThat(message).isNull();
}
}
\ No newline at end of file
......@@ -18,13 +18,10 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.util.Collections;
......@@ -49,7 +46,7 @@ public class PushConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer(
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
......@@ -75,8 +72,8 @@ public class PushConsumerImplTest {
consumedMsg.setTopic("HELLO_QUEUE");
consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
public void onReceived(Message message, Context context) {
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
context.ack();
}
......
......@@ -17,9 +17,9 @@
package io.openmessaging.rocketmq.producer;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.OMS;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.producer.Producer;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -49,7 +49,7 @@ public class ProducerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createProducer();
......@@ -67,8 +67,8 @@ public class ProducerImplTest {
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
io.openmessaging.SendResult omsResult =
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
io.openmessaging.producer.SendResult omsResult =
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
}
......@@ -80,7 +80,7 @@ public class ProducerImplTest {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
try {
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
......@@ -91,7 +91,7 @@ public class ProducerImplTest {
public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
try {
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.SequenceProducer;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SequenceProducerImplTest {
private SequenceProducer producer;
@Mock
private DefaultMQProducer rocketmqProducer;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createSequenceProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true);
field.set(producer, rocketmqProducer);
messagingAccessPoint.startup();
producer.startup();
}
@Test
public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
producer.send(message);
producer.commit();
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
}
@Test
public void testRollback() {
when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
producer.send(message);
producer.rollback();
producer.commit(); //Commit nothing.
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
}
}
\ No newline at end of file
......@@ -16,8 +16,9 @@
*/
package io.openmessaging.rocketmq.promise;
import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.junit.Before;
import org.junit.Test;
......@@ -63,14 +64,10 @@ public class DefaultPromiseTest {
@Test
public void testAddListener() throws Exception {
promise.addListener(new PromiseListener<String>() {
promise.addListener(new FutureListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
public void operationComplete(Future<String> future) {
assertThat(promise.get()).isEqualTo("Done");
}
@Override
public void operationFailed(final Promise<String> promise) {
}
});
......@@ -80,15 +77,10 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_ListenerAfterSet() throws Exception {
promise.set("Done");
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
assertThat(promise.get()).isEqualTo("Done");
}
promise.addListener(new FutureListener<String>() {
@Override
public void operationFailed(final Promise<String> promise) {
public void operationComplete(Future<String> future) {
assertThat(future.get()).isEqualTo("Done");
}
});
}
......@@ -97,13 +89,9 @@ public class DefaultPromiseTest {
public void testAddListener_WithException_ListenerAfterSet() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.setFailure(exception);
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
}
promise.addListener(new FutureListener<String>() {
@Override
public void operationFailed(final Promise<String> promise) {
public void operationComplete(Future<String> future) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
......@@ -112,13 +100,9 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_WithException() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
}
promise.addListener(new FutureListener<String>() {
@Override
public void operationFailed(final Promise<String> promise) {
public void operationComplete(Future<String> future) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册