diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index 8b2ddd26c9d5b0af46a4801ef2754fc00c149234..f644e7d487d3be24f4f752fa461d26d095ec42e6 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -62,7 +62,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout); if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) { log.error(String.format("Send message to RocketMQ failed, %s", message)); - throw new OMSRuntimeException("-1", "Send message to RocketMQ failed."); + throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed."); } message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); return OMSUtil.sendResultConvert(rmqResult); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java index 58b1a122a2711f858384b1eadb4ec83f7f146a5d..f03826e858e3c804ab57b09fed70a10ace82a2ac 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java @@ -72,6 +72,10 @@ public class SequenceProducerImpl extends AbstractOMSProducer implements Sequenc rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); } + if (rmqMessages.size() == 0) { + return; + } + try { SendResult sendResult = this.rocketmqProducer.send(rmqMessages); String [] msgIdArray = sendResult.getMsgId().split(","); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java index 43f96ce85f6b47df7b3a500fa1c4503e7f456d3c..3e4bd266ce117fb52dd78b3cf0caa9f624e29972 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -32,7 +32,7 @@ public class DefaultPromise implements Promise { private long timeout; private long createTime; private Throwable exception = null; - private List promiseListenerList; + private List> promiseListenerList; public DefaultPromise() { createTime = System.currentTimeMillis(); @@ -120,7 +120,7 @@ public class DefaultPromise implements Promise { } @Override - public void addListener(final PromiseListener listener) { + public void addListener(final PromiseListener listener) { if (listener == null) { throw new NullPointerException("FutureListener is null"); } @@ -149,7 +149,7 @@ public class DefaultPromise implements Promise { private void notifyListeners() { if (promiseListenerList != null) { - for (PromiseListener listener : promiseListenerList) { + for (PromiseListener listener : promiseListenerList) { notifyListener(listener); } } @@ -165,7 +165,7 @@ public class DefaultPromise implements Promise { return; } state = FutureState.CANCELLED; - exception = new RuntimeException("get request result is timeout or interrupted"); + exception = new RuntimeException("Get request result is timeout or interrupted"); lock.notifyAll(); } notifyListeners(); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java new file mode 100644 index 0000000000000000000000000000000000000000..ae4d3ed537bde5c0bd02660046647a88ecf93498 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.ConsumeRequest; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class LocalMessageCacheTest { + private LocalMessageCache localMessageCache; + @Mock + private DefaultMQPullConsumer rocketmqPullConsume; + @Mock + private ConsumeRequest consumeRequest; + + @Before + public void init() { + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setRmqPullMessageBatchNums(512); + clientConfig.setRmqPullMessageCacheCapacity(1024); + localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig); + } + + @Test + public void testNextPullBatchNums() throws Exception { + assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512); + for (int i = 0; i < 513; i++) { + localMessageCache.submitConsumeRequest(consumeRequest); + } + assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511); + } + + @Test + public void testNextPullOffset() throws Exception { + MessageQueue messageQueue = new MessageQueue(); + when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean())) + .thenReturn(123L); + assertThat(localMessageCache.nextPullOffset(new MessageQueue())).isEqualTo(123L); + } + + @Test + public void testUpdatePullOffset() throws Exception { + MessageQueue messageQueue = new MessageQueue(); + localMessageCache.updatePullOffset(messageQueue, 124L); + assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L); + } + + @Test + public void testSubmitConsumeRequest() throws Exception { + byte [] body = new byte[]{'1', '2', '3'}; + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(body); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic("HELLO_QUEUE"); + + when(consumeRequest.getMessageExt()).thenReturn(consumedMsg); + localMessageCache.submitConsumeRequest(consumeRequest); + assertThat(localMessageCache.poll()).isEqualTo(consumedMsg); + } +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..277a5c65f27072162787c1096e4356eff08558d5 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PropertyKeys; +import io.openmessaging.PullConsumer; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PullConsumerImplTest { + private PullConsumer consumer; + private String queueName = "HELLO_QUEUE"; + + @Mock + private DefaultMQPullConsumer rocketmqPullConsumer; + private LocalMessageCache localMessageCache = + spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig())); + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + consumer = messagingAccessPoint.createPullConsumer(queueName, + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + + Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); + field.setAccessible(true); + field.set(consumer, rocketmqPullConsumer); //Replace + + field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); + field.setAccessible(true); + field.set(consumer, localMessageCache); + + messagingAccessPoint.startup(); + consumer.startup(); + } + + @Test + public void testPoll() { + final byte[] testBody = new byte[] {'a', 'b'}; + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(testBody); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic(queueName); + + when(localMessageCache.poll()).thenReturn(consumedMsg); + + Message message = consumer.poll(); + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); + } + + @Test + public void testPoll_WithTimeout() { + //There is a default timeout value, @see ClientConfig#omsOperationTimeout. + Message message = consumer.poll(); + assertThat(message).isNull(); + + message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100)); + assertThat(message).isNull(); + } +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..882e57ea5ee5807259a00283105793cbe58410b5 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessageListener; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PushConsumer; +import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.lang.reflect.Field; +import java.util.Collections; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PushConsumerImplTest { + private PushConsumer consumer; + + @Mock + private DefaultMQPushConsumer rocketmqPushConsumer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + consumer = messagingAccessPoint.createPushConsumer( + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + + Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); + field.setAccessible(true); + DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer); + field.set(consumer, rocketmqPushConsumer); //Replace + + when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener()); + messagingAccessPoint.startup(); + consumer.startup(); + } + + @Test + public void testConsumeMessage() { + final byte[] testBody = new byte[] {'a', 'b'}; + + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(testBody); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic("HELLO_QUEUE"); + consumer.attachQueue("HELLO_QUEUE", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); + context.ack(); + } + }); + ((MessageListenerConcurrently) rocketmqPushConsumer + .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null); + } +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..1db80c3efac41b32a5f0b09407012e864d104b0e --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.Producer; +import io.openmessaging.exception.OMSRuntimeException; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ProducerImplTest { + private Producer producer; + + @Mock + private DefaultMQProducer rocketmqProducer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + producer = messagingAccessPoint.createProducer(); + + Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); + field.setAccessible(true); + field.set(producer, rocketmqProducer); + + messagingAccessPoint.startup(); + producer.startup(); + } + + @Test + public void testSend_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("TestMsgID"); + sendResult.setSendStatus(SendStatus.SEND_OK); + when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); + io.openmessaging.SendResult omsResult = + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + + assertThat(omsResult.messageId()).isEqualTo("TestMsgID"); + } + + @Test + public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT); + + when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); + try { + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (Exception e) { + assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); + } + } + + @Test + public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class); + try { + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (Exception e) { + assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); + } + } + +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..823fe015cca59fc67dcac3fa9348a0f98ba2fb28 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.SequenceProducer; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SequenceProducerImplTest { + + private SequenceProducer producer; + + @Mock + private DefaultMQProducer rocketmqProducer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + producer = messagingAccessPoint.createSequenceProducer(); + + Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); + field.setAccessible(true); + field.set(producer, rocketmqProducer); + + messagingAccessPoint.startup(); + producer.startup(); + } + + @Test + public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("TestMsgID"); + sendResult.setSendStatus(SendStatus.SEND_OK); + when(rocketmqProducer.send(ArgumentMatchers.anyList())).thenReturn(sendResult); + when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); + final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); + producer.send(message); + producer.commit(); + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID"); + } + + @Test + public void testRollback() { + when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); + final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); + producer.send(message); + producer.rollback(); + producer.commit(); //Commit nothing. + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null); + } +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java new file mode 100644 index 0000000000000000000000000000000000000000..2240ff2dd1138d487db448a515e488c3e8f557ec --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.promise; + +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.exception.OMSRuntimeException; +import org.junit.Before; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; + +public class DefaultPromiseTest { + private Promise promise; + + @Before + public void init() { + promise = new DefaultPromise<>(); + } + + @Test + public void testIsCancelled() throws Exception { + assertThat(promise.isCancelled()).isEqualTo(false); + } + + @Test + public void testIsDone() throws Exception { + assertThat(promise.isDone()).isEqualTo(false); + promise.set("Done"); + assertThat(promise.isDone()).isEqualTo(true); + } + + @Test + public void testGet() throws Exception { + promise.set("Done"); + assertThat(promise.get()).isEqualTo("Done"); + } + + @Test + public void testGet_WithTimeout() throws Exception { + try { + promise.get(100); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (OMSRuntimeException e) { + assertThat(e).hasMessageContaining("Get request result is timeout or interrupted"); + } + } + + @Test + public void testAddListener() throws Exception { + promise.addListener(new PromiseListener() { + @Override + public void operationCompleted(final Promise promise) { + assertThat(promise.get()).isEqualTo("Done"); + } + + @Override + public void operationFailed(final Promise promise) { + + } + }); + promise.set("Done"); + } + + @Test + public void testAddListener_ListenerAfterSet() throws Exception { + promise.set("Done"); + promise.addListener(new PromiseListener() { + @Override + public void operationCompleted(final Promise promise) { + assertThat(promise.get()).isEqualTo("Done"); + } + + @Override + public void operationFailed(final Promise promise) { + + } + }); + } + + @Test + public void testAddListener_WithException_ListenerAfterSet() throws Exception { + final Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.setFailure(exception); + promise.addListener(new PromiseListener() { + @Override + public void operationCompleted(final Promise promise) { + } + + @Override + public void operationFailed(final Promise promise) { + assertThat(promise.getThrowable()).isEqualTo(exception); + } + }); + } + + @Test + public void testAddListener_WithException() throws Exception { + final Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.addListener(new PromiseListener() { + @Override + public void operationCompleted(final Promise promise) { + } + + @Override + public void operationFailed(final Promise promise) { + assertThat(promise.getThrowable()).isEqualTo(exception); + } + }); + promise.setFailure(exception); + } + + @Test + public void getThrowable() throws Exception { + assertThat(promise.getThrowable()).isNull(); + Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.setFailure(exception); + assertThat(promise.getThrowable()).isEqualTo(exception); + } + +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java new file mode 100644 index 0000000000000000000000000000000000000000..71ca11ccfdd38d02d3425b14618e85cb5cf1ab10 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.KeyValue; +import io.openmessaging.OMS; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class BeanUtilsTest { + private KeyValue properties = OMS.newKeyValue(); + + public static class CustomizedConfig extends ClientConfig { + final static String STRING_TEST = "string.test"; + String stringTest = "foobar"; + + final static String DOUBLE_TEST = "double.test"; + double doubleTest = 123.0; + + final static String LONG_TEST = "long.test"; + long longTest = 123L; + + String getStringTest() { + return stringTest; + } + + public void setStringTest(String stringTest) { + this.stringTest = stringTest; + } + + double getDoubleTest() { + return doubleTest; + } + + public void setDoubleTest(final double doubleTest) { + this.doubleTest = doubleTest; + } + + long getLongTest() { + return longTest; + } + + public void setLongTest(final long longTest) { + this.longTest = longTest; + } + + CustomizedConfig() { + } + } + + @Before + public void init() { + properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120); + properties.put(CustomizedConfig.STRING_TEST, "kaka"); + properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group"); + properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101); + + properties.put(CustomizedConfig.LONG_TEST, 1234567890L); + properties.put(CustomizedConfig.DOUBLE_TEST, 10.234); + } + + @Test + public void testPopulate() { + CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class); + + //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class); + Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120); + Assert.assertEquals(config.getStringTest(), "kaka"); + Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group"); + Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101); + Assert.assertEquals(config.getLongTest(), 1234567890L); + Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001); + } + + @Test + public void testPopulate_ExistObj() { + CustomizedConfig config = new CustomizedConfig(); + config.setOmsConsumerId("NewConsumerId"); + + Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId"); + + config = BeanUtils.populate(properties, config); + + //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class); + Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120); + Assert.assertEquals(config.getStringTest(), "kaka"); + Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group"); + Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101); + Assert.assertEquals(config.getLongTest(), 1234567890L); + Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001); + } + +} \ No newline at end of file