diff --git a/example/pom.xml b/example/pom.xml
index 785a4ca856eb6be16d60c0c1b354ba8370c5424a..840fa36f16c53bf70eadc52e0871976dea53a9b5 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -48,5 +48,14 @@
org.javassist
javassist
+
+ io.openmessaging
+ openmessaging-api
+
+
+ org.apache.rocketmq
+ rocketmq-openmessaging
+ 4.1.0-incubating-SNAPSHOT
+
diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
index 3b71849ed9a1c5f0065478b4495a889862fd2a5d..5a27f5a006e2184798be3a8995a20361a078abe8 100644
--- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java
@@ -16,8 +16,59 @@
*/
package org.apache.rocketmq.example.openmessaging;
+import io.openmessaging.Message;
+import io.openmessaging.MessagingAccessPoint;
+import io.openmessaging.MessagingAccessPointFactory;
+import io.openmessaging.Producer;
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.SendResult;
+import java.nio.charset.Charset;
+
public class SimpleProducer {
public static void main(String[] args) {
+ final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
+ .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace");
+
+ final Producer producer = messagingAccessPoint.createProducer();
+
+ messagingAccessPoint.startup();
+ System.out.println("messagingAccessPoint startup OK");
+
+ producer.startup();
+ System.out.println("producer startup OK");
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ producer.shutdown();
+ messagingAccessPoint.shutdown();
+ }
+ }));
+
+ {
+ Message message = producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")));
+ SendResult sendResult = producer.send(message);
+ //final Void aVoid = result.get(3000L);
+ System.out.println("send async message OK, msgId: " + sendResult.messageId());
+ }
+
+ {
+ final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ result.addListener(new PromiseListener() {
+ @Override public void operationCompleted(Promise promise) {
+ System.out.println("Send async message OK, msgId: " + promise.get().messageId());
+ }
+
+ @Override public void operationFailed(Promise promise) {
+ System.out.println("send async message Failed, error: " + promise.getThrowable().getMessage());
+ }
+ });
+ }
+ {
+ producer.sendOneway(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
+ System.out.println("Send oneway message OK");
+ }
}
}
diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml
index d56865044f432e89ce8baadf9389cfd9a500da72..d6498127f584de3a3c354d74ac785e4097db0cfd 100644
--- a/openmessaging/pom.xml
+++ b/openmessaging/pom.xml
@@ -27,11 +27,22 @@
4.0.0
rocketmq-openmessaging
+ rocketmq-openmessaging ${project.version}
io.openmessaging
- messaging-user-level-api
+ openmessaging-api
+
+
+ org.apache.rocketmq
+ rocketmq-client
+
+
+ javax.jms
+ javax.jms-api
+ 2.0.1
+ test
\ No newline at end of file
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
index 2f75686fc8c91664e04a3213935d57a7efa8bd64..c30b7d41f313613b2549c5b5fffa78c424c635c5 100644
--- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java
@@ -26,26 +26,39 @@ import io.openmessaging.ResourceManager;
import io.openmessaging.SequenceProducer;
import io.openmessaging.ServiceEndPoint;
import io.openmessaging.observer.Observer;
+import io.openmessaging.rocketmq.producer.ProducerImpl;
+import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
+ private final KeyValue accessPointProperties;
+
+ public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
+ this.accessPointProperties = accessPointProperties;
+ }
+
+ @Override
+ public KeyValue properties() {
+ return accessPointProperties;
+ }
+
@Override
public Producer createProducer() {
- return null;
+ return new ProducerImpl(this.accessPointProperties);
}
@Override
public Producer createProducer(KeyValue properties) {
- return null;
+ return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public SequenceProducer createSequenceProducer() {
- return null;
+ return new SequenceProducerImpl(this.accessPointProperties);
}
@Override
public SequenceProducer createSequenceProducer(KeyValue properties) {
- return null;
+ return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
@@ -79,7 +92,7 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
- public ResourceManager createResourceManager() {
+ public ResourceManager getResourceManager() {
return null;
}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..061ee6b955abc16efcd6e437e0a30449ed043490
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.OMS;
+import io.openmessaging.SendResult;
+import io.openmessaging.rocketmq.domain.SendResultImpl;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
+
+public class OMSUtil {
+
+ /**
+ * Builds a OMS client instance name.
+ *
+ * @return a unique instance name
+ */
+ public static String buildInstanceName() {
+ return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
+ }
+
+ public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
+ org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
+ rmqMessage.setBody(omsMessage.getBody());
+
+ KeyValue headers = omsMessage.headers();
+ KeyValue properties = omsMessage.properties();
+
+ //All destinations in RocketMQ use Topic
+ rmqMessage.setTopic(headers.containsKey(MessageHeader.TOPIC)
+ ? headers.getString(MessageHeader.TOPIC) : headers.getString(MessageHeader.QUEUE));
+
+ for (String key : properties.keySet()) {
+ MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
+ }
+
+ //Headers has a high priority
+ for (String key : headers.keySet()) {
+ MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
+ }
+
+ return rmqMessage;
+ }
+
+ /**
+ * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
+ */
+ public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
+ assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK);
+ return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
+ }
+
+ public static KeyValue buildKeyValue(KeyValue ... keyValues) {
+ KeyValue keyValue = OMS.newKeyValue();
+ for (KeyValue properties : keyValues) {
+ for (String key : properties.keySet()) {
+ keyValue.put(key, properties.getString(key));
+ }
+ }
+ return keyValue;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..8140fe2ecaf9880b31d07021ee1004c0bc1f6b4d
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.OMS;
+
+public class BytesMessageImpl implements BytesMessage {
+ private KeyValue headers;
+ private KeyValue properties;
+ private byte[] body;
+
+ public BytesMessageImpl() {
+ this.headers = OMS.newKeyValue();
+ this.properties = OMS.newKeyValue();
+ }
+
+ @Override
+ public byte[] getBody() {
+ return body;
+ }
+
+ @Override
+ public BytesMessage setBody(final byte[] body) {
+ this.body = body;
+ return this;
+ }
+
+ @Override
+ public KeyValue headers() {
+ return headers;
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+
+ @Override
+ public Message putHeaders(final String key, final int value) {
+ headers.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putHeaders(final String key, final long value) {
+ headers.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putHeaders(final String key, final double value) {
+ headers.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putHeaders(final String key, final String value) {
+ headers.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putProperties(final String key, final int value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putProperties(final String key, final long value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putProperties(final String key, final double value) {
+ properties.put(key, value);
+ return this;
+ }
+
+ @Override
+ public Message putProperties(final String key, final String value) {
+ properties.put(key, value);
+ return this;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
new file mode 100644
index 0000000000000000000000000000000000000000..cf83cfdc96921c7f7efcd5e693b8da9e59c8523c
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+public class NonStandardKeys {
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..228a9f0b5956dba2fd039e55f330395dde8325dc
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.domain;
+
+import io.openmessaging.KeyValue;
+import io.openmessaging.SendResult;
+
+public class SendResultImpl implements SendResult {
+ private String messageId;
+ private KeyValue properties;
+
+ public SendResultImpl(final String messageId, final KeyValue properties) {
+ this.messageId = messageId;
+ this.properties = properties;
+ }
+
+ @Override
+ public String messageId() {
+ return messageId;
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..9eb735d6188b45dd188022b34b0fb2e5e0e3fe62
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageFactory;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.ServiceLifecycle;
+import io.openmessaging.exception.OMSMessageFormatException;
+import io.openmessaging.exception.OMSNotSupportedException;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.exception.OMSTimeOutException;
+import io.openmessaging.rocketmq.domain.BytesMessageImpl;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.slf4j.Logger;
+
+import static io.openmessaging.rocketmq.OMSUtil.buildInstanceName;
+
+abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{
+ final static Logger log = ClientLogger.getLog();
+ final KeyValue properties;
+ final DefaultMQProducer rocketmqProducer;
+ private boolean started = false;
+
+ AbstractOMSProducer(final KeyValue properties) {
+ this.properties = properties;
+ this.rocketmqProducer = new DefaultMQProducer();
+
+ String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS);
+
+ if (accessPoints == null || accessPoints.isEmpty()) {
+ throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
+ }
+ String producerId = buildInstanceName();
+
+ int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
+
+ this.rocketmqProducer.setProducerGroup(producerId);
+ this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout);
+ this.rocketmqProducer.setInstanceName(producerId);
+ this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
+ this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
+
+ properties.put(PropertyKeys.PRODUCER_ID, producerId);
+ }
+
+ @Override
+ public synchronized void startup() {
+ if (!started) {
+ try {
+ this.rocketmqProducer.start();
+ } catch (MQClientException e) {
+ throw new OMSRuntimeException("-1", e);
+ }
+ }
+ this.started = true;
+ }
+
+ @Override
+ public synchronized void shutdown() {
+ if (this.started) {
+ this.rocketmqProducer.shutdown();
+ }
+ this.started = false;
+ }
+
+ OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) {
+ if (e instanceof MQClientException) {
+ if (e.getCause() != null) {
+ if (e.getCause() instanceof RemotingTimeoutException) {
+ return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
+ this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
+ } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
+ MQBrokerException brokerException = (MQBrokerException) e.getCause();
+ return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
+ topic, msgId, brokerException.getErrorMessage()), e);
+ }
+ }
+ // Exception thrown by local.
+ else {
+ MQClientException clientException = (MQClientException) e;
+ if (-1 == clientException.getResponseCode()) {
+ return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
+ topic, msgId), e);
+ } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
+ return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
+ topic, msgId), e);
+ }
+ }
+ }
+ return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
+ }
+
+ protected void checkMessageType(Message message) {
+ if (!(message instanceof BytesMessage)) {
+ throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
+ }
+ }
+
+ @Override
+ public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
+ BytesMessage bytesMessage = new BytesMessageImpl();
+ bytesMessage.setBody(body);
+ bytesMessage.headers().put(MessageHeader.TOPIC, topic);
+ return bytesMessage;
+ }
+
+ @Override
+ public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
+ BytesMessage bytesMessage = new BytesMessageImpl();
+ bytesMessage.setBody(body);
+ bytesMessage.headers().put(MessageHeader.QUEUE, queue);
+ return bytesMessage;
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..f5d2f2558ad7907d7de2a764bbfa7005dba050c8
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.Producer;
+import io.openmessaging.Promise;
+import io.openmessaging.PropertyKeys;
+import io.openmessaging.SendResult;
+import io.openmessaging.exception.OMSRuntimeException;
+import io.openmessaging.rocketmq.OMSUtil;
+import io.openmessaging.rocketmq.promise.DefaultPromise;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendStatus;
+
+import static io.openmessaging.rocketmq.OMSUtil.msgConvert;
+
+public class ProducerImpl extends AbstractOMSProducer implements Producer {
+
+ public ProducerImpl(final KeyValue properties) {
+ super(properties);
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+
+ @Override
+ public SendResult send(final Message message) {
+ return send(message, this.rocketmqProducer.getSendMsgTimeout());
+ }
+
+ @Override
+ public SendResult send(final Message message, final KeyValue properties) {
+ long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
+ ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+ return send(message, timeout);
+ }
+
+ private SendResult send(final Message message, long timeout) {
+ checkMessageType(message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ try {
+ org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
+ if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
+ log.error(String.format("Send message to RocketMQ failed, %s", message));
+ throw new OMSRuntimeException("-1", "Send message to RocketMQ failed.");
+ }
+ message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+ return OMSUtil.sendResultConvert(rmqResult);
+ } catch (Exception e) {
+ log.error(String.format("Send message to RocketMQ failed, %s", message), e);
+ throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+ }
+ }
+
+ @Override
+ public Promise sendAsync(final Message message) {
+ return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
+ }
+
+ @Override
+ public Promise sendAsync(final Message message, final KeyValue properties) {
+ long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
+ ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
+ return sendAsync(message, timeout);
+ }
+
+ private Promise sendAsync(final Message message, long timeout) {
+ checkMessageType(message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ final Promise promise = new DefaultPromise<>();
+ try {
+ this.rocketmqProducer.send(rmqMessage, new SendCallback() {
+ @Override
+ public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
+ message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
+ promise.set(OMSUtil.sendResultConvert(rmqResult));
+ }
+
+ @Override
+ public void onException(final Throwable e) {
+ promise.setFailure(e);
+ }
+ }, timeout);
+ } catch (Exception e) {
+ promise.setFailure(e);
+ }
+ return promise;
+ }
+
+ @Override
+ public void sendOneway(final Message message) {
+ checkMessageType(message);
+ org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
+ try {
+ this.rocketmqProducer.sendOneway(rmqMessage);
+ } catch (Exception ignore) { //Ignore the oneway exception.
+ }
+ }
+
+ @Override
+ public void sendOneway(final Message message, final KeyValue properties) {
+ sendOneway(message);
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
new file mode 100644
index 0000000000000000000000000000000000000000..89ece2bd65ee0f4eee29dd3c809eef8135d0e196
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.producer;
+
+import io.openmessaging.BytesMessage;
+import io.openmessaging.KeyValue;
+import io.openmessaging.Message;
+import io.openmessaging.MessageHeader;
+import io.openmessaging.SequenceProducer;
+import io.openmessaging.rocketmq.OMSUtil;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.rocketmq.client.Validators;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.SendResult;
+
+public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {
+
+ private BlockingQueue msgCacheQueue;
+
+ public SequenceProducerImpl(final KeyValue properties) {
+ super(properties);
+ this.msgCacheQueue = new LinkedBlockingQueue<>();
+ }
+
+ @Override
+ public KeyValue properties() {
+ return properties;
+ }
+
+ @Override
+ public void send(final Message message) {
+ checkMessageType(message);
+ org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
+ try {
+ Validators.checkMessage(rmqMessage, this.rocketmqProducer);
+ } catch (MQClientException e) {
+ throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
+ }
+ msgCacheQueue.add(message);
+ }
+
+ @Override
+ public void send(final Message message, final KeyValue properties) {
+ send(message);
+ }
+
+ @Override
+ public synchronized void commit() {
+ List messages = new ArrayList<>();
+ msgCacheQueue.drainTo(messages);
+
+ List rmqMessages = new ArrayList<>();
+
+ for (Message message : messages) {
+ rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
+ }
+
+ try {
+ SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
+ String [] msgIdArray = sendResult.getMsgId().split(",");
+ for (int i = 0; i < messages.size(); i++) {
+ Message message = messages.get(i);
+ message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
+ }
+ } catch (Exception e) {
+ throw checkProducerException("", "", e);
+ }
+ }
+
+ @Override
+ public synchronized void rollback() {
+ msgCacheQueue.clear();
+ }
+}
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
new file mode 100644
index 0000000000000000000000000000000000000000..43f96ce85f6b47df7b3a500fa1c4503e7f456d3c
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.openmessaging.rocketmq.promise;
+
+import io.openmessaging.Promise;
+import io.openmessaging.PromiseListener;
+import io.openmessaging.exception.OMSRuntimeException;
+import java.util.ArrayList;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultPromise implements Promise {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
+ private final Object lock = new Object();
+ private volatile FutureState state = FutureState.DOING;
+ private V result = null;
+ private long timeout;
+ private long createTime;
+ private Throwable exception = null;
+ private List promiseListenerList;
+
+ public DefaultPromise() {
+ createTime = System.currentTimeMillis();
+ promiseListenerList = new ArrayList<>();
+ timeout = 5000;
+ }
+
+ @Override
+ public boolean cancel(final boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return state.isCancelledState();
+ }
+
+ @Override
+ public boolean isDone() {
+ return state.isDoneState();
+ }
+
+ @Override
+ public V get() {
+ return result;
+ }
+
+ @Override
+ public V get(final long timeout) {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return getValueOrThrowable();
+ }
+
+ if (timeout <= 0) {
+ try {
+ lock.wait();
+ } catch (Exception e) {
+ cancel(e);
+ }
+ return getValueOrThrowable();
+ } else {
+ long waitTime = timeout - (System.currentTimeMillis() - createTime);
+ if (waitTime > 0) {
+ for (; ; ) {
+ try {
+ lock.wait(waitTime);
+ } catch (InterruptedException e) {
+ LOG.error("promise get value interrupted,excepiton:{}", e.getMessage());
+ }
+
+ if (!isDoing()) {
+ break;
+ } else {
+ waitTime = timeout - (System.currentTimeMillis() - createTime);
+ if (waitTime <= 0) {
+ break;
+ }
+ }
+ }
+ }
+
+ if (isDoing()) {
+ timeoutSoCancel();
+ }
+ }
+ return getValueOrThrowable();
+ }
+ }
+
+ @Override
+ public boolean set(final V value) {
+ if (value == null)
+ return false;
+ this.result = value;
+ return done();
+ }
+
+ @Override
+ public boolean setFailure(final Throwable cause) {
+ if (cause == null)
+ return false;
+ this.exception = cause;
+ return done();
+ }
+
+ @Override
+ public void addListener(final PromiseListener listener) {
+ if (listener == null) {
+ throw new NullPointerException("FutureListener is null");
+ }
+
+ boolean notifyNow = false;
+ synchronized (lock) {
+ if (!isDoing()) {
+ notifyNow = true;
+ } else {
+ if (promiseListenerList == null) {
+ promiseListenerList = new ArrayList<>();
+ }
+ promiseListenerList.add(listener);
+ }
+ }
+
+ if (notifyNow) {
+ notifyListener(listener);
+ }
+ }
+
+ @Override
+ public Throwable getThrowable() {
+ return exception;
+ }
+
+ private void notifyListeners() {
+ if (promiseListenerList != null) {
+ for (PromiseListener listener : promiseListenerList) {
+ notifyListener(listener);
+ }
+ }
+ }
+
+ private boolean isSuccess() {
+ return isDone() && (exception == null);
+ }
+
+ private void timeoutSoCancel() {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return;
+ }
+ state = FutureState.CANCELLED;
+ exception = new RuntimeException("get request result is timeout or interrupted");
+ lock.notifyAll();
+ }
+ notifyListeners();
+ }
+
+ private V getValueOrThrowable() {
+ if (exception != null) {
+ Throwable e = exception.getCause() != null ? exception.getCause() : exception;
+ throw new OMSRuntimeException("-1", e);
+ }
+ notifyListeners();
+ return result;
+ }
+
+ private boolean isDoing() {
+ return state.isDoingState();
+ }
+
+ private boolean done() {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return false;
+ }
+
+ state = FutureState.DONE;
+ lock.notifyAll();
+ }
+
+ notifyListeners();
+ return true;
+ }
+
+ private void notifyListener(final PromiseListener listener) {
+ try {
+ if (exception != null)
+ listener.operationFailed(this);
+ else
+ listener.operationCompleted(this);
+ } catch (Throwable t) {
+ LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
+ }
+ }
+
+ private boolean cancel(Exception e) {
+ synchronized (lock) {
+ if (!isDoing()) {
+ return false;
+ }
+
+ state = FutureState.CANCELLED;
+ exception = e;
+ lock.notifyAll();
+ }
+
+ notifyListeners();
+ return true;
+ }
+}
+
diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
new file mode 100644
index 0000000000000000000000000000000000000000..9e2f69c2d9ee921aad72a84a35f9972b25f80fcc
--- /dev/null
+++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.openmessaging.rocketmq.promise;
+
+public enum FutureState {
+ /** the task is doing **/
+ DOING(0),
+ /** the task is done **/
+ DONE(1),
+ /** ths task is cancelled **/
+ CANCELLED(2);
+
+ public final int value;
+
+ private FutureState(int value) {
+ this.value = value;
+ }
+
+ public boolean isCancelledState() {
+ return this == CANCELLED;
+ }
+
+ public boolean isDoneState() {
+ return this == DONE;
+ }
+
+ public boolean isDoingState() {
+ return this == DOING;
+ }
+}
diff --git a/pom.xml b/pom.xml
index cf6ec9b2740837e2f96d8b7578a47736b8f0b2e4..865e9f9a0f5a14d561600e8c30b0ce3e9a457666 100644
--- a/pom.xml
+++ b/pom.xml
@@ -606,8 +606,8 @@
io.openmessaging
- messaging-user-level-api
- 1.0.0-SNAPSHOT
+ openmessaging-api
+ 0.1.0-beta