提交 6edeb831 编写于 作者: Y yukon

Add openmessaging unit tests.

上级 9c1dc747
...@@ -62,7 +62,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { ...@@ -62,7 +62,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout); org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) { if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
log.error(String.format("Send message to RocketMQ failed, %s", message)); log.error(String.format("Send message to RocketMQ failed, %s", message));
throw new OMSRuntimeException("-1", "Send message to RocketMQ failed."); throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
} }
message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult); return OMSUtil.sendResultConvert(rmqResult);
......
...@@ -72,6 +72,10 @@ public class SequenceProducerImpl extends AbstractOMSProducer implements Sequenc ...@@ -72,6 +72,10 @@ public class SequenceProducerImpl extends AbstractOMSProducer implements Sequenc
rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
} }
if (rmqMessages.size() == 0) {
return;
}
try { try {
SendResult sendResult = this.rocketmqProducer.send(rmqMessages); SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
String [] msgIdArray = sendResult.getMsgId().split(","); String [] msgIdArray = sendResult.getMsgId().split(",");
......
...@@ -32,7 +32,7 @@ public class DefaultPromise<V> implements Promise<V> { ...@@ -32,7 +32,7 @@ public class DefaultPromise<V> implements Promise<V> {
private long timeout; private long timeout;
private long createTime; private long createTime;
private Throwable exception = null; private Throwable exception = null;
private List<PromiseListener> promiseListenerList; private List<PromiseListener<V>> promiseListenerList;
public DefaultPromise() { public DefaultPromise() {
createTime = System.currentTimeMillis(); createTime = System.currentTimeMillis();
...@@ -120,7 +120,7 @@ public class DefaultPromise<V> implements Promise<V> { ...@@ -120,7 +120,7 @@ public class DefaultPromise<V> implements Promise<V> {
} }
@Override @Override
public void addListener(final PromiseListener listener) { public void addListener(final PromiseListener<V> listener) {
if (listener == null) { if (listener == null) {
throw new NullPointerException("FutureListener is null"); throw new NullPointerException("FutureListener is null");
} }
...@@ -149,7 +149,7 @@ public class DefaultPromise<V> implements Promise<V> { ...@@ -149,7 +149,7 @@ public class DefaultPromise<V> implements Promise<V> {
private void notifyListeners() { private void notifyListeners() {
if (promiseListenerList != null) { if (promiseListenerList != null) {
for (PromiseListener listener : promiseListenerList) { for (PromiseListener<V> listener : promiseListenerList) {
notifyListener(listener); notifyListener(listener);
} }
} }
...@@ -165,7 +165,7 @@ public class DefaultPromise<V> implements Promise<V> { ...@@ -165,7 +165,7 @@ public class DefaultPromise<V> implements Promise<V> {
return; return;
} }
state = FutureState.CANCELLED; state = FutureState.CANCELLED;
exception = new RuntimeException("get request result is timeout or interrupted"); exception = new RuntimeException("Get request result is timeout or interrupted");
lock.notifyAll(); lock.notifyAll();
} }
notifyListeners(); notifyListeners();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class LocalMessageCacheTest {
private LocalMessageCache localMessageCache;
@Mock
private DefaultMQPullConsumer rocketmqPullConsume;
@Mock
private ConsumeRequest consumeRequest;
@Before
public void init() {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setRmqPullMessageBatchNums(512);
clientConfig.setRmqPullMessageCacheCapacity(1024);
localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig);
}
@Test
public void testNextPullBatchNums() throws Exception {
assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512);
for (int i = 0; i < 513; i++) {
localMessageCache.submitConsumeRequest(consumeRequest);
}
assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511);
}
@Test
public void testNextPullOffset() throws Exception {
MessageQueue messageQueue = new MessageQueue();
when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean()))
.thenReturn(123L);
assertThat(localMessageCache.nextPullOffset(new MessageQueue())).isEqualTo(123L);
}
@Test
public void testUpdatePullOffset() throws Exception {
MessageQueue messageQueue = new MessageQueue();
localMessageCache.updatePullOffset(messageQueue, 124L);
assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L);
}
@Test
public void testSubmitConsumeRequest() throws Exception {
byte [] body = new byte[]{'1', '2', '3'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(body);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
when(consumeRequest.getMessageExt()).thenReturn(consumedMsg);
localMessageCache.submitConsumeRequest(consumeRequest);
assertThat(localMessageCache.poll()).isEqualTo(consumedMsg);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PullConsumerImplTest {
private PullConsumer consumer;
private String queueName = "HELLO_QUEUE";
@Mock
private DefaultMQPullConsumer rocketmqPullConsumer;
private LocalMessageCache localMessageCache =
spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig()));
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPullConsumer(queueName,
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true);
field.set(consumer, rocketmqPullConsumer); //Replace
field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
field.setAccessible(true);
field.set(consumer, localMessageCache);
messagingAccessPoint.startup();
consumer.startup();
}
@Test
public void testPoll() {
final byte[] testBody = new byte[] {'a', 'b'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic(queueName);
when(localMessageCache.poll()).thenReturn(consumedMsg);
Message message = consumer.poll();
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
}
@Test
public void testPoll_WithTimeout() {
//There is a default timeout value, @see ClientConfig#omsOperationTimeout.
Message message = consumer.poll();
assertThat(message).isNull();
message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
assertThat(message).isNull();
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.util.Collections;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PushConsumerImplTest {
private PushConsumer consumer;
@Mock
private DefaultMQPushConsumer rocketmqPushConsumer;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer(
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true);
DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer);
field.set(consumer, rocketmqPushConsumer); //Replace
when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
messagingAccessPoint.startup();
consumer.startup();
}
@Test
public void testConsumeMessage() {
final byte[] testBody = new byte[] {'a', 'b'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
context.ack();
}
});
((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.exception.OMSRuntimeException;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ProducerImplTest {
private Producer producer;
@Mock
private DefaultMQProducer rocketmqProducer;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true);
field.set(producer, rocketmqProducer);
messagingAccessPoint.startup();
producer.startup();
}
@Test
public void testSend_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
io.openmessaging.SendResult omsResult =
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
}
@Test
public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
try {
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
}
}
@Test
public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
try {
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.SequenceProducer;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SequenceProducerImplTest {
private SequenceProducer producer;
@Mock
private DefaultMQProducer rocketmqProducer;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createSequenceProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true);
field.set(producer, rocketmqProducer);
messagingAccessPoint.startup();
producer.startup();
}
@Test
public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
producer.send(message);
producer.commit();
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
}
@Test
public void testRollback() {
when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
producer.send(message);
producer.rollback();
producer.commit(); //Commit nothing.
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.promise;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
public class DefaultPromiseTest {
private Promise<String> promise;
@Before
public void init() {
promise = new DefaultPromise<>();
}
@Test
public void testIsCancelled() throws Exception {
assertThat(promise.isCancelled()).isEqualTo(false);
}
@Test
public void testIsDone() throws Exception {
assertThat(promise.isDone()).isEqualTo(false);
promise.set("Done");
assertThat(promise.isDone()).isEqualTo(true);
}
@Test
public void testGet() throws Exception {
promise.set("Done");
assertThat(promise.get()).isEqualTo("Done");
}
@Test
public void testGet_WithTimeout() throws Exception {
try {
promise.get(100);
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (OMSRuntimeException e) {
assertThat(e).hasMessageContaining("Get request result is timeout or interrupted");
}
}
@Test
public void testAddListener() throws Exception {
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
assertThat(promise.get()).isEqualTo("Done");
}
@Override
public void operationFailed(final Promise<String> promise) {
}
});
promise.set("Done");
}
@Test
public void testAddListener_ListenerAfterSet() throws Exception {
promise.set("Done");
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
assertThat(promise.get()).isEqualTo("Done");
}
@Override
public void operationFailed(final Promise<String> promise) {
}
});
}
@Test
public void testAddListener_WithException_ListenerAfterSet() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.setFailure(exception);
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
}
@Override
public void operationFailed(final Promise<String> promise) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
}
@Test
public void testAddListener_WithException() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
}
@Override
public void operationFailed(final Promise<String> promise) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
promise.setFailure(exception);
}
@Test
public void getThrowable() throws Exception {
assertThat(promise.getThrowable()).isNull();
Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.setFailure(exception);
assertThat(promise.getThrowable()).isEqualTo(exception);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.utils;
import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BeanUtilsTest {
private KeyValue properties = OMS.newKeyValue();
public static class CustomizedConfig extends ClientConfig {
final static String STRING_TEST = "string.test";
String stringTest = "foobar";
final static String DOUBLE_TEST = "double.test";
double doubleTest = 123.0;
final static String LONG_TEST = "long.test";
long longTest = 123L;
String getStringTest() {
return stringTest;
}
public void setStringTest(String stringTest) {
this.stringTest = stringTest;
}
double getDoubleTest() {
return doubleTest;
}
public void setDoubleTest(final double doubleTest) {
this.doubleTest = doubleTest;
}
long getLongTest() {
return longTest;
}
public void setLongTest(final long longTest) {
this.longTest = longTest;
}
CustomizedConfig() {
}
}
@Before
public void init() {
properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120);
properties.put(CustomizedConfig.STRING_TEST, "kaka");
properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group");
properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101);
properties.put(CustomizedConfig.LONG_TEST, 1234567890L);
properties.put(CustomizedConfig.DOUBLE_TEST, 10.234);
}
@Test
public void testPopulate() {
CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class);
//RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120);
Assert.assertEquals(config.getStringTest(), "kaka");
Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group");
Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101);
Assert.assertEquals(config.getLongTest(), 1234567890L);
Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001);
}
@Test
public void testPopulate_ExistObj() {
CustomizedConfig config = new CustomizedConfig();
config.setOmsConsumerId("NewConsumerId");
Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId");
config = BeanUtils.populate(properties, config);
//RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120);
Assert.assertEquals(config.getStringTest(), "kaka");
Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group");
Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101);
Assert.assertEquals(config.getLongTest(), 1234567890L);
Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册