From 1d966b50c2ec189ca4f1bf81420959a33159a8ad Mon Sep 17 00:00:00 2001 From: yukon Date: Wed, 24 May 2017 16:50:51 +0800 Subject: [PATCH] [ROCKETMQ-186] Implement the OpenMessaging specification 0.1.0-alpha version --- distribution/release-client.xml | 1 + distribution/release.xml | 1 + example/pom.xml | 9 + .../example/openmessaging/SimpleProducer.java | 76 ++++++ .../openmessaging/SimplePullConsumer.java | 58 +++++ .../openmessaging/SimplePushConsumer.java | 59 +++++ openmessaging/pom.xml | 42 ++++ .../rocketmq/MessagingAccessPointImpl.java | 132 ++++++++++ .../rocketmq/config/ClientConfig.java | 194 +++++++++++++++ .../rocketmq/consumer/LocalMessageCache.java | 213 ++++++++++++++++ .../rocketmq/consumer/PullConsumerImpl.java | 166 +++++++++++++ .../rocketmq/consumer/PushConsumerImpl.java | 181 ++++++++++++++ .../rocketmq/domain/BytesMessageImpl.java | 108 +++++++++ .../rocketmq/domain/ConsumeRequest.java | 55 +++++ .../rocketmq/domain/NonStandardKeys.java | 30 +++ .../rocketmq/domain/SendResultImpl.java | 40 +++ .../producer/AbstractOMSProducer.java | 138 +++++++++++ .../rocketmq/producer/ProducerImpl.java | 124 ++++++++++ .../producer/SequenceProducerImpl.java | 95 ++++++++ .../rocketmq/promise/DefaultPromise.java | 227 ++++++++++++++++++ .../rocketmq/promise/FutureState.java | 51 ++++ .../rocketmq/utils/BeanUtils.java | 185 ++++++++++++++ .../openmessaging/rocketmq/utils/OMSUtil.java | 182 ++++++++++++++ .../consumer/LocalMessageCacheTest.java | 89 +++++++ .../consumer/PullConsumerImplTest.java | 96 ++++++++ .../consumer/PushConsumerImplTest.java | 87 +++++++ .../rocketmq/producer/ProducerImplTest.java | 101 ++++++++ .../producer/SequenceProducerImplTest.java | 86 +++++++ .../rocketmq/promise/DefaultPromiseTest.java | 136 +++++++++++ .../rocketmq/utils/BeanUtilsTest.java | 110 +++++++++ pom.xml | 6 + 31 files changed, 3078 insertions(+) create mode 100644 example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java create mode 100644 example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java create mode 100644 example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java create mode 100644 openmessaging/pom.xml create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java create mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java create mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java create mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java create mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java create mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java create mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java create mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java diff --git a/distribution/release-client.xml b/distribution/release-client.xml index 46563eb4..84d33a01 100644 --- a/distribution/release-client.xml +++ b/distribution/release-client.xml @@ -47,6 +47,7 @@ true org.apache.rocketmq:rocketmq-client + org.apache.rocketmq:rocketmq-openmessaging ./ diff --git a/distribution/release.xml b/distribution/release.xml index 9e4ef2a0..c67d23e1 100644 --- a/distribution/release.xml +++ b/distribution/release.xml @@ -68,6 +68,7 @@ org.apache.rocketmq:rocketmq-filtersrv org.apache.rocketmq:rocketmq-example org.apache.rocketmq:rocketmq-filter + org.apache.rocketmq:rocketmq-openmessaging lib/ diff --git a/example/pom.xml b/example/pom.xml index 785a4ca8..840fa36f 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -48,5 +48,14 @@ org.javassist javassist + + io.openmessaging + openmessaging-api + + + org.apache.rocketmq + rocketmq-openmessaging + 4.1.0-incubating-SNAPSHOT + diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java new file mode 100644 index 00000000..9d162ac1 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.openmessaging; + +import io.openmessaging.Message; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.Producer; +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.SendResult; +import java.nio.charset.Charset; + +public class SimpleProducer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final Producer producer = messagingAccessPoint.createProducer(); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + producer.startup(); + System.out.printf("Producer startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + producer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + { + Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + SendResult sendResult = producer.send(message); + //final Void aVoid = result.get(3000L); + System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); + } + + { + final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + result.addListener(new PromiseListener() { + @Override + public void operationCompleted(Promise promise) { + System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); + } + + @Override + public void operationFailed(Promise promise) { + System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); + } + }); + } + + { + producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + System.out.printf("Send oneway message OK%n"); + } + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java new file mode 100644 index 00000000..8e067724 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.openmessaging; + +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PullConsumer; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class SimplePullConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.startup(); + System.out.printf("Consumer startup OK%n"); + + while (true) { + Message message = consumer.poll(); + if (message != null) { + String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + System.out.printf("Received one message: %s%n", msgId); + consumer.ack(msgId); + } + } + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java new file mode 100644 index 00000000..b0935d4c --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.openmessaging; + +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessageListener; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PushConsumer; +import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class SimplePushConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + + final PushConsumer consumer = messagingAccessPoint. + createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.printf("MessagingAccessPoint startup OK%n"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); + context.ack(); + } + }); + + consumer.startup(); + System.out.printf("Consumer startup OK%n"); + } +} diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml new file mode 100644 index 00000000..e853642d --- /dev/null +++ b/openmessaging/pom.xml @@ -0,0 +1,42 @@ + + + + + + rocketmq-all + org.apache.rocketmq + 4.1.0-incubating-SNAPSHOT + + 4.0.0 + + rocketmq-openmessaging + rocketmq-openmessaging ${project.version} + + + + io.openmessaging + openmessaging-api + + + org.apache.rocketmq + rocketmq-client + + + \ No newline at end of file diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java new file mode 100644 index 00000000..65caf840 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq; + +import io.openmessaging.IterableConsumer; +import io.openmessaging.KeyValue; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.Producer; +import io.openmessaging.PullConsumer; +import io.openmessaging.PushConsumer; +import io.openmessaging.ResourceManager; +import io.openmessaging.SequenceProducer; +import io.openmessaging.ServiceEndPoint; +import io.openmessaging.exception.OMSNotSupportedException; +import io.openmessaging.observer.Observer; +import io.openmessaging.rocketmq.consumer.PullConsumerImpl; +import io.openmessaging.rocketmq.consumer.PushConsumerImpl; +import io.openmessaging.rocketmq.producer.ProducerImpl; +import io.openmessaging.rocketmq.producer.SequenceProducerImpl; +import io.openmessaging.rocketmq.utils.OMSUtil; + +public class MessagingAccessPointImpl implements MessagingAccessPoint { + private final KeyValue accessPointProperties; + + public MessagingAccessPointImpl(final KeyValue accessPointProperties) { + this.accessPointProperties = accessPointProperties; + } + + @Override + public KeyValue properties() { + return accessPointProperties; + } + + @Override + public Producer createProducer() { + return new ProducerImpl(this.accessPointProperties); + } + + @Override + public Producer createProducer(KeyValue properties) { + return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + } + + @Override + public SequenceProducer createSequenceProducer() { + return new SequenceProducerImpl(this.accessPointProperties); + } + + @Override + public SequenceProducer createSequenceProducer(KeyValue properties) { + return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + } + + @Override + public PushConsumer createPushConsumer() { + return new PushConsumerImpl(accessPointProperties); + } + + @Override + public PushConsumer createPushConsumer(KeyValue properties) { + return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + } + + @Override + public PullConsumer createPullConsumer(String queueName) { + return new PullConsumerImpl(queueName, accessPointProperties); + } + + @Override + public PullConsumer createPullConsumer(String queueName, KeyValue properties) { + return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + } + + @Override + public IterableConsumer createIterableConsumer(String queueName) { + throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version"); + } + + @Override + public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) { + throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version"); + } + + @Override + public ResourceManager getResourceManager() { + throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version."); + } + + @Override + public ServiceEndPoint createServiceEndPoint() { + throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version."); + } + + @Override + public ServiceEndPoint createServiceEndPoint(KeyValue properties) { + throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version."); + } + + @Override + public void addObserver(Observer observer) { + //Ignore + } + + @Override + public void deleteObserver(Observer observer) { + //Ignore + } + + @Override + public void startup() { + //Ignore + } + + @Override + public void shutdown() { + //Ignore + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java new file mode 100644 index 00000000..7077c6dc --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.config; + +import io.openmessaging.PropertyKeys; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class ClientConfig implements PropertyKeys, NonStandardKeys { + private String omsDriverImpl; + private String omsAccessPoints; + private String omsNamespace; + private String omsProducerId; + private String omsConsumerId; + private int omsOperationTimeout = 5000; + private String omsRoutingName; + private String omsOperatorName; + private String omsDstQueue; + private String omsSrcTopic; + private String rmqConsumerGroup; + private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; + private int rmqMaxRedeliveryTimes = 16; + private int rmqMessageConsumeTimeout = 15; //In minutes + private int rmqMaxConsumeThreadNums = 64; + private int rmqMinConsumeThreadNums = 20; + private String rmqMessageDestination; + private int rmqPullMessageBatchNums = 32; + private int rmqPullMessageCacheCapacity = 1000; + + public String getOmsDriverImpl() { + return omsDriverImpl; + } + + public void setOmsDriverImpl(final String omsDriverImpl) { + this.omsDriverImpl = omsDriverImpl; + } + + public String getOmsAccessPoints() { + return omsAccessPoints; + } + + public void setOmsAccessPoints(final String omsAccessPoints) { + this.omsAccessPoints = omsAccessPoints; + } + + public String getOmsNamespace() { + return omsNamespace; + } + + public void setOmsNamespace(final String omsNamespace) { + this.omsNamespace = omsNamespace; + } + + public String getOmsProducerId() { + return omsProducerId; + } + + public void setOmsProducerId(final String omsProducerId) { + this.omsProducerId = omsProducerId; + } + + public String getOmsConsumerId() { + return omsConsumerId; + } + + public void setOmsConsumerId(final String omsConsumerId) { + this.omsConsumerId = omsConsumerId; + } + + public int getOmsOperationTimeout() { + return omsOperationTimeout; + } + + public void setOmsOperationTimeout(final int omsOperationTimeout) { + this.omsOperationTimeout = omsOperationTimeout; + } + + public String getOmsRoutingName() { + return omsRoutingName; + } + + public void setOmsRoutingName(final String omsRoutingName) { + this.omsRoutingName = omsRoutingName; + } + + public String getOmsOperatorName() { + return omsOperatorName; + } + + public void setOmsOperatorName(final String omsOperatorName) { + this.omsOperatorName = omsOperatorName; + } + + public String getOmsDstQueue() { + return omsDstQueue; + } + + public void setOmsDstQueue(final String omsDstQueue) { + this.omsDstQueue = omsDstQueue; + } + + public String getOmsSrcTopic() { + return omsSrcTopic; + } + + public void setOmsSrcTopic(final String omsSrcTopic) { + this.omsSrcTopic = omsSrcTopic; + } + + public String getRmqConsumerGroup() { + return rmqConsumerGroup; + } + + public void setRmqConsumerGroup(final String rmqConsumerGroup) { + this.rmqConsumerGroup = rmqConsumerGroup; + } + + public String getRmqProducerGroup() { + return rmqProducerGroup; + } + + public void setRmqProducerGroup(final String rmqProducerGroup) { + this.rmqProducerGroup = rmqProducerGroup; + } + + public int getRmqMaxRedeliveryTimes() { + return rmqMaxRedeliveryTimes; + } + + public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) { + this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes; + } + + public int getRmqMessageConsumeTimeout() { + return rmqMessageConsumeTimeout; + } + + public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) { + this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout; + } + + public int getRmqMaxConsumeThreadNums() { + return rmqMaxConsumeThreadNums; + } + + public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) { + this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums; + } + + public int getRmqMinConsumeThreadNums() { + return rmqMinConsumeThreadNums; + } + + public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) { + this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums; + } + + public String getRmqMessageDestination() { + return rmqMessageDestination; + } + + public void setRmqMessageDestination(final String rmqMessageDestination) { + this.rmqMessageDestination = rmqMessageDestination; + } + + public int getRmqPullMessageBatchNums() { + return rmqPullMessageBatchNums; + } + + public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) { + this.rmqPullMessageBatchNums = rmqPullMessageBatchNums; + } + + public int getRmqPullMessageCacheCapacity() { + return rmqPullMessageCacheCapacity; + } + + public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) { + this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java new file mode 100644 index 00000000..90f9e03e --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.KeyValue; +import io.openmessaging.PropertyKeys; +import io.openmessaging.ServiceLifecycle; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.ConsumeRequest; +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.utils.ThreadUtils; +import org.slf4j.Logger; + +class LocalMessageCache implements ServiceLifecycle { + private final BlockingQueue consumeRequestCache; + private final Map consumedRequest; + private final ConcurrentHashMap pullOffsetTable; + private final DefaultMQPullConsumer rocketmqPullConsumer; + private final ClientConfig clientConfig; + private final ScheduledExecutorService cleanExpireMsgExecutors; + + private final static Logger log = ClientLogger.getLog(); + + LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) { + consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity()); + this.consumedRequest = new ConcurrentHashMap<>(); + this.pullOffsetTable = new ConcurrentHashMap<>(); + this.rocketmqPullConsumer = rocketmqPullConsumer; + this.clientConfig = clientConfig; + this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( + "OMS_CleanExpireMsgScheduledThread_")); + } + + int nextPullBatchNums() { + return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity()); + } + + long nextPullOffset(MessageQueue remoteQueue) { + if (!pullOffsetTable.containsKey(remoteQueue)) { + try { + pullOffsetTable.putIfAbsent(remoteQueue, + rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false)); + } catch (MQClientException e) { + log.error("A error occurred in fetch consume offset process.", e); + } + } + return pullOffsetTable.get(remoteQueue); + } + + void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { + pullOffsetTable.put(remoteQueue, nextPullOffset); + } + + void submitConsumeRequest(ConsumeRequest consumeRequest) { + try { + consumeRequestCache.put(consumeRequest); + } catch (InterruptedException ignore) { + } + } + + MessageExt poll() { + return poll(clientConfig.getOmsOperationTimeout()); + } + + MessageExt poll(final KeyValue properties) { + int currentPollTimeout = clientConfig.getOmsOperationTimeout(); + if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { + currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); + } + return poll(currentPollTimeout); + } + + private MessageExt poll(long timeout) { + try { + ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); + if (consumeRequest != null) { + MessageExt messageExt = consumeRequest.getMessageExt(); + consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); + MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis())); + consumedRequest.put(messageExt.getMsgId(), consumeRequest); + return messageExt; + } + } catch (InterruptedException ignore) { + } + return null; + } + + void ack(final String messageId) { + ConsumeRequest consumeRequest = consumedRequest.remove(messageId); + if (consumeRequest != null) { + long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt())); + try { + rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + } + + void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) { + consumedRequest.remove(messageExt.getMsgId()); + long offset = processQueue.removeMessage(Collections.singletonList(messageExt)); + try { + rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + + @Override + public void startup() { + this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + cleanExpireMsg(); + } + }, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES); + } + + @Override + public void shutdown() { + ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS); + } + + private void cleanExpireMsg() { + for (final Map.Entry next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl() + .getRebalanceImpl().getProcessQueueTable().entrySet()) { + ProcessQueue pq = next.getValue(); + MessageQueue mq = next.getKey(); + ReadWriteLock lockTreeMap = getLockInProcessQueue(pq); + if (lockTreeMap == null) { + log.error("Gets tree map lock in process queue error, may be has compatibility issue"); + return; + } + + TreeMap msgTreeMap = pq.getMsgTreeMap(); + + int loop = msgTreeMap.size(); + for (int i = 0; i < loop; i++) { + MessageExt msg = null; + try { + lockTreeMap.readLock().lockInterruptibly(); + try { + if (!msgTreeMap.isEmpty()) { + msg = msgTreeMap.firstEntry().getValue(); + if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) + > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { + //Expired, ack and remove it. + } else { + break; + } + } else { + break; + } + } finally { + lockTreeMap.readLock().unlock(); + } + } catch (InterruptedException e) { + log.error("Gets expired message exception", e); + } + + try { + rocketmqPullConsumer.sendMessageBack(msg, 3); + log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", + msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); + ack(mq, pq, msg); + } catch (Exception e) { + log.error("Send back expired msg exception", e); + } + } + } + } + + private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) { + try { + return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true); + } catch (IllegalAccessException e) { + return null; + } + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java new file mode 100644 index 00000000..8d396d43 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.PropertyKeys; +import io.openmessaging.PullConsumer; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.ConsumeRequest; +import io.openmessaging.rocketmq.utils.BeanUtils; +import io.openmessaging.rocketmq.utils.OMSUtil; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullTaskCallback; +import org.apache.rocketmq.client.consumer.PullTaskContext; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; + +public class PullConsumerImpl implements PullConsumer { + private final DefaultMQPullConsumer rocketmqPullConsumer; + private final KeyValue properties; + private boolean started = false; + private String targetQueueName; + private final MQPullConsumerScheduleService pullConsumerScheduleService; + private final LocalMessageCache localMessageCache; + private final ClientConfig clientConfig; + + final static Logger log = ClientLogger.getLog(); + + public PullConsumerImpl(final String queueName, final KeyValue properties) { + this.properties = properties; + this.targetQueueName = queueName; + + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); + + String consumerGroup = clientConfig.getRmqConsumerGroup(); + if (null == consumerGroup || consumerGroup.isEmpty()) { + throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); + } + pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup); + + this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); + + String accessPoints = clientConfig.getOmsAccessPoints(); + if (accessPoints == null || accessPoints.isEmpty()) { + throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); + } + this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); + + this.rocketmqPullConsumer.setConsumerGroup(consumerGroup); + + int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes(); + this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); + + String consumerId = OMSUtil.buildInstanceName(); + this.rocketmqPullConsumer.setInstanceName(consumerId); + properties.put(PropertyKeys.CONSUMER_ID, consumerId); + + this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public Message poll() { + MessageExt rmqMsg = localMessageCache.poll(); + return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); + } + + @Override + public Message poll(final KeyValue properties) { + MessageExt rmqMsg = localMessageCache.poll(properties); + return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); + } + + @Override + public void ack(final String messageId) { + localMessageCache.ack(messageId); + } + + @Override + public void ack(final String messageId, final KeyValue properties) { + localMessageCache.ack(messageId); + } + + @Override + public synchronized void startup() { + if (!started) { + try { + registerPullTaskCallback(); + this.pullConsumerScheduleService.start(); + this.localMessageCache.startup(); + } catch (MQClientException e) { + throw new OMSRuntimeException("-1", e); + } + } + this.started = true; + } + + private void registerPullTaskCallback() { + this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() { + @Override + public void doPullTask(final MessageQueue mq, final PullTaskContext context) { + MQPullConsumer consumer = context.getPullConsumer(); + try { + long offset = localMessageCache.nextPullOffset(mq); + + PullResult pullResult = consumer.pull(mq, "*", + offset, localMessageCache.nextPullBatchNums()); + ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() + .getProcessQueueTable().get(mq); + switch (pullResult.getPullStatus()) { + case FOUND: + if (pq != null) { + pq.putMessage(pullResult.getMsgFoundList()); + for (final MessageExt messageExt : pullResult.getMsgFoundList()) { + localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq)); + } + } + break; + default: + break; + } + localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset()); + } catch (Exception e) { + log.error("A error occurred in pull message process.", e); + } + } + }); + } + + @Override + public synchronized void shutdown() { + if (this.started) { + this.localMessageCache.shutdown(); + this.pullConsumerScheduleService.shutdown(); + this.rocketmqPullConsumer.shutdown(); + } + this.started = false; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java new file mode 100644 index 00000000..f9b8058e --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.MessageListener; +import io.openmessaging.OMS; +import io.openmessaging.PropertyKeys; +import io.openmessaging.PushConsumer; +import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.utils.BeanUtils; +import io.openmessaging.rocketmq.utils.OMSUtil; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +public class PushConsumerImpl implements PushConsumer { + private final DefaultMQPushConsumer rocketmqPushConsumer; + private final KeyValue properties; + private boolean started = false; + private final Map subscribeTable = new ConcurrentHashMap<>(); + private final ClientConfig clientConfig; + + public PushConsumerImpl(final KeyValue properties) { + this.rocketmqPushConsumer = new DefaultMQPushConsumer(); + this.properties = properties; + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); + + String accessPoints = clientConfig.getOmsAccessPoints(); + if (accessPoints == null || accessPoints.isEmpty()) { + throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); + } + this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); + + String consumerGroup = clientConfig.getRmqConsumerGroup(); + if (null == consumerGroup || consumerGroup.isEmpty()) { + throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); + } + this.rocketmqPushConsumer.setConsumerGroup(consumerGroup); + this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes()); + this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout()); + this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums()); + this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums()); + + String consumerId = OMSUtil.buildInstanceName(); + this.rocketmqPushConsumer.setInstanceName(consumerId); + properties.put(PropertyKeys.CONSUMER_ID, consumerId); + + this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public void resume() { + this.rocketmqPushConsumer.resume(); + } + + @Override + public void suspend() { + this.rocketmqPushConsumer.suspend(); + } + + @Override + public boolean isSuspended() { + return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause(); + } + + @Override + public PushConsumer attachQueue(final String queueName, final MessageListener listener) { + this.subscribeTable.put(queueName, listener); + try { + this.rocketmqPushConsumer.subscribe(queueName, "*"); + } catch (MQClientException e) { + throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer can't attach to %s.", queueName)); + } + return this; + } + + @Override + public synchronized void startup() { + if (!started) { + try { + this.rocketmqPushConsumer.start(); + } catch (MQClientException e) { + throw new OMSRuntimeException("-1", e); + } + } + this.started = true; + } + + @Override + public synchronized void shutdown() { + if (this.started) { + this.rocketmqPushConsumer.shutdown(); + } + this.started = false; + } + + class MessageListenerImpl implements MessageListenerConcurrently { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, + ConsumeConcurrentlyContext contextRMQ) { + MessageExt rmqMsg = rmqMsgList.get(0); + BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg); + + MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic()); + + if (listener == null) { + throw new OMSRuntimeException("-1", + String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic())); + } + + final KeyValue contextProperties = OMS.newKeyValue(); + final CountDownLatch sync = new CountDownLatch(1); + + contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name()); + + ReceivedMessageContext context = new ReceivedMessageContext() { + @Override + public KeyValue properties() { + return contextProperties; + } + + @Override + public void ack() { + sync.countDown(); + contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); + } + + @Override + public void ack(final KeyValue properties) { + sync.countDown(); + contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); + } + }; + long begin = System.currentTimeMillis(); + listener.onMessage(omsMsg, context); + long costs = System.currentTimeMillis() - begin; + long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000; + try { + sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS); + } catch (InterruptedException ignore) { + } + + return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); + } + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java new file mode 100644 index 00000000..43f80ae5 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.OMS; +import org.apache.commons.lang3.builder.ToStringBuilder; + +public class BytesMessageImpl implements BytesMessage { + private KeyValue headers; + private KeyValue properties; + private byte[] body; + + public BytesMessageImpl() { + this.headers = OMS.newKeyValue(); + this.properties = OMS.newKeyValue(); + } + + @Override + public byte[] getBody() { + return body; + } + + @Override + public BytesMessage setBody(final byte[] body) { + this.body = body; + return this; + } + + @Override + public KeyValue headers() { + return headers; + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public Message putHeaders(final String key, final int value) { + headers.put(key, value); + return this; + } + + @Override + public Message putHeaders(final String key, final long value) { + headers.put(key, value); + return this; + } + + @Override + public Message putHeaders(final String key, final double value) { + headers.put(key, value); + return this; + } + + @Override + public Message putHeaders(final String key, final String value) { + headers.put(key, value); + return this; + } + + @Override + public Message putProperties(final String key, final int value) { + properties.put(key, value); + return this; + } + + @Override + public Message putProperties(final String key, final long value) { + properties.put(key, value); + return this; + } + + @Override + public Message putProperties(final String key, final double value) { + properties.put(key, value); + return this; + } + + @Override + public Message putProperties(final String key, final String value) { + properties.put(key, value); + return this; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java new file mode 100644 index 00000000..7ce4a9b4 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +public class ConsumeRequest { + private final MessageExt messageExt; + private final MessageQueue messageQueue; + private final ProcessQueue processQueue; + private long startConsumeTimeMillis; + + public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue, + final ProcessQueue processQueue) { + this.messageExt = messageExt; + this.messageQueue = messageQueue; + this.processQueue = processQueue; + } + + public MessageExt getMessageExt() { + return messageExt; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + + public long getStartConsumeTimeMillis() { + return startConsumeTimeMillis; + } + + public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) { + this.startConsumeTimeMillis = startConsumeTimeMillis; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java new file mode 100644 index 00000000..3639a3f8 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +public interface NonStandardKeys { + String CONSUMER_GROUP = "rmq.consumer.group"; + String PRODUCER_GROUP = "rmq.producer.group"; + String MAX_REDELIVERY_TIMES = "rmq.max.redelivery.times"; + String MESSAGE_CONSUME_TIMEOUT = "rmq.message.consume.timeout"; + String MAX_CONSUME_THREAD_NUMS = "rmq.max.consume.thread.nums"; + String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums"; + String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status"; + String MESSAGE_DESTINATION = "rmq.message.destination"; + String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums"; + String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity"; +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java new file mode 100644 index 00000000..228a9f0b --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.KeyValue; +import io.openmessaging.SendResult; + +public class SendResultImpl implements SendResult { + private String messageId; + private KeyValue properties; + + public SendResultImpl(final String messageId, final KeyValue properties) { + this.messageId = messageId; + this.properties = properties; + } + + @Override + public String messageId() { + return messageId; + } + + @Override + public KeyValue properties() { + return properties; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java new file mode 100644 index 00000000..8246bcd2 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.MessageFactory; +import io.openmessaging.MessageHeader; +import io.openmessaging.PropertyKeys; +import io.openmessaging.ServiceLifecycle; +import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.exception.OMSNotSupportedException; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.exception.OMSTimeOutException; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.utils.BeanUtils; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.slf4j.Logger; + +import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName; + +abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { + final static Logger log = ClientLogger.getLog(); + final KeyValue properties; + final DefaultMQProducer rocketmqProducer; + private boolean started = false; + final ClientConfig clientConfig; + + AbstractOMSProducer(final KeyValue properties) { + this.properties = properties; + this.rocketmqProducer = new DefaultMQProducer(); + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); + + String accessPoints = clientConfig.getOmsAccessPoints(); + if (accessPoints == null || accessPoints.isEmpty()) { + throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); + } + this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); + this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup()); + + String producerId = buildInstanceName(); + this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout()); + this.rocketmqProducer.setInstanceName(producerId); + this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); + properties.put(PropertyKeys.PRODUCER_ID, producerId); + } + + @Override + public synchronized void startup() { + if (!started) { + try { + this.rocketmqProducer.start(); + } catch (MQClientException e) { + throw new OMSRuntimeException("-1", e); + } + } + this.started = true; + } + + @Override + public synchronized void shutdown() { + if (this.started) { + this.rocketmqProducer.shutdown(); + } + this.started = false; + } + + OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) { + if (e instanceof MQClientException) { + if (e.getCause() != null) { + if (e.getCause() instanceof RemotingTimeoutException) { + return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", + this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e); + } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) { + MQBrokerException brokerException = (MQBrokerException) e.getCause(); + return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s", + topic, msgId, brokerException.getErrorMessage()), e); + } + } + // Exception thrown by local. + else { + MQClientException clientException = (MQClientException) e; + if (-1 == clientException.getResponseCode()) { + return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s", + topic, msgId), e); + } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) { + return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s", + topic, msgId), e); + } + } + } + return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e); + } + + protected void checkMessageType(Message message) { + if (!(message instanceof BytesMessage)) { + throw new OMSNotSupportedException("-1", "Only BytesMessage is supported."); + } + } + + @Override + public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) { + BytesMessage bytesMessage = new BytesMessageImpl(); + bytesMessage.setBody(body); + bytesMessage.headers().put(MessageHeader.TOPIC, topic); + return bytesMessage; + } + + @Override + public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) { + BytesMessage bytesMessage = new BytesMessageImpl(); + bytesMessage.setBody(body); + bytesMessage.headers().put(MessageHeader.QUEUE, queue); + return bytesMessage; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java new file mode 100644 index 00000000..2c00c60e --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.Producer; +import io.openmessaging.Promise; +import io.openmessaging.PropertyKeys; +import io.openmessaging.SendResult; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.rocketmq.promise.DefaultPromise; +import io.openmessaging.rocketmq.utils.OMSUtil; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendStatus; + +import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert; + +public class ProducerImpl extends AbstractOMSProducer implements Producer { + + public ProducerImpl(final KeyValue properties) { + super(properties); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public SendResult send(final Message message) { + return send(message, this.rocketmqProducer.getSendMsgTimeout()); + } + + @Override + public SendResult send(final Message message, final KeyValue properties) { + long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) + ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + return send(message, timeout); + } + + private SendResult send(final Message message, long timeout) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + try { + org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout); + if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) { + log.error(String.format("Send message to RocketMQ failed, %s", message)); + throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed."); + } + message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + return OMSUtil.sendResultConvert(rmqResult); + } catch (Exception e) { + log.error(String.format("Send message to RocketMQ failed, %s", message), e); + throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); + } + } + + @Override + public Promise sendAsync(final Message message) { + return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout()); + } + + @Override + public Promise sendAsync(final Message message, final KeyValue properties) { + long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) + ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + return sendAsync(message, timeout); + } + + private Promise sendAsync(final Message message, long timeout) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + final Promise promise = new DefaultPromise<>(); + try { + this.rocketmqProducer.send(rmqMessage, new SendCallback() { + @Override + public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { + message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + promise.set(OMSUtil.sendResultConvert(rmqResult)); + } + + @Override + public void onException(final Throwable e) { + promise.setFailure(e); + } + }, timeout); + } catch (Exception e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public void sendOneway(final Message message) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + try { + this.rocketmqProducer.sendOneway(rmqMessage); + } catch (Exception ignore) { //Ignore the oneway exception. + } + } + + @Override + public void sendOneway(final Message message, final KeyValue properties) { + sendOneway(message); + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java new file mode 100644 index 00000000..05225cc5 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.SequenceProducer; +import io.openmessaging.rocketmq.utils.OMSUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendResult; + +public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer { + + private BlockingQueue msgCacheQueue; + + public SequenceProducerImpl(final KeyValue properties) { + super(properties); + this.msgCacheQueue = new LinkedBlockingQueue<>(); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public void send(final Message message) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message); + try { + Validators.checkMessage(rmqMessage, this.rocketmqProducer); + } catch (MQClientException e) { + throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); + } + msgCacheQueue.add(message); + } + + @Override + public void send(final Message message, final KeyValue properties) { + send(message); + } + + @Override + public synchronized void commit() { + List messages = new ArrayList<>(); + msgCacheQueue.drainTo(messages); + + List rmqMessages = new ArrayList<>(); + + for (Message message : messages) { + rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); + } + + if (rmqMessages.size() == 0) { + return; + } + + try { + SendResult sendResult = this.rocketmqProducer.send(rmqMessages); + String[] msgIdArray = sendResult.getMsgId().split(","); + for (int i = 0; i < messages.size(); i++) { + Message message = messages.get(i); + message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); + } + } catch (Exception e) { + throw checkProducerException("", "", e); + } + } + + @Override + public synchronized void rollback() { + msgCacheQueue.clear(); + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java new file mode 100644 index 00000000..c863ccf6 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.promise; + +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.exception.OMSRuntimeException; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultPromise implements Promise { + private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class); + private final Object lock = new Object(); + private volatile FutureState state = FutureState.DOING; + private V result = null; + private long timeout; + private long createTime; + private Throwable exception = null; + private List> promiseListenerList; + + public DefaultPromise() { + createTime = System.currentTimeMillis(); + promiseListenerList = new ArrayList<>(); + timeout = 5000; + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return state.isCancelledState(); + } + + @Override + public boolean isDone() { + return state.isDoneState(); + } + + @Override + public V get() { + return result; + } + + @Override + public V get(final long timeout) { + synchronized (lock) { + if (!isDoing()) { + return getValueOrThrowable(); + } + + if (timeout <= 0) { + try { + lock.wait(); + } catch (Exception e) { + cancel(e); + } + return getValueOrThrowable(); + } else { + long waitTime = timeout - (System.currentTimeMillis() - createTime); + if (waitTime > 0) { + for (;; ) { + try { + lock.wait(waitTime); + } catch (InterruptedException e) { + LOG.error("promise get value interrupted,excepiton:{}", e.getMessage()); + } + + if (!isDoing()) { + break; + } else { + waitTime = timeout - (System.currentTimeMillis() - createTime); + if (waitTime <= 0) { + break; + } + } + } + } + + if (isDoing()) { + timeoutSoCancel(); + } + } + return getValueOrThrowable(); + } + } + + @Override + public boolean set(final V value) { + if (value == null) + return false; + this.result = value; + return done(); + } + + @Override + public boolean setFailure(final Throwable cause) { + if (cause == null) + return false; + this.exception = cause; + return done(); + } + + @Override + public void addListener(final PromiseListener listener) { + if (listener == null) { + throw new NullPointerException("FutureListener is null"); + } + + boolean notifyNow = false; + synchronized (lock) { + if (!isDoing()) { + notifyNow = true; + } else { + if (promiseListenerList == null) { + promiseListenerList = new ArrayList<>(); + } + promiseListenerList.add(listener); + } + } + + if (notifyNow) { + notifyListener(listener); + } + } + + @Override + public Throwable getThrowable() { + return exception; + } + + private void notifyListeners() { + if (promiseListenerList != null) { + for (PromiseListener listener : promiseListenerList) { + notifyListener(listener); + } + } + } + + private boolean isSuccess() { + return isDone() && (exception == null); + } + + private void timeoutSoCancel() { + synchronized (lock) { + if (!isDoing()) { + return; + } + state = FutureState.CANCELLED; + exception = new RuntimeException("Get request result is timeout or interrupted"); + lock.notifyAll(); + } + notifyListeners(); + } + + private V getValueOrThrowable() { + if (exception != null) { + Throwable e = exception.getCause() != null ? exception.getCause() : exception; + throw new OMSRuntimeException("-1", e); + } + notifyListeners(); + return result; + } + + private boolean isDoing() { + return state.isDoingState(); + } + + private boolean done() { + synchronized (lock) { + if (!isDoing()) { + return false; + } + + state = FutureState.DONE; + lock.notifyAll(); + } + + notifyListeners(); + return true; + } + + private void notifyListener(final PromiseListener listener) { + try { + if (exception != null) + listener.operationFailed(this); + else + listener.operationCompleted(this); + } catch (Throwable t) { + LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); + } + } + + private boolean cancel(Exception e) { + synchronized (lock) { + if (!isDoing()) { + return false; + } + + state = FutureState.CANCELLED; + exception = e; + lock.notifyAll(); + } + + notifyListeners(); + return true; + } +} + diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java new file mode 100644 index 00000000..84b6c2d7 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.rocketmq.promise; + +public enum FutureState { + /** + * the task is doing + **/ + DOING(0), + /** + * the task is done + **/ + DONE(1), + /** + * ths task is cancelled + **/ + CANCELLED(2); + + public final int value; + + private FutureState(int value) { + this.value = value; + } + + public boolean isCancelledState() { + return this == CANCELLED; + } + + public boolean isDoneState() { + return this == DONE; + } + + public boolean isDoingState() { + return this == DOING; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java new file mode 100644 index 00000000..104d3d96 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.KeyValue; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.log.ClientLogger; +import org.slf4j.Logger; + +public final class BeanUtils { + final static Logger log = ClientLogger.getLog(); + + /** + * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. + */ + private static Map, Class> primitiveWrapperMap = new HashMap, Class>(); + + static { + primitiveWrapperMap.put(Boolean.TYPE, Boolean.class); + primitiveWrapperMap.put(Byte.TYPE, Byte.class); + primitiveWrapperMap.put(Character.TYPE, Character.class); + primitiveWrapperMap.put(Short.TYPE, Short.class); + primitiveWrapperMap.put(Integer.TYPE, Integer.class); + primitiveWrapperMap.put(Long.TYPE, Long.class); + primitiveWrapperMap.put(Double.TYPE, Double.class); + primitiveWrapperMap.put(Float.TYPE, Float.class); + primitiveWrapperMap.put(Void.TYPE, Void.TYPE); + } + + private static Map, Class> wrapperMap = new HashMap, Class>(); + + static { + for (final Class primitiveClass : primitiveWrapperMap.keySet()) { + final Class wrapperClass = primitiveWrapperMap.get(primitiveClass); + if (!primitiveClass.equals(wrapperClass)) { + wrapperMap.put(wrapperClass, primitiveClass); + } + } + wrapperMap.put(String.class, String.class); + } + + /** + *

Populate the JavaBeans properties of the specified bean, based on + * the specified name/value pairs. This method uses Java reflection APIs + * to identify corresponding "property setter" method names, and deals + * with setter arguments of type String, boolean, + * int, long, float, and + * double.

+ * + *

The particular setter method to be called for each property is + * determined using the usual JavaBeans introspection mechanisms. Thus, + * you may identify custom setter methods using a BeanInfo class that is + * associated with the class of the bean itself. If no such BeanInfo + * class is available, the standard method name conversion ("set" plus + * the capitalized name of the property in question) is used.

+ * + *

NOTE: It is contrary to the JavaBeans Specification + * to have more than one setter method (with different argument + * signatures) for the same property.

+ * + * @param clazz JavaBean class whose properties are being populated + * @param properties Map keyed by property name, with the corresponding (String or String[]) value(s) to be set + * @param Class type + * @return Class instance + */ + public static T populate(final Properties properties, final Class clazz) { + T obj = null; + try { + obj = clazz.newInstance(); + return populate(properties, obj); + } catch (Throwable e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static T populate(final KeyValue properties, final Class clazz) { + T obj = null; + try { + obj = clazz.newInstance(); + return populate(properties, obj); + } catch (Throwable e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static Class getMethodClass(Class clazz, String methodName) { + Method[] methods = clazz.getMethods(); + for (Method method : methods) { + if (method.getName().equalsIgnoreCase(methodName)) { + return method.getParameterTypes()[0]; + } + } + return null; + } + + public static void setProperties(Class clazz, Object obj, String methodName, + Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Class parameterClass = getMethodClass(clazz, methodName); + Method setterMethod = clazz.getMethod(methodName, parameterClass); + if (parameterClass == Boolean.TYPE) { + setterMethod.invoke(obj, Boolean.valueOf(value.toString())); + } else if (parameterClass == Integer.TYPE) { + setterMethod.invoke(obj, Integer.valueOf(value.toString())); + } else if (parameterClass == Double.TYPE) { + setterMethod.invoke(obj, Double.valueOf(value.toString())); + } else if (parameterClass == Float.TYPE) { + setterMethod.invoke(obj, Float.valueOf(value.toString())); + } else if (parameterClass == Long.TYPE) { + setterMethod.invoke(obj, Long.valueOf(value.toString())); + } else + setterMethod.invoke(obj, value); + } + + public static T populate(final Properties properties, final T obj) { + Class clazz = obj.getClass(); + try { + + Set> entries = properties.entrySet(); + for (Map.Entry entry : entries) { + String entryKey = entry.getKey().toString(); + String[] keyGroup = entryKey.split("\\."); + for (int i = 0; i < keyGroup.length; i++) { + keyGroup[i] = keyGroup[i].toLowerCase(); + keyGroup[i] = StringUtils.capitalize(keyGroup[i]); + } + String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); + try { + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue()); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + //ignored... + } + } + } catch (RuntimeException e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static T populate(final KeyValue properties, final T obj) { + Class clazz = obj.getClass(); + try { + + final Set keySet = properties.keySet(); + for (String key : keySet) { + String[] keyGroup = key.split("\\."); + for (int i = 0; i < keyGroup.length; i++) { + keyGroup[i] = keyGroup[i].toLowerCase(); + keyGroup[i] = StringUtils.capitalize(keyGroup[i]); + } + String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); + try { + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key)); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + //ignored... + } + } + } catch (RuntimeException e) { + log.warn("Error occurs !", e); + } + return obj; + } +} + diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java new file mode 100644 index 00000000..60c84081 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.MessageHeader; +import io.openmessaging.OMS; +import io.openmessaging.SendResult; +import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.domain.SendResultImpl; +import java.lang.reflect.Field; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageAccessor; + +public class OMSUtil { + + /** + * Builds a OMS client instance name. + * + * @return a unique instance name + */ + public static String buildInstanceName() { + return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); + } + + public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { + org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); + rmqMessage.setBody(omsMessage.getBody()); + + KeyValue headers = omsMessage.headers(); + KeyValue properties = omsMessage.properties(); + + //All destinations in RocketMQ use Topic + if (headers.containsKey(MessageHeader.TOPIC)) { + rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC)); + rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + } else { + rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE)); + rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE"); + } + + for (String key : properties.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); + } + + //Headers has a high priority + for (String key : headers.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, headers.getString(key)); + } + + return rmqMessage; + } + + public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { + BytesMessage omsMsg = new BytesMessageImpl(); + omsMsg.setBody(rmqMsg.getBody()); + + KeyValue headers = omsMsg.headers(); + KeyValue properties = omsMsg.properties(); + + final Set> entries = rmqMsg.getProperties().entrySet(); + + for (final Map.Entry entry : entries) { + if (isOMSHeader(entry.getKey())) { + headers.put(entry.getKey(), entry.getValue()); + } else { + properties.put(entry.getKey(), entry.getValue()); + } + } + + omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId()); + if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) || + rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { + omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic()); + } else { + omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic()); + } + omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys()); + omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); + omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); + omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); + omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); + return omsMsg; + } + + public static boolean isOMSHeader(String value) { + for (Field field : MessageHeader.class.getDeclaredFields()) { + try { + if (field.get(MessageHeader.class).equals(value)) { + return true; + } + } catch (IllegalAccessException e) { + return false; + } + } + return false; + } + + /** + * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. + */ + public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { + assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK); + return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue()); + } + + public static KeyValue buildKeyValue(KeyValue... keyValues) { + KeyValue keyValue = OMS.newKeyValue(); + for (KeyValue properties : keyValues) { + for (String key : properties.keySet()) { + keyValue.put(key, properties.getString(key)); + } + } + return keyValue; + } + + /** + * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. + */ + public static Iterator cycle(final Iterable iterable) { + return new Iterator() { + Iterator iterator = new Iterator() { + @Override + public synchronized boolean hasNext() { + return false; + } + + @Override + public synchronized T next() { + throw new NoSuchElementException(); + } + + @Override + public synchronized void remove() { + //Ignore + } + }; + + @Override + public synchronized boolean hasNext() { + return iterator.hasNext() || iterable.iterator().hasNext(); + } + + @Override + public synchronized T next() { + if (!iterator.hasNext()) { + iterator = iterable.iterator(); + if (!iterator.hasNext()) { + throw new NoSuchElementException(); + } + } + return iterator.next(); + } + + @Override + public synchronized void remove() { + iterator.remove(); + } + }; + } +} diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java new file mode 100644 index 00000000..ae4d3ed5 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.ConsumeRequest; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class LocalMessageCacheTest { + private LocalMessageCache localMessageCache; + @Mock + private DefaultMQPullConsumer rocketmqPullConsume; + @Mock + private ConsumeRequest consumeRequest; + + @Before + public void init() { + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setRmqPullMessageBatchNums(512); + clientConfig.setRmqPullMessageCacheCapacity(1024); + localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig); + } + + @Test + public void testNextPullBatchNums() throws Exception { + assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512); + for (int i = 0; i < 513; i++) { + localMessageCache.submitConsumeRequest(consumeRequest); + } + assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511); + } + + @Test + public void testNextPullOffset() throws Exception { + MessageQueue messageQueue = new MessageQueue(); + when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean())) + .thenReturn(123L); + assertThat(localMessageCache.nextPullOffset(new MessageQueue())).isEqualTo(123L); + } + + @Test + public void testUpdatePullOffset() throws Exception { + MessageQueue messageQueue = new MessageQueue(); + localMessageCache.updatePullOffset(messageQueue, 124L); + assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L); + } + + @Test + public void testSubmitConsumeRequest() throws Exception { + byte [] body = new byte[]{'1', '2', '3'}; + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(body); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic("HELLO_QUEUE"); + + when(consumeRequest.getMessageExt()).thenReturn(consumedMsg); + localMessageCache.submitConsumeRequest(consumeRequest); + assertThat(localMessageCache.poll()).isEqualTo(consumedMsg); + } +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java new file mode 100644 index 00000000..277a5c65 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PropertyKeys; +import io.openmessaging.PullConsumer; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PullConsumerImplTest { + private PullConsumer consumer; + private String queueName = "HELLO_QUEUE"; + + @Mock + private DefaultMQPullConsumer rocketmqPullConsumer; + private LocalMessageCache localMessageCache = + spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig())); + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + consumer = messagingAccessPoint.createPullConsumer(queueName, + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + + Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); + field.setAccessible(true); + field.set(consumer, rocketmqPullConsumer); //Replace + + field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); + field.setAccessible(true); + field.set(consumer, localMessageCache); + + messagingAccessPoint.startup(); + consumer.startup(); + } + + @Test + public void testPoll() { + final byte[] testBody = new byte[] {'a', 'b'}; + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(testBody); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic(queueName); + + when(localMessageCache.poll()).thenReturn(consumedMsg); + + Message message = consumer.poll(); + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); + } + + @Test + public void testPoll_WithTimeout() { + //There is a default timeout value, @see ClientConfig#omsOperationTimeout. + Message message = consumer.poll(); + assertThat(message).isNull(); + + message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100)); + assertThat(message).isNull(); + } +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java new file mode 100644 index 00000000..882e57ea --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessageListener; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PushConsumer; +import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.lang.reflect.Field; +import java.util.Collections; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.common.message.MessageExt; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class PushConsumerImplTest { + private PushConsumer consumer; + + @Mock + private DefaultMQPushConsumer rocketmqPushConsumer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + consumer = messagingAccessPoint.createPushConsumer( + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + + Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); + field.setAccessible(true); + DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer); + field.set(consumer, rocketmqPushConsumer); //Replace + + when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener()); + messagingAccessPoint.startup(); + consumer.startup(); + } + + @Test + public void testConsumeMessage() { + final byte[] testBody = new byte[] {'a', 'b'}; + + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(testBody); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic("HELLO_QUEUE"); + consumer.attachQueue("HELLO_QUEUE", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); + context.ack(); + } + }); + ((MessageListenerConcurrently) rocketmqPushConsumer + .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null); + } +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java new file mode 100644 index 00000000..1db80c3e --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.Producer; +import io.openmessaging.exception.OMSRuntimeException; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ProducerImplTest { + private Producer producer; + + @Mock + private DefaultMQProducer rocketmqProducer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + producer = messagingAccessPoint.createProducer(); + + Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); + field.setAccessible(true); + field.set(producer, rocketmqProducer); + + messagingAccessPoint.startup(); + producer.startup(); + } + + @Test + public void testSend_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("TestMsgID"); + sendResult.setSendStatus(SendStatus.SEND_OK); + when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); + io.openmessaging.SendResult omsResult = + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + + assertThat(omsResult.messageId()).isEqualTo("TestMsgID"); + } + + @Test + public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT); + + when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); + try { + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (Exception e) { + assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); + } + } + + @Test + public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class); + try { + producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (Exception e) { + assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); + } + } + +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java new file mode 100644 index 00000000..823fe015 --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.SequenceProducer; +import java.lang.reflect.Field; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SequenceProducerImplTest { + + private SequenceProducer producer; + + @Mock + private DefaultMQProducer rocketmqProducer; + + @Before + public void init() throws NoSuchFieldException, IllegalAccessException { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + producer = messagingAccessPoint.createSequenceProducer(); + + Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); + field.setAccessible(true); + field.set(producer, rocketmqProducer); + + messagingAccessPoint.startup(); + producer.startup(); + } + + @Test + public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("TestMsgID"); + sendResult.setSendStatus(SendStatus.SEND_OK); + when(rocketmqProducer.send(ArgumentMatchers.anyList())).thenReturn(sendResult); + when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); + final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); + producer.send(message); + producer.commit(); + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID"); + } + + @Test + public void testRollback() { + when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); + final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); + producer.send(message); + producer.rollback(); + producer.commit(); //Commit nothing. + assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null); + } +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java new file mode 100644 index 00000000..2240ff2d --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.promise; + +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.exception.OMSRuntimeException; +import org.junit.Before; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown; + +public class DefaultPromiseTest { + private Promise promise; + + @Before + public void init() { + promise = new DefaultPromise<>(); + } + + @Test + public void testIsCancelled() throws Exception { + assertThat(promise.isCancelled()).isEqualTo(false); + } + + @Test + public void testIsDone() throws Exception { + assertThat(promise.isDone()).isEqualTo(false); + promise.set("Done"); + assertThat(promise.isDone()).isEqualTo(true); + } + + @Test + public void testGet() throws Exception { + promise.set("Done"); + assertThat(promise.get()).isEqualTo("Done"); + } + + @Test + public void testGet_WithTimeout() throws Exception { + try { + promise.get(100); + failBecauseExceptionWasNotThrown(OMSRuntimeException.class); + } catch (OMSRuntimeException e) { + assertThat(e).hasMessageContaining("Get request result is timeout or interrupted"); + } + } + + @Test + public void testAddListener() throws Exception { + promise.addListener(new PromiseListener() { + @Override + public void operationCompleted(final Promise promise) { + assertThat(promise.get()).isEqualTo("Done"); + } + + @Override + public void operationFailed(final Promise promise) { + + } + }); + promise.set("Done"); + } + + @Test + public void testAddListener_ListenerAfterSet() throws Exception { + promise.set("Done"); + promise.addListener(new PromiseListener() { + @Override + public void operationCompleted(final Promise promise) { + assertThat(promise.get()).isEqualTo("Done"); + } + + @Override + public void operationFailed(final Promise promise) { + + } + }); + } + + @Test + public void testAddListener_WithException_ListenerAfterSet() throws Exception { + final Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.setFailure(exception); + promise.addListener(new PromiseListener() { + @Override + public void operationCompleted(final Promise promise) { + } + + @Override + public void operationFailed(final Promise promise) { + assertThat(promise.getThrowable()).isEqualTo(exception); + } + }); + } + + @Test + public void testAddListener_WithException() throws Exception { + final Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.addListener(new PromiseListener() { + @Override + public void operationCompleted(final Promise promise) { + } + + @Override + public void operationFailed(final Promise promise) { + assertThat(promise.getThrowable()).isEqualTo(exception); + } + }); + promise.setFailure(exception); + } + + @Test + public void getThrowable() throws Exception { + assertThat(promise.getThrowable()).isNull(); + Throwable exception = new OMSRuntimeException("-1", "Test Error"); + promise.setFailure(exception); + assertThat(promise.getThrowable()).isEqualTo(exception); + } + +} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java new file mode 100644 index 00000000..71ca11cc --- /dev/null +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.KeyValue; +import io.openmessaging.OMS; +import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class BeanUtilsTest { + private KeyValue properties = OMS.newKeyValue(); + + public static class CustomizedConfig extends ClientConfig { + final static String STRING_TEST = "string.test"; + String stringTest = "foobar"; + + final static String DOUBLE_TEST = "double.test"; + double doubleTest = 123.0; + + final static String LONG_TEST = "long.test"; + long longTest = 123L; + + String getStringTest() { + return stringTest; + } + + public void setStringTest(String stringTest) { + this.stringTest = stringTest; + } + + double getDoubleTest() { + return doubleTest; + } + + public void setDoubleTest(final double doubleTest) { + this.doubleTest = doubleTest; + } + + long getLongTest() { + return longTest; + } + + public void setLongTest(final long longTest) { + this.longTest = longTest; + } + + CustomizedConfig() { + } + } + + @Before + public void init() { + properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120); + properties.put(CustomizedConfig.STRING_TEST, "kaka"); + properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group"); + properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101); + + properties.put(CustomizedConfig.LONG_TEST, 1234567890L); + properties.put(CustomizedConfig.DOUBLE_TEST, 10.234); + } + + @Test + public void testPopulate() { + CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class); + + //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class); + Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120); + Assert.assertEquals(config.getStringTest(), "kaka"); + Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group"); + Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101); + Assert.assertEquals(config.getLongTest(), 1234567890L); + Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001); + } + + @Test + public void testPopulate_ExistObj() { + CustomizedConfig config = new CustomizedConfig(); + config.setOmsConsumerId("NewConsumerId"); + + Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId"); + + config = BeanUtils.populate(properties, config); + + //RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class); + Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120); + Assert.assertEquals(config.getStringTest(), "kaka"); + Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group"); + Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101); + Assert.assertEquals(config.getLongTest(), 1234567890L); + Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001); + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 05ead639..25e4c841 100644 --- a/pom.xml +++ b/pom.xml @@ -181,6 +181,7 @@ filter test distribution + openmessaging @@ -617,6 +618,11 @@ guava 19.0 + + io.openmessaging + openmessaging-api + 0.1.0-alpha + -- GitLab