diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index dbe1d10568804e684e4098d94d7f107ce4496e34..3a780b09c74b7a48deef8908673b0d8658ef2b99 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.example.openmessaging; import io.openmessaging.Future; import io.openmessaging.FutureListener; -import io.openmessaging.Message; +import io.openmessaging.message.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.producer.Producer; @@ -32,15 +32,11 @@ public class SimpleProducer { OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); final Producer producer = messagingAccessPoint.createProducer(); - - messagingAccessPoint.startup(); - System.out.printf("MessagingAccessPoint startup OK%n"); - - producer.startup(); + producer.start(); System.out.printf("Producer startup OK%n"); { - Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + Message message = producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); //final Void aVoid = result.get(3000L); System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); @@ -48,7 +44,7 @@ public class SimpleProducer { final CountDownLatch countDownLatch = new CountDownLatch(1); { - final Future result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + final Future result = producer.sendAsync(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); result.addListener(new FutureListener() { @Override public void operationComplete(Future future) { @@ -63,7 +59,7 @@ public class SimpleProducer { } { - producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + producer.sendOneway(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.printf("Send oneway message OK%n"); } @@ -73,6 +69,6 @@ public class SimpleProducer { } catch (InterruptedException ignore) { } - producer.shutdown(); + producer.stop(); } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index 51388f9c610e101c64242bc59ffd1babc0b87dd9..0655fa5ec19ac57d666f9ad0253aac3077dfe83b 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -18,16 +18,13 @@ package io.openmessaging.rocketmq; import io.openmessaging.KeyValue; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.ResourceManager; -import io.openmessaging.consumer.PullConsumer; -import io.openmessaging.consumer.PushConsumer; -import io.openmessaging.consumer.StreamingConsumer; -import io.openmessaging.exception.OMSNotSupportedException; +import io.openmessaging.consumer.Consumer; +import io.openmessaging.exception.OMSUnsupportException; +import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; -import io.openmessaging.rocketmq.consumer.PullConsumerImpl; -import io.openmessaging.rocketmq.consumer.PushConsumerImpl; +import io.openmessaging.producer.TransactionStateCheckListener; import io.openmessaging.rocketmq.producer.ProducerImpl; -import io.openmessaging.rocketmq.utils.OMSUtil; public class MessagingAccessPointImpl implements MessagingAccessPoint { @@ -43,8 +40,8 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { } @Override - public String implVersion() { - return "0.3.0"; + public String version() { + return "1.0.0"; } @Override @@ -52,53 +49,20 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { return new ProducerImpl(this.accessPointProperties); } - @Override - public Producer createProducer(KeyValue properties) { - return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); - } - - @Override - public PushConsumer createPushConsumer() { - return new PushConsumerImpl(accessPointProperties); - } - - @Override - public PushConsumer createPushConsumer(KeyValue properties) { - return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); - } - - @Override - public PullConsumer createPullConsumer() { - return new PullConsumerImpl(accessPointProperties); - } - - @Override - public PullConsumer createPullConsumer(KeyValue attributes) { - return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes)); - } - - @Override - public StreamingConsumer createStreamingConsumer() { + @Override public Producer createProducer(TransactionStateCheckListener transactionStateCheckListener) { return null; } - @Override - public StreamingConsumer createStreamingConsumer(KeyValue attributes) { + @Override public Consumer createConsumer() { return null; } @Override public ResourceManager resourceManager() { - throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version."); + throw new OMSUnsupportException(-1, "ResourceManager is not supported in current version."); } - @Override - public void startup() { - //Ignore - } - - @Override - public void shutdown() { - //Ignore + @Override public MessageFactory messageFactory() { + return null; } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java index 6d8995a1564fe92e7efd85101048adc8b32c9e2b..f1405b25335862d02188be4b9ba2f8c8948dd430 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -16,98 +16,52 @@ */ package io.openmessaging.rocketmq.domain; -import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; -import io.openmessaging.Message; +import io.openmessaging.consumer.MessageReceipt; +import io.openmessaging.extension.ExtensionHeader; +import io.openmessaging.message.Header; +import io.openmessaging.message.Message; import io.openmessaging.OMS; -import io.openmessaging.exception.OMSMessageFormatException; -import org.apache.commons.lang3.builder.ToStringBuilder; +import java.util.Optional; -public class BytesMessageImpl implements BytesMessage { - private KeyValue sysHeaders; - private KeyValue userHeaders; - private byte[] body; +public class BytesMessageImpl implements Message { - public BytesMessageImpl() { - this.sysHeaders = OMS.newKeyValue(); - this.userHeaders = OMS.newKeyValue(); - } - - @Override - public T getBody(Class type) throws OMSMessageFormatException { - if (type == byte[].class) { - return (T)body; - } - - throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName()); - } + private Header sysHeaders; + private KeyValue userProperties; + private byte[] data; - @Override - public BytesMessage setBody(final byte[] body) { - this.body = body; - return this; + public BytesMessageImpl() { + this.sysHeaders = new MessageHeader(); + this.userProperties = OMS.newKeyValue(); } @Override - public KeyValue sysHeaders() { + public Header header() { return sysHeaders; } @Override - public KeyValue userHeaders() { - return userHeaders; - } - - @Override - public Message putSysHeaders(String key, int value) { - sysHeaders.put(key, value); - return this; - } - - @Override - public Message putSysHeaders(String key, long value) { - sysHeaders.put(key, value); - return this; - } - - @Override - public Message putSysHeaders(String key, double value) { - sysHeaders.put(key, value); - return this; - } - - @Override - public Message putSysHeaders(String key, String value) { - sysHeaders.put(key, value); - return this; - } - - @Override - public Message putUserHeaders(String key, int value) { - userHeaders.put(key, value); - return this; + public Optional extensionHeader() { + return null; } @Override - public Message putUserHeaders(String key, long value) { - userHeaders.put(key, value); - return this; + public KeyValue properties() { + return userProperties; } @Override - public Message putUserHeaders(String key, double value) { - userHeaders.put(key, value); - return this; + public byte[] getData() { + return this.data; } @Override - public Message putUserHeaders(String key, String value) { - userHeaders.put(key, value); - return this; + public void setData(byte[] data) { + this.data = data; } @Override - public String toString() { - return ToStringBuilder.reflectionToString(this); + public MessageReceipt getMessageReceipt() { + return null; } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java new file mode 100644 index 0000000000000000000000000000000000000000..8dda492642d8ac10b1ba2315a3e7d5b84303f58f --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.message.Header; + +public class MessageHeader implements Header{ + + private String destination; + + private String messageId; + + private long bornTimestamp; + + private String bornHost; + + private short priority; + + private int deliveryCount; + + private short compression; + + private short durability; + + public MessageHeader() { + } + + @Override public Header setDestination(String destination) { + this.destination = destination; + return this; + } + + @Override public Header setMessageId(String messageId) { + this.messageId = messageId; + return this; + } + + @Override public Header setBornTimestamp(long bornTimestamp) { + this.bornTimestamp = bornTimestamp; + return this; + } + + @Override public Header setBornHost(String bornHost) { + this.bornHost = bornHost; + return this; + } + + @Override public Header setPriority(short priority) { + this.priority = priority; + return this; + } + + @Override public Header setDurability(short durability) { + this.durability = durability; + return this; + } + + @Override public Header setDeliveryCount(int deliveryCount) { + this.deliveryCount = deliveryCount; + return this; + } + + @Override public Header setCompression(short compression) { + this.compression = compression; + return this; + } + + @Override public String getDestination() { + return this.destination; + } + + @Override public String getMessageId() { + return this.messageId; + } + + @Override public long getBornTimestamp() { + return this.bornTimestamp; + } + + @Override public String getBornHost() { + return this.bornHost; + } + + @Override public short getPriority() { + return this.priority; + } + + @Override public short getDurability() { + return this.durability; + } + + @Override public int getDeliveryCount() { + return this.deliveryCount; + } + + @Override public short getCompression() { + return this.compression; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java index 3639a3f836b8b7446e1021740081ab360a4da79a..c8d7bb3617f6ca37c88a8bd684a66372fe1916c9 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java @@ -27,4 +27,6 @@ public interface NonStandardKeys { String MESSAGE_DESTINATION = "rmq.message.destination"; String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums"; String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity"; + String PRODUCER_ID = "PRODUCER_ID"; + String CONSUMER_ID ="CONSUMER_ID"; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java index 2bebc8abfabcbdef984579cc2096570948fdad92..b4b4753a14c076112777bd545962ba26b54081f5 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java @@ -23,4 +23,6 @@ public interface RocketMQConstants { */ String START_DELIVER_TIME = "__STARTDELIVERTIME"; + String PROPERTY_DELAY_TIME_LEVEL = "DELAY"; + } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 3db859048f6d0eb056708427af36a2a00a6a3dc2..63034e396002097d9c82a2a9006d50b1f064582e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -16,18 +16,17 @@ */ package io.openmessaging.rocketmq.producer; -import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; -import io.openmessaging.Message; -import io.openmessaging.MessageFactory; -import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.message.Message; +import io.openmessaging.message.MessageFactory; import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSMessageFormatException; -import io.openmessaging.exception.OMSNotSupportedException; +import io.openmessaging.exception.OMSUnsupportException; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSTimeOutException; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -56,7 +55,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) { String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { - throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); + throw new OMSRuntimeException(-1, "OMS AccessPoints is null or empty."); } this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); @@ -69,23 +68,23 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); this.rocketmqProducer.setLanguage(LanguageCode.OMS); - properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); + properties.put(NonStandardKeys.PRODUCER_ID, producerId); } @Override - public synchronized void startup() { + public synchronized void start() { if (!started) { try { this.rocketmqProducer.start(); } catch (MQClientException e) { - throw new OMSRuntimeException("-1", e); + throw new OMSRuntimeException(-1, e); } } this.started = true; } @Override - public synchronized void shutdown() { + public synchronized void stop() { if (this.started) { this.rocketmqProducer.shutdown(); } @@ -96,21 +95,20 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { if (e instanceof MQClientException) { if (e.getCause() != null) { if (e.getCause() instanceof RemotingTimeoutException) { - return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", + return new OMSTimeOutException(-1, String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e); } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) { if (e.getCause() instanceof MQBrokerException) { MQBrokerException brokerException = (MQBrokerException) e.getCause(); - return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s", + return new OMSRuntimeException(-1, String.format("Received a broker exception, Topic=%s, msgId=%s, %s", topic, msgId, brokerException.getErrorMessage()), e); } if (e.getCause() instanceof RemotingConnectException) { RemotingConnectException connectException = (RemotingConnectException)e.getCause(); - return new OMSRuntimeException("-1", + return new OMSRuntimeException(-1, String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s", - topic, msgId, connectException.getMessage()), - e); + topic, msgId, connectException.getMessage()), e); } } } @@ -118,28 +116,21 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { else { MQClientException clientException = (MQClientException) e; if (-1 == clientException.getResponseCode()) { - return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s", + return new OMSRuntimeException(-1, String.format("Topic does not exist, Topic=%s, msgId=%s", topic, msgId), e); } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) { - return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s", + return new OMSMessageFormatException(-1, String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s", topic, msgId), e); } } } - return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e); + return new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.", e); } protected void checkMessageType(Message message) { - if (!(message instanceof BytesMessage)) { - throw new OMSNotSupportedException("-1", "Only BytesMessage is supported."); + if (!(message instanceof BytesMessageImpl)) { + throw new OMSUnsupportException(-1, "Only BytesMessage is supported."); } } - @Override - public BytesMessage createBytesMessage(String queue, byte[] body) { - BytesMessage message = new BytesMessageImpl(); - message.setBody(body); - message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue); - return message; - } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index c2b6d3e3c772c3ecef82e7ec25a4b5085fc66188..d3acce2219f670472130902fc281b042064da420 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -16,18 +16,24 @@ */ package io.openmessaging.rocketmq.producer; -import io.openmessaging.BytesMessage; +import io.openmessaging.Future; import io.openmessaging.KeyValue; -import io.openmessaging.Message; +import io.openmessaging.ServiceLifeState; +import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.extension.Extension; +import io.openmessaging.extension.QueueMetaData; +import io.openmessaging.message.Message; import io.openmessaging.Promise; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.interceptor.ProducerInterceptor; -import io.openmessaging.producer.BatchMessageSender; -import io.openmessaging.producer.LocalTransactionExecutor; import io.openmessaging.producer.Producer; import io.openmessaging.producer.SendResult; +import io.openmessaging.producer.TransactionalResult; +import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.promise.DefaultPromise; import io.openmessaging.rocketmq.utils.OMSUtil; +import java.util.List; +import java.util.Optional; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendStatus; @@ -39,42 +45,25 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { super(properties); } - @Override - public KeyValue attributes() { - return properties; - } - @Override public SendResult send(final Message message) { return send(message, this.rocketmqProducer.getSendMsgTimeout()); } - @Override - public SendResult send(final Message message, final KeyValue properties) { - long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT) - ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); - return send(message, timeout); - } - - @Override - public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) { - return null; - } - private SendResult send(final Message message, long timeout) { checkMessageType(message); - org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message); try { org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout); if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) { log.error(String.format("Send message to RocketMQ failed, %s", message)); - throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed."); + throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed."); } - message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); + message.header().setMessageId(rmqResult.getMsgId()); return OMSUtil.sendResultConvert(rmqResult); } catch (Exception e) { log.error(String.format("Send message to RocketMQ failed, %s", message), e); - throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e); + throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e); } } @@ -83,22 +72,15 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout()); } - @Override - public Promise sendAsync(final Message message, final KeyValue properties) { - long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT) - ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); - return sendAsync(message, timeout); - } - private Promise sendAsync(final Message message, long timeout) { checkMessageType(message); - org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message); final Promise promise = new DefaultPromise<>(); try { this.rocketmqProducer.send(rmqMessage, new SendCallback() { @Override public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { - message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); + message.header().setMessageId(rmqResult.getMsgId()); promise.set(OMSUtil.sendResultConvert(rmqResult)); } @@ -116,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { @Override public void sendOneway(final Message message) { checkMessageType(message); - org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); + org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message); try { this.rocketmqProducer.sendOneway(rmqMessage); } catch (Exception ignore) { //Ignore the oneway exception. @@ -124,22 +106,65 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { } @Override - public void sendOneway(final Message message, final KeyValue properties) { - sendOneway(message); + public void send(List messages) { + if (messages == null || messages.isEmpty()) { + throw new OMSMessageFormatException(-1, "The messages collection is empty"); + } + + for (Message message : messages) { + sendOneway(messages); + } } @Override - public BatchMessageSender createBatchMessageSender() { + public Future sendAsync(List messages) { return null; } @Override - public void addInterceptor(ProducerInterceptor interceptor) { + public void sendOneway(List messages) { + if (messages == null || messages.isEmpty()) { + throw new OMSMessageFormatException(-1, "The messages collection is empty"); + } + for (Message message : messages) { + sendOneway(messages); + } + } + + @Override + public void addInterceptor(ProducerInterceptor interceptor) { } @Override public void removeInterceptor(ProducerInterceptor interceptor) { + } + + @Override + public TransactionalResult prepare(Message message) { + return null; + } + + @Override + public ServiceLifeState currentState() { + return null; + } + + @Override + public Optional getExtension() { + return null; + } + @Override + public QueueMetaData getQueueMetaData(String queueName) { + return null; + } + + @Override + public Message createMessage(String queueName, byte[] body) { + Message message = new BytesMessageImpl(); + message.setData(body); + message.header().setDestination(queueName); + return message; } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java index c1b59993f6f814295727dd06def963c1878cd097..e472d2ae6e793bd18c95a8251e165b38ea01c346 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -175,7 +175,7 @@ public class DefaultPromise implements Promise { private V getValueOrThrowable() { if (exception != null) { Throwable e = exception.getCause() != null ? exception.getCause() : exception; - throw new OMSRuntimeException("-1", e); + throw new OMSRuntimeException(-1, e); } notifyListeners(); return result; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java index 66af8cebb9163a9eabc494efd0864f2bc6c9634e..5b095eee8f43da0ab627841d598ec2b922afecb8 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -16,18 +16,15 @@ */ package io.openmessaging.rocketmq.utils; -import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; -import io.openmessaging.Message.BuiltinKeys; import io.openmessaging.OMS; +import io.openmessaging.message.Header; import io.openmessaging.producer.SendResult; import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.RocketMQConstants; import io.openmessaging.rocketmq.domain.SendResultImpl; import java.lang.reflect.Field; -import java.util.Iterator; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Set; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.UtilAll; @@ -44,68 +41,55 @@ public class OMSUtil { return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); } - public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { + public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessageImpl omsMessage) { org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); - rmqMessage.setBody(omsMessage.getBody(byte[].class)); + rmqMessage.setBody(omsMessage.getData()); - KeyValue sysHeaders = omsMessage.sysHeaders(); - KeyValue userHeaders = omsMessage.userHeaders(); + Header sysHeaders = omsMessage.header(); + KeyValue userHeaders = omsMessage.properties(); //All destinations in RocketMQ use Topic - rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION)); + rmqMessage.setTopic(sysHeaders.getDestination()); - if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { - long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); - if (deliverTime > 0) { - rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); - } + long deliverTime = sysHeaders.getBornTimestamp(); + if (deliverTime > 0) { + rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); } + for (String key : userHeaders.keySet()) { MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key)); } - //System headers has a high priority - for (String key : sysHeaders.keySet()) { - MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key)); - } - + MessageAccessor.putProperty(rmqMessage, RocketMQConstants.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(sysHeaders.getDeliveryCount())); return rmqMessage; } - public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { - BytesMessage omsMsg = new BytesMessageImpl(); - omsMsg.setBody(rmqMsg.getBody()); - - KeyValue headers = omsMsg.sysHeaders(); - KeyValue properties = omsMsg.userHeaders(); + public static BytesMessageImpl msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { + BytesMessageImpl omsMsg = new BytesMessageImpl(); + omsMsg.setData(rmqMsg.getBody()); final Set> entries = rmqMsg.getProperties().entrySet(); for (final Map.Entry entry : entries) { - if (isOMSHeader(entry.getKey())) { - headers.put(entry.getKey(), entry.getValue()); - } else { - properties.put(entry.getKey(), entry.getValue()); + if (!isOMSHeader(entry.getKey())) { + omsMsg.properties().put(entry.getKey(), entry.getValue()); } } - omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId()); - - omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic()); + omsMsg.header().setMessageId(rmqMsg.getMsgId()); + omsMsg.header().setDestination(rmqMsg.getTopic()); + omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost())); + omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp()); + omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel()); - omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys()); - omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); - omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); - omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); - omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); return omsMsg; } public static boolean isOMSHeader(String value) { - for (Field field : BuiltinKeys.class.getDeclaredFields()) { + for (Field field : Header.class.getDeclaredFields()) { try { - if (field.get(BuiltinKeys.class).equals(value)) { + if (field.get(Header.class).equals(value)) { return true; } } catch (IllegalAccessException e) { @@ -132,49 +116,4 @@ public class OMSUtil { } return keyValue; } - - /** - * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. - */ - public static Iterator cycle(final Iterable iterable) { - return new Iterator() { - Iterator iterator = new Iterator() { - @Override - public synchronized boolean hasNext() { - return false; - } - - @Override - public synchronized T next() { - throw new NoSuchElementException(); - } - - @Override - public synchronized void remove() { - //Ignore - } - }; - - @Override - public synchronized boolean hasNext() { - return iterator.hasNext() || iterable.iterator().hasNext(); - } - - @Override - public synchronized T next() { - if (!iterator.hasNext()) { - iterator = iterable.iterator(); - if (!iterator.hasNext()) { - throw new NoSuchElementException(); - } - } - return iterator.next(); - } - - @Override - public synchronized void remove() { - iterator.remove(); - } - }; - } } diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java index fc6515ea8d14cb5936cd2817c45fc2689863dae7..61178c979a34bbb2917308ca8edc49e6533bd3ae 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java @@ -56,9 +56,7 @@ public class ProducerImplTest { Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); field.setAccessible(true); field.set(producer, rocketmqProducer); - - messagingAccessPoint.startup(); - producer.startup(); + producer.start(); } @Test @@ -68,7 +66,7 @@ public class ProducerImplTest { sendResult.setSendStatus(SendStatus.SEND_OK); when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); io.openmessaging.producer.SendResult omsResult = - producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); + producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'})); assertThat(omsResult.messageId()).isEqualTo("TestMsgID"); } @@ -80,7 +78,7 @@ public class ProducerImplTest { when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); try { - producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); + producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'})); failBecauseExceptionWasNotThrown(OMSRuntimeException.class); } catch (Exception e) { assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); @@ -91,7 +89,7 @@ public class ProducerImplTest { public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class); try { - producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); + producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'})); failBecauseExceptionWasNotThrown(OMSRuntimeException.class); } catch (Exception e) { assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java index f226edef0a2510f664db14a16d3b6972ac416426..0f6414fe673489a39b6a2b41869c3dfcef529020 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java @@ -87,7 +87,7 @@ public class DefaultPromiseTest { @Test public void testAddListener_WithException_ListenerAfterSet() throws Exception { - final Throwable exception = new OMSRuntimeException("-1", "Test Error"); + final Throwable exception = new OMSRuntimeException(-1, "Test Error"); promise.setFailure(exception); promise.addListener(new FutureListener() { @Override @@ -99,7 +99,7 @@ public class DefaultPromiseTest { @Test public void testAddListener_WithException() throws Exception { - final Throwable exception = new OMSRuntimeException("-1", "Test Error"); + final Throwable exception = new OMSRuntimeException(-1, "Test Error"); promise.addListener(new FutureListener() { @Override public void operationComplete(Future future) { @@ -112,7 +112,7 @@ public class DefaultPromiseTest { @Test public void getThrowable() throws Exception { assertThat(promise.getThrowable()).isNull(); - Throwable exception = new OMSRuntimeException("-1", "Test Error"); + Throwable exception = new OMSRuntimeException(-1, "Test Error"); promise.setFailure(exception); assertThat(promise.getThrowable()).isEqualTo(exception); }