From ce14693419a21279f2f71c89be7d543215e66728 Mon Sep 17 00:00:00 2001 From: yukon Date: Tue, 11 Apr 2017 11:11:29 +0800 Subject: [PATCH] Add Producer related implementation for OpenMessaging. --- example/pom.xml | 9 + .../example/openmessaging/SimpleProducer.java | 51 ++++ openmessaging/pom.xml | 13 +- .../rocketmq/MessagingAccessPointImpl.java | 23 +- .../io/openmessaging/rocketmq/OMSUtil.java | 80 ++++++ .../rocketmq/domain/BytesMessageImpl.java | 102 ++++++++ .../rocketmq/domain/NonStandardKeys.java | 20 ++ .../rocketmq/domain/SendResultImpl.java | 40 +++ .../producer/AbstractOMSProducer.java | 138 +++++++++++ .../rocketmq/producer/ProducerImpl.java | 124 ++++++++++ .../producer/SequenceProducerImpl.java | 91 +++++++ .../rocketmq/promise/DefaultPromise.java | 227 ++++++++++++++++++ .../rocketmq/promise/FutureState.java | 45 ++++ pom.xml | 4 +- 14 files changed, 959 insertions(+), 8 deletions(-) create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java diff --git a/example/pom.xml b/example/pom.xml index 785a4ca8..840fa36f 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -48,5 +48,14 @@ org.javassist javassist + + io.openmessaging + openmessaging-api + + + org.apache.rocketmq + rocketmq-openmessaging + 4.1.0-incubating-SNAPSHOT + diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index 3b71849e..5a27f5a0 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -16,8 +16,59 @@ */ package org.apache.rocketmq.example.openmessaging; +import io.openmessaging.Message; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.Producer; +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.SendResult; +import java.nio.charset.Charset; + public class SimpleProducer { public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); + + final Producer producer = messagingAccessPoint.createProducer(); + + messagingAccessPoint.startup(); + System.out.println("messagingAccessPoint startup OK"); + + producer.startup(); + System.out.println("producer startup OK"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + producer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + { + Message message = producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + SendResult sendResult = producer.send(message); + //final Void aVoid = result.get(3000L); + System.out.println("send async message OK, msgId: " + sendResult.messageId()); + } + + { + final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + result.addListener(new PromiseListener() { + @Override public void operationCompleted(Promise promise) { + System.out.println("Send async message OK, msgId: " + promise.get().messageId()); + } + + @Override public void operationFailed(Promise promise) { + System.out.println("send async message Failed, error: " + promise.getThrowable().getMessage()); + } + }); + } + { + producer.sendOneway(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + System.out.println("Send oneway message OK"); + } } } diff --git a/openmessaging/pom.xml b/openmessaging/pom.xml index d5686504..d6498127 100644 --- a/openmessaging/pom.xml +++ b/openmessaging/pom.xml @@ -27,11 +27,22 @@ 4.0.0 rocketmq-openmessaging + rocketmq-openmessaging ${project.version} io.openmessaging - messaging-user-level-api + openmessaging-api + + + org.apache.rocketmq + rocketmq-client + + + javax.jms + javax.jms-api + 2.0.1 + test \ No newline at end of file diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index 2f75686f..c30b7d41 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -26,26 +26,39 @@ import io.openmessaging.ResourceManager; import io.openmessaging.SequenceProducer; import io.openmessaging.ServiceEndPoint; import io.openmessaging.observer.Observer; +import io.openmessaging.rocketmq.producer.ProducerImpl; +import io.openmessaging.rocketmq.producer.SequenceProducerImpl; public class MessagingAccessPointImpl implements MessagingAccessPoint { + private final KeyValue accessPointProperties; + + public MessagingAccessPointImpl(final KeyValue accessPointProperties) { + this.accessPointProperties = accessPointProperties; + } + + @Override + public KeyValue properties() { + return accessPointProperties; + } + @Override public Producer createProducer() { - return null; + return new ProducerImpl(this.accessPointProperties); } @Override public Producer createProducer(KeyValue properties) { - return null; + return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); } @Override public SequenceProducer createSequenceProducer() { - return null; + return new SequenceProducerImpl(this.accessPointProperties); } @Override public SequenceProducer createSequenceProducer(KeyValue properties) { - return null; + return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); } @Override @@ -79,7 +92,7 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { } @Override - public ResourceManager createResourceManager() { + public ResourceManager getResourceManager() { return null; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java new file mode 100644 index 00000000..061ee6b9 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.MessageHeader; +import io.openmessaging.OMS; +import io.openmessaging.SendResult; +import io.openmessaging.rocketmq.domain.SendResultImpl; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.message.MessageAccessor; + +public class OMSUtil { + + /** + * Builds a OMS client instance name. + * + * @return a unique instance name + */ + public static String buildInstanceName() { + return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); + } + + public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { + org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); + rmqMessage.setBody(omsMessage.getBody()); + + KeyValue headers = omsMessage.headers(); + KeyValue properties = omsMessage.properties(); + + //All destinations in RocketMQ use Topic + rmqMessage.setTopic(headers.containsKey(MessageHeader.TOPIC) + ? headers.getString(MessageHeader.TOPIC) : headers.getString(MessageHeader.QUEUE)); + + for (String key : properties.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); + } + + //Headers has a high priority + for (String key : headers.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, headers.getString(key)); + } + + return rmqMessage; + } + + /** + * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. + */ + public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) { + assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK); + return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue()); + } + + public static KeyValue buildKeyValue(KeyValue ... keyValues) { + KeyValue keyValue = OMS.newKeyValue(); + for (KeyValue properties : keyValues) { + for (String key : properties.keySet()) { + keyValue.put(key, properties.getString(key)); + } + } + return keyValue; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java new file mode 100644 index 00000000..8140fe2e --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.OMS; + +public class BytesMessageImpl implements BytesMessage { + private KeyValue headers; + private KeyValue properties; + private byte[] body; + + public BytesMessageImpl() { + this.headers = OMS.newKeyValue(); + this.properties = OMS.newKeyValue(); + } + + @Override + public byte[] getBody() { + return body; + } + + @Override + public BytesMessage setBody(final byte[] body) { + this.body = body; + return this; + } + + @Override + public KeyValue headers() { + return headers; + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public Message putHeaders(final String key, final int value) { + headers.put(key, value); + return this; + } + + @Override + public Message putHeaders(final String key, final long value) { + headers.put(key, value); + return this; + } + + @Override + public Message putHeaders(final String key, final double value) { + headers.put(key, value); + return this; + } + + @Override + public Message putHeaders(final String key, final String value) { + headers.put(key, value); + return this; + } + + @Override + public Message putProperties(final String key, final int value) { + properties.put(key, value); + return this; + } + + @Override + public Message putProperties(final String key, final long value) { + properties.put(key, value); + return this; + } + + @Override + public Message putProperties(final String key, final double value) { + properties.put(key, value); + return this; + } + + @Override + public Message putProperties(final String key, final String value) { + properties.put(key, value); + return this; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java new file mode 100644 index 00000000..cf83cfdc --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +public class NonStandardKeys { +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java new file mode 100644 index 00000000..228a9f0b --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.KeyValue; +import io.openmessaging.SendResult; + +public class SendResultImpl implements SendResult { + private String messageId; + private KeyValue properties; + + public SendResultImpl(final String messageId, final KeyValue properties) { + this.messageId = messageId; + this.properties = properties; + } + + @Override + public String messageId() { + return messageId; + } + + @Override + public KeyValue properties() { + return properties; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java new file mode 100644 index 00000000..9eb735d6 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.MessageFactory; +import io.openmessaging.MessageHeader; +import io.openmessaging.PropertyKeys; +import io.openmessaging.ServiceLifecycle; +import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.exception.OMSNotSupportedException; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.exception.OMSTimeOutException; +import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.slf4j.Logger; + +import static io.openmessaging.rocketmq.OMSUtil.buildInstanceName; + +abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{ + final static Logger log = ClientLogger.getLog(); + final KeyValue properties; + final DefaultMQProducer rocketmqProducer; + private boolean started = false; + + AbstractOMSProducer(final KeyValue properties) { + this.properties = properties; + this.rocketmqProducer = new DefaultMQProducer(); + + String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + + if (accessPoints == null || accessPoints.isEmpty()) { + throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); + } + String producerId = buildInstanceName(); + + int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); + + this.rocketmqProducer.setProducerGroup(producerId); + this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout); + this.rocketmqProducer.setInstanceName(producerId); + this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); + this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); + + properties.put(PropertyKeys.PRODUCER_ID, producerId); + } + + @Override + public synchronized void startup() { + if (!started) { + try { + this.rocketmqProducer.start(); + } catch (MQClientException e) { + throw new OMSRuntimeException("-1", e); + } + } + this.started = true; + } + + @Override + public synchronized void shutdown() { + if (this.started) { + this.rocketmqProducer.shutdown(); + } + this.started = false; + } + + OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) { + if (e instanceof MQClientException) { + if (e.getCause() != null) { + if (e.getCause() instanceof RemotingTimeoutException) { + return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", + this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e); + } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) { + MQBrokerException brokerException = (MQBrokerException) e.getCause(); + return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s", + topic, msgId, brokerException.getErrorMessage()), e); + } + } + // Exception thrown by local. + else { + MQClientException clientException = (MQClientException) e; + if (-1 == clientException.getResponseCode()) { + return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s", + topic, msgId), e); + } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) { + return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s", + topic, msgId), e); + } + } + } + return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e); + } + + protected void checkMessageType(Message message) { + if (!(message instanceof BytesMessage)) { + throw new OMSNotSupportedException("-1", "Only BytesMessage is supported."); + } + } + + @Override + public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) { + BytesMessage bytesMessage = new BytesMessageImpl(); + bytesMessage.setBody(body); + bytesMessage.headers().put(MessageHeader.TOPIC, topic); + return bytesMessage; + } + + @Override + public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) { + BytesMessage bytesMessage = new BytesMessageImpl(); + bytesMessage.setBody(body); + bytesMessage.headers().put(MessageHeader.QUEUE, queue); + return bytesMessage; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java new file mode 100644 index 00000000..f5d2f255 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.Producer; +import io.openmessaging.Promise; +import io.openmessaging.PropertyKeys; +import io.openmessaging.SendResult; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.promise.DefaultPromise; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendStatus; + +import static io.openmessaging.rocketmq.OMSUtil.msgConvert; + +public class ProducerImpl extends AbstractOMSProducer implements Producer { + + public ProducerImpl(final KeyValue properties) { + super(properties); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public SendResult send(final Message message) { + return send(message, this.rocketmqProducer.getSendMsgTimeout()); + } + + @Override + public SendResult send(final Message message, final KeyValue properties) { + long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) + ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + return send(message, timeout); + } + + private SendResult send(final Message message, long timeout) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + try { + org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout); + if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) { + log.error(String.format("Send message to RocketMQ failed, %s", message)); + throw new OMSRuntimeException("-1", "Send message to RocketMQ failed."); + } + message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + return OMSUtil.sendResultConvert(rmqResult); + } catch (Exception e) { + log.error(String.format("Send message to RocketMQ failed, %s", message), e); + throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); + } + } + + @Override + public Promise sendAsync(final Message message) { + return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout()); + } + + @Override + public Promise sendAsync(final Message message, final KeyValue properties) { + long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) + ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + return sendAsync(message, timeout); + } + + private Promise sendAsync(final Message message, long timeout) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + final Promise promise = new DefaultPromise<>(); + try { + this.rocketmqProducer.send(rmqMessage, new SendCallback() { + @Override + public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { + message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + promise.set(OMSUtil.sendResultConvert(rmqResult)); + } + + @Override + public void onException(final Throwable e) { + promise.setFailure(e); + } + }, timeout); + } catch (Exception e) { + promise.setFailure(e); + } + return promise; + } + + @Override + public void sendOneway(final Message message) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + try { + this.rocketmqProducer.sendOneway(rmqMessage); + } catch (Exception ignore) { //Ignore the oneway exception. + } + } + + @Override + public void sendOneway(final Message message, final KeyValue properties) { + sendOneway(message); + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java new file mode 100644 index 00000000..89ece2bd --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.producer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.SequenceProducer; +import io.openmessaging.rocketmq.OMSUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.rocketmq.client.Validators; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.SendResult; + +public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer { + + private BlockingQueue msgCacheQueue; + + public SequenceProducerImpl(final KeyValue properties) { + super(properties); + this.msgCacheQueue = new LinkedBlockingQueue<>(); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public void send(final Message message) { + checkMessageType(message); + org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message); + try { + Validators.checkMessage(rmqMessage, this.rocketmqProducer); + } catch (MQClientException e) { + throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); + } + msgCacheQueue.add(message); + } + + @Override + public void send(final Message message, final KeyValue properties) { + send(message); + } + + @Override + public synchronized void commit() { + List messages = new ArrayList<>(); + msgCacheQueue.drainTo(messages); + + List rmqMessages = new ArrayList<>(); + + for (Message message : messages) { + rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); + } + + try { + SendResult sendResult = this.rocketmqProducer.send(rmqMessages); + String [] msgIdArray = sendResult.getMsgId().split(","); + for (int i = 0; i < messages.size(); i++) { + Message message = messages.get(i); + message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); + } + } catch (Exception e) { + throw checkProducerException("", "", e); + } + } + + @Override + public synchronized void rollback() { + msgCacheQueue.clear(); + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java new file mode 100644 index 00000000..43f96ce8 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.promise; + +import io.openmessaging.Promise; +import io.openmessaging.PromiseListener; +import io.openmessaging.exception.OMSRuntimeException; +import java.util.ArrayList; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultPromise implements Promise { + private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class); + private final Object lock = new Object(); + private volatile FutureState state = FutureState.DOING; + private V result = null; + private long timeout; + private long createTime; + private Throwable exception = null; + private List promiseListenerList; + + public DefaultPromise() { + createTime = System.currentTimeMillis(); + promiseListenerList = new ArrayList<>(); + timeout = 5000; + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return state.isCancelledState(); + } + + @Override + public boolean isDone() { + return state.isDoneState(); + } + + @Override + public V get() { + return result; + } + + @Override + public V get(final long timeout) { + synchronized (lock) { + if (!isDoing()) { + return getValueOrThrowable(); + } + + if (timeout <= 0) { + try { + lock.wait(); + } catch (Exception e) { + cancel(e); + } + return getValueOrThrowable(); + } else { + long waitTime = timeout - (System.currentTimeMillis() - createTime); + if (waitTime > 0) { + for (; ; ) { + try { + lock.wait(waitTime); + } catch (InterruptedException e) { + LOG.error("promise get value interrupted,excepiton:{}", e.getMessage()); + } + + if (!isDoing()) { + break; + } else { + waitTime = timeout - (System.currentTimeMillis() - createTime); + if (waitTime <= 0) { + break; + } + } + } + } + + if (isDoing()) { + timeoutSoCancel(); + } + } + return getValueOrThrowable(); + } + } + + @Override + public boolean set(final V value) { + if (value == null) + return false; + this.result = value; + return done(); + } + + @Override + public boolean setFailure(final Throwable cause) { + if (cause == null) + return false; + this.exception = cause; + return done(); + } + + @Override + public void addListener(final PromiseListener listener) { + if (listener == null) { + throw new NullPointerException("FutureListener is null"); + } + + boolean notifyNow = false; + synchronized (lock) { + if (!isDoing()) { + notifyNow = true; + } else { + if (promiseListenerList == null) { + promiseListenerList = new ArrayList<>(); + } + promiseListenerList.add(listener); + } + } + + if (notifyNow) { + notifyListener(listener); + } + } + + @Override + public Throwable getThrowable() { + return exception; + } + + private void notifyListeners() { + if (promiseListenerList != null) { + for (PromiseListener listener : promiseListenerList) { + notifyListener(listener); + } + } + } + + private boolean isSuccess() { + return isDone() && (exception == null); + } + + private void timeoutSoCancel() { + synchronized (lock) { + if (!isDoing()) { + return; + } + state = FutureState.CANCELLED; + exception = new RuntimeException("get request result is timeout or interrupted"); + lock.notifyAll(); + } + notifyListeners(); + } + + private V getValueOrThrowable() { + if (exception != null) { + Throwable e = exception.getCause() != null ? exception.getCause() : exception; + throw new OMSRuntimeException("-1", e); + } + notifyListeners(); + return result; + } + + private boolean isDoing() { + return state.isDoingState(); + } + + private boolean done() { + synchronized (lock) { + if (!isDoing()) { + return false; + } + + state = FutureState.DONE; + lock.notifyAll(); + } + + notifyListeners(); + return true; + } + + private void notifyListener(final PromiseListener listener) { + try { + if (exception != null) + listener.operationFailed(this); + else + listener.operationCompleted(this); + } catch (Throwable t) { + LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); + } + } + + private boolean cancel(Exception e) { + synchronized (lock) { + if (!isDoing()) { + return false; + } + + state = FutureState.CANCELLED; + exception = e; + lock.notifyAll(); + } + + notifyListeners(); + return true; + } +} + diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java new file mode 100644 index 00000000..9e2f69c2 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/FutureState.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.rocketmq.promise; + +public enum FutureState { + /** the task is doing **/ + DOING(0), + /** the task is done **/ + DONE(1), + /** ths task is cancelled **/ + CANCELLED(2); + + public final int value; + + private FutureState(int value) { + this.value = value; + } + + public boolean isCancelledState() { + return this == CANCELLED; + } + + public boolean isDoneState() { + return this == DONE; + } + + public boolean isDoingState() { + return this == DOING; + } +} diff --git a/pom.xml b/pom.xml index cf6ec9b2..865e9f9a 100644 --- a/pom.xml +++ b/pom.xml @@ -606,8 +606,8 @@ io.openmessaging - messaging-user-level-api - 1.0.0-SNAPSHOT + openmessaging-api + 0.1.0-beta -- GitLab