diff --git a/distribution/release-client.xml b/distribution/release-client.xml
index 46563eb449c094919ab998641e41479ea2e88f40..84d33a01708c0e3e4101cd78393979e34b4b16c9 100644
--- a/distribution/release-client.xml
+++ b/distribution/release-client.xml
@@ -47,6 +47,7 @@
trueorg.apache.rocketmq:rocketmq-client
+ org.apache.rocketmq:rocketmq-openmessaging./
diff --git a/distribution/release.xml b/distribution/release.xml
index 9e4ef2a017a9c874721f81f436dd6870cf0ac88f..c67d23e13b08160ebc4e7fd73d9abe8c4a3ab05a 100644
--- a/distribution/release.xml
+++ b/distribution/release.xml
@@ -68,6 +68,7 @@
org.apache.rocketmq:rocketmq-filtersrvorg.apache.rocketmq:rocketmq-exampleorg.apache.rocketmq:rocketmq-filter
+ org.apache.rocketmq:rocketmq-openmessaginglib/
diff --git a/example/pom.xml b/example/pom.xml
index 785a4ca856eb6be16d60c0c1b354ba8370c5424a..840fa36f16c53bf70eadc52e0871976dea53a9b5 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -48,5 +48,14 @@
org.javassistjavassist
+
+ io.openmessaging
+ openmessaging-api
+
+
+ org.apache.rocketmq
+ rocketmq-openmessaging
+ 4.1.0-incubating-SNAPSHOT
+
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..9d162ac1463a048e457d9e342fc490785a1f75d8
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.Producer;
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.SendResult;
+import java.nio.charset.Charset;
+
+public class SimpleProducer {
+ public static void main(String[] args) {
+ final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+ .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+ final Producer producer = messagingAccessPoint.createProducer();
+
+ messagingAccessPoint.startup();
+ System.out.printf("MessagingAccessPoint startup OK%n");
+
+ producer.startup();
+ System.out.printf("Producer startup OK%n");
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ producer.shutdown();
+ messagingAccessPoint.shutdown();
+ }
+ }));
+
+ {
+ Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+ SendResult sendResult = producer.send(message);
+ //final Void aVoid = result.get(3000L);
+ System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
+ }
+
+ {
+ final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ result.addListener(new PromiseListener() {
+ @Override
+ public void operationCompleted(Promise promise) {
+ System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
+ }
+
+ @Override
+ public void operationFailed(Promise promise) {
+ System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
+ }
+ });
+ }
+
+ {
+ producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ System.out.printf("Send oneway message OK%n");
+ }
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..8e067724dee51049f58b20a19f6a73f53a80f975
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+public class SimplePullConsumer {
+ public static void main(String[] args) {
+ final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+ .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+ final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
+ OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+ messagingAccessPoint.startup();
+ System.out.printf("MessagingAccessPoint startup OK%n");
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ consumer.shutdown();
+ messagingAccessPoint.shutdown();
+ }
+ }));
+
+ consumer.startup();
+ System.out.printf("Consumer startup OK%n");
+
+ while (true) {
+ Message message = consumer.poll();
+ if (message != null) {
+ String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
+ System.out.printf("Received one message: %s%n", msgId);
+ consumer.ack(msgId);
+ }
+ }
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
new file mode 100644
index 0000000000000000000000000000000000000000..b0935d4c74306513d999b5c60b23bec348b52999
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.openmessaging;
+
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.MessageListener;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.OMS;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+public class SimplePushConsumer {
+ public static void main(String[] args) {
+ final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+ .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
+
+ final PushConsumer consumer = messagingAccessPoint.
+ createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
+
+ messagingAccessPoint.startup();
+ System.out.printf("MessagingAccessPoint startup OK%n");
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ consumer.shutdown();
+ messagingAccessPoint.shutdown();
+ }
+ }));
+
+ consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
+ @Override
+ public void onMessage(final Message message, final ReceivedMessageContext context) {
+ System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
+ context.ack();
+ }
+ });
+
+ consumer.startup();
+ System.out.printf("Consumer startup OK%n");
+ }
+}
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..e853642d23734de9153e4cb953aa1a2949aed528
--- /dev/null
+++ b/openmessaging/pom.xml
@@ -0,0 +1,42 @@
+
+
+
+
+
+ rocketmq-all
+ org.apache.rocketmq
+ 4.1.0-incubating-SNAPSHOT
+
+ 4.0.0
+
+ rocketmq-openmessaging
+ rocketmq-openmessaging ${project.version}
+
+
+
+ io.openmessaging
+ openmessaging-api
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+
\ No newline at end of file
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..65caf84084c859d7ae0990ac8d44a52e197a384e
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq;
+
+import io.openmessaging.IterableConsumer;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.Producer;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ResourceManager;
+import io.openmessaging.SequenceProducer;
+import io.openmessaging.ServiceEndPoint;
+import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.observer.Observer;
+import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
+import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
+import io.openmessaging.rocketmq.producer.ProducerImpl;
+import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+
+public class MessagingAccessPointImpl implements MessagingAccessPoint {
+ private final KeyValue accessPointProperties;
+
+ public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
+ this.accessPointProperties = accessPointProperties;
+ }
+
+ @Override
+ public KeyValue properties() {
+ return accessPointProperties;
+ }
+
+ @Override
+ public Producer createProducer() {
+ return new ProducerImpl(this.accessPointProperties);
+ }
+
+ @Override
+ public Producer createProducer(KeyValue properties) {
+ return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
+ }
+
+ @Override
+ public SequenceProducer createSequenceProducer() {
+ return new SequenceProducerImpl(this.accessPointProperties);
+ }
+
+ @Override
+ public SequenceProducer createSequenceProducer(KeyValue properties) {
+ return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
+ }
+
+ @Override
+ public PushConsumer createPushConsumer() {
+ return new PushConsumerImpl(accessPointProperties);
+ }
+
+ @Override
+ public PushConsumer createPushConsumer(KeyValue properties) {
+ return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
+ }
+
+ @Override
+ public PullConsumer createPullConsumer(String queueName) {
+ return new PullConsumerImpl(queueName, accessPointProperties);
+ }
+
+ @Override
+ public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
+ return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties));
+ }
+
+ @Override
+ public IterableConsumer createIterableConsumer(String queueName) {
+ throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
+ }
+
+ @Override
+ public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) {
+ throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
+ }
+
+ @Override
+ public ResourceManager getResourceManager() {
+ throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
+ }
+
+ @Override
+ public ServiceEndPoint createServiceEndPoint() {
+ throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
+ }
+
+ @Override
+ public ServiceEndPoint createServiceEndPoint(KeyValue properties) {
+ throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
+ }
+
+ @Override
+ public void addObserver(Observer observer) {
+ //Ignore
+ }
+
+ @Override
+ public void deleteObserver(Observer observer) {
+ //Ignore
+ }
+
+ @Override
+ public void startup() {
+ //Ignore
+ }
+
+ @Override
+ public void shutdown() {
+ //Ignore
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..7077c6dc997b0aa87806cf4483a9cf390e5aec9c
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.config;
+
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+
+public class ClientConfig implements PropertyKeys, NonStandardKeys {
+ private String omsDriverImpl;
+ private String omsAccessPoints;
+ private String omsNamespace;
+ private String omsProducerId;
+ private String omsConsumerId;
+ private int omsOperationTimeout = 5000;
+ private String omsRoutingName;
+ private String omsOperatorName;
+ private String omsDstQueue;
+ private String omsSrcTopic;
+ private String rmqConsumerGroup;
+ private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
+ private int rmqMaxRedeliveryTimes = 16;
+ private int rmqMessageConsumeTimeout = 15; //In minutes
+ private int rmqMaxConsumeThreadNums = 64;
+ private int rmqMinConsumeThreadNums = 20;
+ private String rmqMessageDestination;
+ private int rmqPullMessageBatchNums = 32;
+ private int rmqPullMessageCacheCapacity = 1000;
+
+ public String getOmsDriverImpl() {
+ return omsDriverImpl;
+ }
+
+ public void setOmsDriverImpl(final String omsDriverImpl) {
+ this.omsDriverImpl = omsDriverImpl;
+ }
+
+ public String getOmsAccessPoints() {
+ return omsAccessPoints;
+ }
+
+ public void setOmsAccessPoints(final String omsAccessPoints) {
+ this.omsAccessPoints = omsAccessPoints;
+ }
+
+ public String getOmsNamespace() {
+ return omsNamespace;
+ }
+
+ public void setOmsNamespace(final String omsNamespace) {
+ this.omsNamespace = omsNamespace;
+ }
+
+ public String getOmsProducerId() {
+ return omsProducerId;
+ }
+
+ public void setOmsProducerId(final String omsProducerId) {
+ this.omsProducerId = omsProducerId;
+ }
+
+ public String getOmsConsumerId() {
+ return omsConsumerId;
+ }
+
+ public void setOmsConsumerId(final String omsConsumerId) {
+ this.omsConsumerId = omsConsumerId;
+ }
+
+ public int getOmsOperationTimeout() {
+ return omsOperationTimeout;
+ }
+
+ public void setOmsOperationTimeout(final int omsOperationTimeout) {
+ this.omsOperationTimeout = omsOperationTimeout;
+ }
+
+ public String getOmsRoutingName() {
+ return omsRoutingName;
+ }
+
+ public void setOmsRoutingName(final String omsRoutingName) {
+ this.omsRoutingName = omsRoutingName;
+ }
+
+ public String getOmsOperatorName() {
+ return omsOperatorName;
+ }
+
+ public void setOmsOperatorName(final String omsOperatorName) {
+ this.omsOperatorName = omsOperatorName;
+ }
+
+ public String getOmsDstQueue() {
+ return omsDstQueue;
+ }
+
+ public void setOmsDstQueue(final String omsDstQueue) {
+ this.omsDstQueue = omsDstQueue;
+ }
+
+ public String getOmsSrcTopic() {
+ return omsSrcTopic;
+ }
+
+ public void setOmsSrcTopic(final String omsSrcTopic) {
+ this.omsSrcTopic = omsSrcTopic;
+ }
+
+ public String getRmqConsumerGroup() {
+ return rmqConsumerGroup;
+ }
+
+ public void setRmqConsumerGroup(final String rmqConsumerGroup) {
+ this.rmqConsumerGroup = rmqConsumerGroup;
+ }
+
+ public String getRmqProducerGroup() {
+ return rmqProducerGroup;
+ }
+
+ public void setRmqProducerGroup(final String rmqProducerGroup) {
+ this.rmqProducerGroup = rmqProducerGroup;
+ }
+
+ public int getRmqMaxRedeliveryTimes() {
+ return rmqMaxRedeliveryTimes;
+ }
+
+ public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) {
+ this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
+ }
+
+ public int getRmqMessageConsumeTimeout() {
+ return rmqMessageConsumeTimeout;
+ }
+
+ public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) {
+ this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
+ }
+
+ public int getRmqMaxConsumeThreadNums() {
+ return rmqMaxConsumeThreadNums;
+ }
+
+ public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) {
+ this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
+ }
+
+ public int getRmqMinConsumeThreadNums() {
+ return rmqMinConsumeThreadNums;
+ }
+
+ public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) {
+ this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
+ }
+
+ public String getRmqMessageDestination() {
+ return rmqMessageDestination;
+ }
+
+ public void setRmqMessageDestination(final String rmqMessageDestination) {
+ this.rmqMessageDestination = rmqMessageDestination;
+ }
+
+ public int getRmqPullMessageBatchNums() {
+ return rmqPullMessageBatchNums;
+ }
+
+ public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) {
+ this.rmqPullMessageBatchNums = rmqPullMessageBatchNums;
+ }
+
+ public int getRmqPullMessageCacheCapacity() {
+ return rmqPullMessageCacheCapacity;
+ }
+
+ public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
+ this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
new file mode 100644
index 0000000000000000000000000000000000000000..90f9e03ed3db804056c36a21f5c136831f4ad59f
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.ServiceLifecycle;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.slf4j.Logger;
+
+class LocalMessageCache implements ServiceLifecycle {
+ private final BlockingQueue consumeRequestCache;
+ private final Map consumedRequest;
+ private final ConcurrentHashMap pullOffsetTable;
+ private final DefaultMQPullConsumer rocketmqPullConsumer;
+ private final ClientConfig clientConfig;
+ private final ScheduledExecutorService cleanExpireMsgExecutors;
+
+ private final static Logger log = ClientLogger.getLog();
+
+ LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) {
+ consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity());
+ this.consumedRequest = new ConcurrentHashMap<>();
+ this.pullOffsetTable = new ConcurrentHashMap<>();
+ this.rocketmqPullConsumer = rocketmqPullConsumer;
+ this.clientConfig = clientConfig;
+ this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
+ "OMS_CleanExpireMsgScheduledThread_"));
+ }
+
+ int nextPullBatchNums() {
+ return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity());
+ }
+
+ long nextPullOffset(MessageQueue remoteQueue) {
+ if (!pullOffsetTable.containsKey(remoteQueue)) {
+ try {
+ pullOffsetTable.putIfAbsent(remoteQueue,
+ rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
+ } catch (MQClientException e) {
+ log.error("A error occurred in fetch consume offset process.", e);
+ }
+ }
+ return pullOffsetTable.get(remoteQueue);
+ }
+
+ void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
+ pullOffsetTable.put(remoteQueue, nextPullOffset);
+ }
+
+ void submitConsumeRequest(ConsumeRequest consumeRequest) {
+ try {
+ consumeRequestCache.put(consumeRequest);
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+ MessageExt poll() {
+ return poll(clientConfig.getOmsOperationTimeout());
+ }
+
+ MessageExt poll(final KeyValue properties) {
+ int currentPollTimeout = clientConfig.getOmsOperationTimeout();
+ if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
+ currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+ }
+ return poll(currentPollTimeout);
+ }
+
+ private MessageExt poll(long timeout) {
+ try {
+ ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
+ if (consumeRequest != null) {
+ MessageExt messageExt = consumeRequest.getMessageExt();
+ consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
+ MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
+ consumedRequest.put(messageExt.getMsgId(), consumeRequest);
+ return messageExt;
+ }
+ } catch (InterruptedException ignore) {
+ }
+ return null;
+ }
+
+ void ack(final String messageId) {
+ ConsumeRequest consumeRequest = consumedRequest.remove(messageId);
+ if (consumeRequest != null) {
+ long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
+ try {
+ rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
+ } catch (MQClientException e) {
+ log.error("A error occurred in update consume offset process.", e);
+ }
+ }
+ }
+
+ void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
+ consumedRequest.remove(messageExt.getMsgId());
+ long offset = processQueue.removeMessage(Collections.singletonList(messageExt));
+ try {
+ rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset);
+ } catch (MQClientException e) {
+ log.error("A error occurred in update consume offset process.", e);
+ }
+ }
+
+ @Override
+ public void startup() {
+ this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ cleanExpireMsg();
+ }
+ }, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES);
+ }
+
+ @Override
+ public void shutdown() {
+ ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
+ }
+
+ private void cleanExpireMsg() {
+ for (final Map.Entry next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
+ .getRebalanceImpl().getProcessQueueTable().entrySet()) {
+ ProcessQueue pq = next.getValue();
+ MessageQueue mq = next.getKey();
+ ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
+ if (lockTreeMap == null) {
+ log.error("Gets tree map lock in process queue error, may be has compatibility issue");
+ return;
+ }
+
+ TreeMap msgTreeMap = pq.getMsgTreeMap();
+
+ int loop = msgTreeMap.size();
+ for (int i = 0; i < loop; i++) {
+ MessageExt msg = null;
+ try {
+ lockTreeMap.readLock().lockInterruptibly();
+ try {
+ if (!msgTreeMap.isEmpty()) {
+ msg = msgTreeMap.firstEntry().getValue();
+ if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
+ > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
+ //Expired, ack and remove it.
+ } else {
+ break;
+ }
+ } else {
+ break;
+ }
+ } finally {
+ lockTreeMap.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("Gets expired message exception", e);
+ }
+
+ try {
+ rocketmqPullConsumer.sendMessageBack(msg, 3);
+ log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
+ msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
+ ack(mq, pq, msg);
+ } catch (Exception e) {
+ log.error("Send back expired msg exception", e);
+ }
+ }
+ }
+ }
+
+ private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
+ try {
+ return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
+ } catch (IllegalAccessException e) {
+ return null;
+ }
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..8d396d43c5f88d7582f729b025dba216eed3e0be
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PullConsumer;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.ConsumeRequest;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullTaskCallback;
+import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.slf4j.Logger;
+
+public class PullConsumerImpl implements PullConsumer {
+ private final DefaultMQPullConsumer rocketmqPullConsumer;
+ private final KeyValue properties;
+ private boolean started = false;
+ private String targetQueueName;
+ private final MQPullConsumerScheduleService pullConsumerScheduleService;
+ private final LocalMessageCache localMessageCache;
+ private final ClientConfig clientConfig;
+
+ final static Logger log = ClientLogger.getLog();
+
+ public PullConsumerImpl(final String queueName, final KeyValue properties) {
+ this.properties = properties;
+ this.targetQueueName = queueName;
+
+ this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+ String consumerGroup = clientConfig.getRmqConsumerGroup();
+ if (null == consumerGroup || consumerGroup.isEmpty()) {
+ throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
+ }
+ pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup);
+
+ this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
+
+ String accessPoints = clientConfig.getOmsAccessPoints();
+ if (accessPoints == null || accessPoints.isEmpty()) {
+ throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+ }
+ this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
+
+ this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);
+
+ int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();
+ this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
+
+ String consumerId = OMSUtil.buildInstanceName();
+ this.rocketmqPullConsumer.setInstanceName(consumerId);
+ properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+
+ this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+
+ @Override
+ public Message poll() {
+ MessageExt rmqMsg = localMessageCache.poll();
+ return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
+ }
+
+ @Override
+ public Message poll(final KeyValue properties) {
+ MessageExt rmqMsg = localMessageCache.poll(properties);
+ return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
+ }
+
+ @Override
+ public void ack(final String messageId) {
+ localMessageCache.ack(messageId);
+ }
+
+ @Override
+ public void ack(final String messageId, final KeyValue properties) {
+ localMessageCache.ack(messageId);
+ }
+
+ @Override
+ public synchronized void startup() {
+ if (!started) {
+ try {
+ registerPullTaskCallback();
+ this.pullConsumerScheduleService.start();
+ this.localMessageCache.startup();
+ } catch (MQClientException e) {
+ throw new OMSRuntimeException("-1", e);
+ }
+ }
+ this.started = true;
+ }
+
+ private void registerPullTaskCallback() {
+ this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
+ @Override
+ public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
+ MQPullConsumer consumer = context.getPullConsumer();
+ try {
+ long offset = localMessageCache.nextPullOffset(mq);
+
+ PullResult pullResult = consumer.pull(mq, "*",
+ offset, localMessageCache.nextPullBatchNums());
+ ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
+ .getProcessQueueTable().get(mq);
+ switch (pullResult.getPullStatus()) {
+ case FOUND:
+ if (pq != null) {
+ pq.putMessage(pullResult.getMsgFoundList());
+ for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
+ localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
+ } catch (Exception e) {
+ log.error("A error occurred in pull message process.", e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (this.started) {
+ this.localMessageCache.shutdown();
+ this.pullConsumerScheduleService.shutdown();
+ this.rocketmqPullConsumer.shutdown();
+ }
+ this.started = false;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..f9b8058e04af862da90edb074c6d3d7b52ff603f
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.consumer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageListener;
+import io.openmessaging.OMS;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.PushConsumer;
+import io.openmessaging.ReceivedMessageContext;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.NonStandardKeys;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class PushConsumerImpl implements PushConsumer {
+ private final DefaultMQPushConsumer rocketmqPushConsumer;
+ private final KeyValue properties;
+ private boolean started = false;
+ private final Map subscribeTable = new ConcurrentHashMap<>();
+ private final ClientConfig clientConfig;
+
+ public PushConsumerImpl(final KeyValue properties) {
+ this.rocketmqPushConsumer = new DefaultMQPushConsumer();
+ this.properties = properties;
+ this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+ String accessPoints = clientConfig.getOmsAccessPoints();
+ if (accessPoints == null || accessPoints.isEmpty()) {
+ throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+ }
+ this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
+
+ String consumerGroup = clientConfig.getRmqConsumerGroup();
+ if (null == consumerGroup || consumerGroup.isEmpty()) {
+ throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
+ }
+ this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
+ this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
+ this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout());
+ this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
+ this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
+
+ String consumerId = OMSUtil.buildInstanceName();
+ this.rocketmqPushConsumer.setInstanceName(consumerId);
+ properties.put(PropertyKeys.CONSUMER_ID, consumerId);
+
+ this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+
+ @Override
+ public void resume() {
+ this.rocketmqPushConsumer.resume();
+ }
+
+ @Override
+ public void suspend() {
+ this.rocketmqPushConsumer.suspend();
+ }
+
+ @Override
+ public boolean isSuspended() {
+ return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
+ }
+
+ @Override
+ public PushConsumer attachQueue(final String queueName, final MessageListener listener) {
+ this.subscribeTable.put(queueName, listener);
+ try {
+ this.rocketmqPushConsumer.subscribe(queueName, "*");
+ } catch (MQClientException e) {
+ throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer can't attach to %s.", queueName));
+ }
+ return this;
+ }
+
+ @Override
+ public synchronized void startup() {
+ if (!started) {
+ try {
+ this.rocketmqPushConsumer.start();
+ } catch (MQClientException e) {
+ throw new OMSRuntimeException("-1", e);
+ }
+ }
+ this.started = true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (this.started) {
+ this.rocketmqPushConsumer.shutdown();
+ }
+ this.started = false;
+ }
+
+ class MessageListenerImpl implements MessageListenerConcurrently {
+
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList,
+ ConsumeConcurrentlyContext contextRMQ) {
+ MessageExt rmqMsg = rmqMsgList.get(0);
+ BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg);
+
+ MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic());
+
+ if (listener == null) {
+ throw new OMSRuntimeException("-1",
+ String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
+ }
+
+ final KeyValue contextProperties = OMS.newKeyValue();
+ final CountDownLatch sync = new CountDownLatch(1);
+
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
+
+ ReceivedMessageContext context = new ReceivedMessageContext() {
+ @Override
+ public KeyValue properties() {
+ return contextProperties;
+ }
+
+ @Override
+ public void ack() {
+ sync.countDown();
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
+ }
+
+ @Override
+ public void ack(final KeyValue properties) {
+ sync.countDown();
+ contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
+ properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+ }
+ };
+ long begin = System.currentTimeMillis();
+ listener.onMessage(omsMsg, context);
+ long costs = System.currentTimeMillis() - begin;
+ long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
+ try {
+ sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignore) {
+ }
+
+ return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
+ }
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..43f80ae5be47895393097e3c6d5047c02054f22d
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.OMS;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+
+public class BytesMessageImpl implements BytesMessage {
+ private KeyValue headers;
+ private KeyValue properties;
+ private byte[] body;
+
+ public BytesMessageImpl() {
+ this.headers = OMS.newKeyValue();
+ this.properties = OMS.newKeyValue();
+ }
+
+ @Override
+ public byte[] getBody() {
+ return body;
+ }
+
+ @Override
+ public BytesMessage setBody(final byte[] body) {
+ this.body = body;
+ return this;
+ }
+
+ @Override
+ public KeyValue headers() {
+ return headers;
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+
+ @Override
+ public Message putHeaders(final String key, final int value) {
+ headers.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putHeaders(final String key, final long value) {
+ headers.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putHeaders(final String key, final double value) {
+ headers.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putHeaders(final String key, final String value) {
+ headers.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putProperties(final String key, final int value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putProperties(final String key, final long value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putProperties(final String key, final double value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putProperties(final String key, final String value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
new file mode 100644
index 0000000000000000000000000000000000000000..7ce4a9b4236e4324e9fa2d3b9a62aa1f903f9aff
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+public class ConsumeRequest {
+ private final MessageExt messageExt;
+ private final MessageQueue messageQueue;
+ private final ProcessQueue processQueue;
+ private long startConsumeTimeMillis;
+
+ public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
+ final ProcessQueue processQueue) {
+ this.messageExt = messageExt;
+ this.messageQueue = messageQueue;
+ this.processQueue = processQueue;
+ }
+
+ public MessageExt getMessageExt() {
+ return messageExt;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+ public ProcessQueue getProcessQueue() {
+ return processQueue;
+ }
+
+ public long getStartConsumeTimeMillis() {
+ return startConsumeTimeMillis;
+ }
+
+ public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
+ this.startConsumeTimeMillis = startConsumeTimeMillis;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
new file mode 100644
index 0000000000000000000000000000000000000000..3639a3f836b8b7446e1021740081ab360a4da79a
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+public interface NonStandardKeys {
+ String CONSUMER_GROUP = "rmq.consumer.group";
+ String PRODUCER_GROUP = "rmq.producer.group";
+ String MAX_REDELIVERY_TIMES = "rmq.max.redelivery.times";
+ String MESSAGE_CONSUME_TIMEOUT = "rmq.message.consume.timeout";
+ String MAX_CONSUME_THREAD_NUMS = "rmq.max.consume.thread.nums";
+ String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums";
+ String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status";
+ String MESSAGE_DESTINATION = "rmq.message.destination";
+ String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
+ String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..228a9f0b5956dba2fd039e55f330395dde8325dc
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.SendResult;
+
+public class SendResultImpl implements SendResult {
+ private String messageId;
+ private KeyValue properties;
+
+ public SendResultImpl(final String messageId, final KeyValue properties) {
+ this.messageId = messageId;
+ this.properties = properties;
+ }
+
+ @Override
+ public String messageId() {
+ return messageId;
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..8246bcd294ac1987dedcebdea398bceaa597ef67
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageFactory;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.ServiceLifecycle;
+import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.exception.OMSTimeOutException;
+import io.openmessaging.rocketmq.config.ClientConfig;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import io.openmessaging.rocketmq.utils.BeanUtils;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.slf4j.Logger;
+
+import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
+
+abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
+ final static Logger log = ClientLogger.getLog();
+ final KeyValue properties;
+ final DefaultMQProducer rocketmqProducer;
+ private boolean started = false;
+ final ClientConfig clientConfig;
+
+ AbstractOMSProducer(final KeyValue properties) {
+ this.properties = properties;
+ this.rocketmqProducer = new DefaultMQProducer();
+ this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
+
+ String accessPoints = clientConfig.getOmsAccessPoints();
+ if (accessPoints == null || accessPoints.isEmpty()) {
+ throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+ }
+ this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
+ this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
+
+ String producerId = buildInstanceName();
+ this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
+ this.rocketmqProducer.setInstanceName(producerId);
+ this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
+ properties.put(PropertyKeys.PRODUCER_ID, producerId);
+ }
+
+ @Override
+ public synchronized void startup() {
+ if (!started) {
+ try {
+ this.rocketmqProducer.start();
+ } catch (MQClientException e) {
+ throw new OMSRuntimeException("-1", e);
+ }
+ }
+ this.started = true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (this.started) {
+ this.rocketmqProducer.shutdown();
+ }
+ this.started = false;
+ }
+
+ OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) {
+ if (e instanceof MQClientException) {
+ if (e.getCause() != null) {
+ if (e.getCause() instanceof RemotingTimeoutException) {
+ return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
+ this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
+ } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
+ MQBrokerException brokerException = (MQBrokerException) e.getCause();
+ return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
+ topic, msgId, brokerException.getErrorMessage()), e);
+ }
+ }
+ // Exception thrown by local.
+ else {
+ MQClientException clientException = (MQClientException) e;
+ if (-1 == clientException.getResponseCode()) {
+ return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
+ topic, msgId), e);
+ } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
+ return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
+ topic, msgId), e);
+ }
+ }
+ }
+ return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
+ }
+
+ protected void checkMessageType(Message message) {
+ if (!(message instanceof BytesMessage)) {
+ throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
+ }
+ }
+
+ @Override
+ public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
+ BytesMessage bytesMessage = new BytesMessageImpl();
+ bytesMessage.setBody(body);
+ bytesMessage.headers().put(MessageHeader.TOPIC, topic);
+ return bytesMessage;
+ }
+
+ @Override
+ public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
+ BytesMessage bytesMessage = new BytesMessageImpl();
+ bytesMessage.setBody(body);
+ bytesMessage.headers().put(MessageHeader.QUEUE, queue);
+ return bytesMessage;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..2c00c60ebc476137dd0ff39d4f495034f82b6882
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.Producer;
+import io.openmessaging.Promise;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.SendResult;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.promise.DefaultPromise;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendStatus;
+
+import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert;
+
+public class ProducerImpl extends AbstractOMSProducer implements Producer {
+
+ public ProducerImpl(final KeyValue properties) {
+ super(properties);
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+
+ @Override
+ public SendResult send(final Message message) {
+ return send(message, this.rocketmqProducer.getSendMsgTimeout());
+ }
+
+ @Override
+ public SendResult send(final Message message, final KeyValue properties) {
+ long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
+ ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+ return send(message, timeout);
+ }
+
+ private SendResult send(final Message message, long timeout) {
+ checkMessageType(message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ try {
+ org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
+ if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
+ log.error(String.format("Send message to RocketMQ failed, %s", message));
+ throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
+ }
+ message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+ return OMSUtil.sendResultConvert(rmqResult);
+ } catch (Exception e) {
+ log.error(String.format("Send message to RocketMQ failed, %s", message), e);
+ throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+ }
+ }
+
+ @Override
+ public Promise sendAsync(final Message message) {
+ return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
+ }
+
+ @Override
+ public Promise sendAsync(final Message message, final KeyValue properties) {
+ long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
+ ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+ return sendAsync(message, timeout);
+ }
+
+ private Promise sendAsync(final Message message, long timeout) {
+ checkMessageType(message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ final Promise promise = new DefaultPromise<>();
+ try {
+ this.rocketmqProducer.send(rmqMessage, new SendCallback() {
+ @Override
+ public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
+ message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+ promise.set(OMSUtil.sendResultConvert(rmqResult));
+ }
+
+ @Override
+ public void onException(final Throwable e) {
+ promise.setFailure(e);
+ }
+ }, timeout);
+ } catch (Exception e) {
+ promise.setFailure(e);
+ }
+ return promise;
+ }
+
+ @Override
+ public void sendOneway(final Message message) {
+ checkMessageType(message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ try {
+ this.rocketmqProducer.sendOneway(rmqMessage);
+ } catch (Exception ignore) { //Ignore the oneway exception.
+ }
+ }
+
+ @Override
+ public void sendOneway(final Message message, final KeyValue properties) {
+ sendOneway(message);
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..05225cc5058aa1d31fadc335ff0d2a86f0e03337
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.SequenceProducer;
+import io.openmessaging.rocketmq.utils.OMSUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendResult;
+
+public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {
+
+ private BlockingQueue msgCacheQueue;
+
+ public SequenceProducerImpl(final KeyValue properties) {
+ super(properties);
+ this.msgCacheQueue = new LinkedBlockingQueue<>();
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+
+ @Override
+ public void send(final Message message) {
+ checkMessageType(message);
+ org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
+ try {
+ Validators.checkMessage(rmqMessage, this.rocketmqProducer);
+ } catch (MQClientException e) {
+ throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+ }
+ msgCacheQueue.add(message);
+ }
+
+ @Override
+ public void send(final Message message, final KeyValue properties) {
+ send(message);
+ }
+
+ @Override
+ public synchronized void commit() {
+ List messages = new ArrayList<>();
+ msgCacheQueue.drainTo(messages);
+
+ List rmqMessages = new ArrayList<>();
+
+ for (Message message : messages) {
+ rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
+ }
+
+ if (rmqMessages.size() == 0) {
+ return;
+ }
+
+ try {
+ SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
+ String[] msgIdArray = sendResult.getMsgId().split(",");
+ for (int i = 0; i < messages.size(); i++) {
+ Message message = messages.get(i);
+ message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
+ }
+ } catch (Exception e) {
+ throw checkProducerException("", "", e);
+ }
+ }
+
+ @Override
+ public synchronized void rollback() {
+ msgCacheQueue.clear();
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
new file mode 100644
index 0000000000000000000000000000000000000000..c863ccf6ade2848eb61cc2cca789cc9bb2d50366
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.promise;
+
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.exception.OMSRuntimeException;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultPromise implements Promise {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
+ private final Object lock = new Object();
+ private volatile FutureState state = FutureState.DOING;
+ private V result = null;
+ private long timeout;
+ private long createTime;
+ private Throwable exception = null;
+ private List> promiseListenerList;
+
+ public DefaultPromise() {
+ createTime = System.currentTimeMillis();
+ promiseListenerList = new ArrayList<>();
+ timeout = 5000;
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state.isCancelledState();
+ }
+
+ @Override
+ public boolean isDone() {
+ return state.isDoneState();
+ }
+
+ @Override
+ public V get() {
+ return result;
+ }
+
+ @Override
+ public V get(final long timeout) {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return getValueOrThrowable();
+ }
+
+ if (timeout <= 0) {
+ try {
+ lock.wait();
+ } catch (Exception e) {
+ cancel(e);
+ }
+ return getValueOrThrowable();
+ } else {
+ long waitTime = timeout - (System.currentTimeMillis() - createTime);
+ if (waitTime > 0) {
+ for (;; ) {
+ try {
+ lock.wait(waitTime);
+ } catch (InterruptedException e) {
+ LOG.error("promise get value interrupted,excepiton:{}", e.getMessage());
+ }
+
+ if (!isDoing()) {
+ break;
+ } else {
+ waitTime = timeout - (System.currentTimeMillis() - createTime);
+ if (waitTime <= 0) {
+ break;
+ }
+ }
+ }
+ }
+
+ if (isDoing()) {
+ timeoutSoCancel();
+ }
+ }
+ return getValueOrThrowable();
+ }
+ }
+
+ @Override
+ public boolean set(final V value) {
+ if (value == null)
+ return false;
+ this.result = value;
+ return done();
+ }
+
+ @Override
+ public boolean setFailure(final Throwable cause) {
+ if (cause == null)
+ return false;
+ this.exception = cause;
+ return done();
+ }
+
+ @Override
+ public void addListener(final PromiseListener listener) {
+ if (listener == null) {
+ throw new NullPointerException("FutureListener is null");
+ }
+
+ boolean notifyNow = false;
+ synchronized (lock) {
+ if (!isDoing()) {
+ notifyNow = true;
+ } else {
+ if (promiseListenerList == null) {
+ promiseListenerList = new ArrayList<>();
+ }
+ promiseListenerList.add(listener);
+ }
+ }
+
+ if (notifyNow) {
+ notifyListener(listener);
+ }
+ }
+
+ @Override
+ public Throwable getThrowable() {
+ return exception;
+ }
+
+ private void notifyListeners() {
+ if (promiseListenerList != null) {
+ for (PromiseListener listener : promiseListenerList) {
+ notifyListener(listener);
+ }
+ }
+ }
+
+ private boolean isSuccess() {
+ return isDone() && (exception == null);
+ }
+
+ private void timeoutSoCancel() {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return;
+ }
+ state = FutureState.CANCELLED;
+ exception = new RuntimeException("Get request result is timeout or interrupted");
+ lock.notifyAll();
+ }
+ notifyListeners();
+ }
+
+ private V getValueOrThrowable() {
+ if (exception != null) {
+ Throwable e = exception.getCause() != null ? exception.getCause() : exception;
+ throw new OMSRuntimeException("-1", e);
+ }
+ notifyListeners();
+ return result;
+ }
+
+ private boolean isDoing() {
+ return state.isDoingState();
+ }
+
+ private boolean done() {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return false;
+ }
+
+ state = FutureState.DONE;
+ lock.notifyAll();
+ }
+
+ notifyListeners();
+ return true;
+ }
+
+ private void notifyListener(final PromiseListener listener) {
+ try {
+ if (exception != null)
+ listener.operationFailed(this);
+ else
+ listener.operationCompleted(this);
+ } catch (Throwable t) {
+ LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
+ }
+ }
+
+ private boolean cancel(Exception e) {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return false;
+ }
+
+ state = FutureState.CANCELLED;
+ exception = e;
+ lock.notifyAll();
+ }
+
+ notifyListeners();
+ return true;
+ }
+}
+
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
new file mode 100644
index 0000000000000000000000000000000000000000..84b6c2d74efb4df2125e0406977a17e27bce726a
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.rocketmq.promise;
+
+public enum FutureState {
+ /**
+ * the task is doing
+ **/
+ DOING(0),
+ /**
+ * the task is done
+ **/
+ DONE(1),
+ /**
+ * ths task is cancelled
+ **/
+ CANCELLED(2);
+
+ public final int value;
+
+ private FutureState(int value) {
+ this.value = value;
+ }
+
+ public boolean isCancelledState() {
+ return this == CANCELLED;
+ }
+
+ public boolean isDoneState() {
+ return this == DONE;
+ }
+
+ public boolean isDoingState() {
+ return this == DOING;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
new file mode 100644
index 0000000000000000000000000000000000000000..104d3d964fa87f758ecbd5fa39848626a6dab447
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.utils;
+
+import io.openmessaging.KeyValue;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.slf4j.Logger;
+
+public final class BeanUtils {
+ final static Logger log = ClientLogger.getLog();
+
+ /**
+ * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
+ */
+ private static Map, Class>> primitiveWrapperMap = new HashMap, Class>>();
+
+ static {
+ primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
+ primitiveWrapperMap.put(Byte.TYPE, Byte.class);
+ primitiveWrapperMap.put(Character.TYPE, Character.class);
+ primitiveWrapperMap.put(Short.TYPE, Short.class);
+ primitiveWrapperMap.put(Integer.TYPE, Integer.class);
+ primitiveWrapperMap.put(Long.TYPE, Long.class);
+ primitiveWrapperMap.put(Double.TYPE, Double.class);
+ primitiveWrapperMap.put(Float.TYPE, Float.class);
+ primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
+ }
+
+ private static Map, Class>> wrapperMap = new HashMap, Class>>();
+
+ static {
+ for (final Class> primitiveClass : primitiveWrapperMap.keySet()) {
+ final Class> wrapperClass = primitiveWrapperMap.get(primitiveClass);
+ if (!primitiveClass.equals(wrapperClass)) {
+ wrapperMap.put(wrapperClass, primitiveClass);
+ }
+ }
+ wrapperMap.put(String.class, String.class);
+ }
+
+ /**
+ *
Populate the JavaBeans properties of the specified bean, based on
+ * the specified name/value pairs. This method uses Java reflection APIs
+ * to identify corresponding "property setter" method names, and deals
+ * with setter arguments of type String, boolean,
+ * int, long, float, and
+ * double.
+ *
+ *
The particular setter method to be called for each property is
+ * determined using the usual JavaBeans introspection mechanisms. Thus,
+ * you may identify custom setter methods using a BeanInfo class that is
+ * associated with the class of the bean itself. If no such BeanInfo
+ * class is available, the standard method name conversion ("set" plus
+ * the capitalized name of the property in question) is used.
+ *
+ *
NOTE: It is contrary to the JavaBeans Specification
+ * to have more than one setter method (with different argument
+ * signatures) for the same property.
+ *
+ * @param clazz JavaBean class whose properties are being populated
+ * @param properties Map keyed by property name, with the corresponding (String or String[]) value(s) to be set
+ * @param Class type
+ * @return Class instance
+ */
+ public static T populate(final Properties properties, final Class clazz) {
+ T obj = null;
+ try {
+ obj = clazz.newInstance();
+ return populate(properties, obj);
+ } catch (Throwable e) {
+ log.warn("Error occurs !", e);
+ }
+ return obj;
+ }
+
+ public static T populate(final KeyValue properties, final Class clazz) {
+ T obj = null;
+ try {
+ obj = clazz.newInstance();
+ return populate(properties, obj);
+ } catch (Throwable e) {
+ log.warn("Error occurs !", e);
+ }
+ return obj;
+ }
+
+ public static Class> getMethodClass(Class> clazz, String methodName) {
+ Method[] methods = clazz.getMethods();
+ for (Method method : methods) {
+ if (method.getName().equalsIgnoreCase(methodName)) {
+ return method.getParameterTypes()[0];
+ }
+ }
+ return null;
+ }
+
+ public static void setProperties(Class> clazz, Object obj, String methodName,
+ Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ Class> parameterClass = getMethodClass(clazz, methodName);
+ Method setterMethod = clazz.getMethod(methodName, parameterClass);
+ if (parameterClass == Boolean.TYPE) {
+ setterMethod.invoke(obj, Boolean.valueOf(value.toString()));
+ } else if (parameterClass == Integer.TYPE) {
+ setterMethod.invoke(obj, Integer.valueOf(value.toString()));
+ } else if (parameterClass == Double.TYPE) {
+ setterMethod.invoke(obj, Double.valueOf(value.toString()));
+ } else if (parameterClass == Float.TYPE) {
+ setterMethod.invoke(obj, Float.valueOf(value.toString()));
+ } else if (parameterClass == Long.TYPE) {
+ setterMethod.invoke(obj, Long.valueOf(value.toString()));
+ } else
+ setterMethod.invoke(obj, value);
+ }
+
+ public static T populate(final Properties properties, final T obj) {
+ Class> clazz = obj.getClass();
+ try {
+
+ Set> entries = properties.entrySet();
+ for (Map.Entry