未验证 提交 03c50054 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1202 from zongtanghu/feature_oms_1.0.0

[ISSUE#1198]Implement the 1.0.0 openmessaging producer API for rocketmq oms module
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.Message;
import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
......@@ -32,15 +32,11 @@ public class SimpleProducer {
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final Producer producer = messagingAccessPoint.createProducer();
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
producer.startup();
producer.start();
System.out.printf("Producer startup OK%n");
{
Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
Message message = producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L);
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
......@@ -48,7 +44,7 @@ public class SimpleProducer {
final CountDownLatch countDownLatch = new CountDownLatch(1);
{
final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
final Future<SendResult> result = producer.sendAsync(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new FutureListener<SendResult>() {
@Override
public void operationComplete(Future<SendResult> future) {
......@@ -63,7 +59,7 @@ public class SimpleProducer {
}
{
producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
producer.sendOneway(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
......@@ -73,6 +69,6 @@ public class SimpleProducer {
} catch (InterruptedException ignore) {
}
producer.shutdown();
producer.stop();
}
}
......@@ -18,16 +18,13 @@ package io.openmessaging.rocketmq;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.ResourceManager;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.consumer.StreamingConsumer;
import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.exception.OMSUnsupportException;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.producer.ProducerImpl;
import io.openmessaging.rocketmq.utils.OMSUtil;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
......@@ -43,8 +40,8 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
public String implVersion() {
return "0.3.0";
public String version() {
return "1.0.0";
}
@Override
......@@ -52,53 +49,20 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
return new ProducerImpl(this.accessPointProperties);
}
@Override
public Producer createProducer(KeyValue properties) {
return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
}
@Override
public PushConsumer createPushConsumer(KeyValue properties) {
return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public PullConsumer createPullConsumer() {
return new PullConsumerImpl(accessPointProperties);
}
@Override
public PullConsumer createPullConsumer(KeyValue attributes) {
return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
}
@Override
public StreamingConsumer createStreamingConsumer() {
@Override public Producer createProducer(TransactionStateCheckListener transactionStateCheckListener) {
return null;
}
@Override
public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
@Override public Consumer createConsumer() {
return null;
}
@Override
public ResourceManager resourceManager() {
throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
throw new OMSUnsupportException(-1, "ResourceManager is not supported in current version.");
}
@Override
public void startup() {
//Ignore
}
@Override
public void shutdown() {
//Ignore
@Override public MessageFactory messageFactory() {
return null;
}
}
......@@ -16,98 +16,52 @@
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.extension.ExtensionHeader;
import io.openmessaging.message.Header;
import io.openmessaging.message.Message;
import io.openmessaging.OMS;
import io.openmessaging.exception.OMSMessageFormatException;
import org.apache.commons.lang3.builder.ToStringBuilder;
import java.util.Optional;
public class BytesMessageImpl implements BytesMessage {
private KeyValue sysHeaders;
private KeyValue userHeaders;
private byte[] body;
public class BytesMessageImpl implements Message {
public BytesMessageImpl() {
this.sysHeaders = OMS.newKeyValue();
this.userHeaders = OMS.newKeyValue();
}
@Override
public <T> T getBody(Class<T> type) throws OMSMessageFormatException {
if (type == byte[].class) {
return (T)body;
}
throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName());
}
private Header sysHeaders;
private KeyValue userProperties;
private byte[] data;
@Override
public BytesMessage setBody(final byte[] body) {
this.body = body;
return this;
public BytesMessageImpl() {
this.sysHeaders = new MessageHeader();
this.userProperties = OMS.newKeyValue();
}
@Override
public KeyValue sysHeaders() {
public Header header() {
return sysHeaders;
}
@Override
public KeyValue userHeaders() {
return userHeaders;
}
@Override
public Message putSysHeaders(String key, int value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putSysHeaders(String key, long value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putSysHeaders(String key, double value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putSysHeaders(String key, String value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putUserHeaders(String key, int value) {
userHeaders.put(key, value);
return this;
public Optional<ExtensionHeader> extensionHeader() {
return null;
}
@Override
public Message putUserHeaders(String key, long value) {
userHeaders.put(key, value);
return this;
public KeyValue properties() {
return userProperties;
}
@Override
public Message putUserHeaders(String key, double value) {
userHeaders.put(key, value);
return this;
public byte[] getData() {
return this.data;
}
@Override
public Message putUserHeaders(String key, String value) {
userHeaders.put(key, value);
return this;
public void setData(byte[] data) {
this.data = data;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
public MessageReceipt getMessageReceipt() {
return null;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.message.Header;
public class MessageHeader implements Header{
private String destination;
private String messageId;
private long bornTimestamp;
private String bornHost;
private short priority;
private int deliveryCount;
private short compression;
private short durability;
public MessageHeader() {
}
@Override public Header setDestination(String destination) {
this.destination = destination;
return this;
}
@Override public Header setMessageId(String messageId) {
this.messageId = messageId;
return this;
}
@Override public Header setBornTimestamp(long bornTimestamp) {
this.bornTimestamp = bornTimestamp;
return this;
}
@Override public Header setBornHost(String bornHost) {
this.bornHost = bornHost;
return this;
}
@Override public Header setPriority(short priority) {
this.priority = priority;
return this;
}
@Override public Header setDurability(short durability) {
this.durability = durability;
return this;
}
@Override public Header setDeliveryCount(int deliveryCount) {
this.deliveryCount = deliveryCount;
return this;
}
@Override public Header setCompression(short compression) {
this.compression = compression;
return this;
}
@Override public String getDestination() {
return this.destination;
}
@Override public String getMessageId() {
return this.messageId;
}
@Override public long getBornTimestamp() {
return this.bornTimestamp;
}
@Override public String getBornHost() {
return this.bornHost;
}
@Override public short getPriority() {
return this.priority;
}
@Override public short getDurability() {
return this.durability;
}
@Override public int getDeliveryCount() {
return this.deliveryCount;
}
@Override public short getCompression() {
return this.compression;
}
}
......@@ -27,4 +27,6 @@ public interface NonStandardKeys {
String MESSAGE_DESTINATION = "rmq.message.destination";
String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
String PRODUCER_ID = "PRODUCER_ID";
String CONSUMER_ID ="CONSUMER_ID";
}
......@@ -23,4 +23,6 @@ public interface RocketMQConstants {
*/
String START_DELIVER_TIME = "__STARTDELIVERTIME";
String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
}
......@@ -16,18 +16,17 @@
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageFactory;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.message.Message;
import io.openmessaging.message.MessageFactory;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.exception.OMSUnsupportException;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.exception.OMSTimeOutException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -56,7 +55,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) {
String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
throw new OMSRuntimeException(-1, "OMS AccessPoints is null or empty.");
}
this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
......@@ -69,23 +68,23 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
this.rocketmqProducer.setLanguage(LanguageCode.OMS);
properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
properties.put(NonStandardKeys.PRODUCER_ID, producerId);
}
@Override
public synchronized void startup() {
public synchronized void start() {
if (!started) {
try {
this.rocketmqProducer.start();
} catch (MQClientException e) {
throw new OMSRuntimeException("-1", e);
throw new OMSRuntimeException(-1, e);
}
}
this.started = true;
}
@Override
public synchronized void shutdown() {
public synchronized void stop() {
if (this.started) {
this.rocketmqProducer.shutdown();
}
......@@ -96,21 +95,20 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
if (e instanceof MQClientException) {
if (e.getCause() != null) {
if (e.getCause() instanceof RemotingTimeoutException) {
return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
return new OMSTimeOutException(-1, String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
} else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
if (e.getCause() instanceof MQBrokerException) {
MQBrokerException brokerException = (MQBrokerException) e.getCause();
return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
return new OMSRuntimeException(-1, String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
topic, msgId, brokerException.getErrorMessage()), e);
}
if (e.getCause() instanceof RemotingConnectException) {
RemotingConnectException connectException = (RemotingConnectException)e.getCause();
return new OMSRuntimeException("-1",
return new OMSRuntimeException(-1,
String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s",
topic, msgId, connectException.getMessage()),
e);
topic, msgId, connectException.getMessage()), e);
}
}
}
......@@ -118,28 +116,21 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
else {
MQClientException clientException = (MQClientException) e;
if (-1 == clientException.getResponseCode()) {
return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
return new OMSRuntimeException(-1, String.format("Topic does not exist, Topic=%s, msgId=%s",
topic, msgId), e);
} else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
return new OMSMessageFormatException(-1, String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
topic, msgId), e);
}
}
}
return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
return new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.", e);
}
protected void checkMessageType(Message message) {
if (!(message instanceof BytesMessage)) {
throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
if (!(message instanceof BytesMessageImpl)) {
throw new OMSUnsupportException(-1, "Only BytesMessage is supported.");
}
}
@Override
public BytesMessage createBytesMessage(String queue, byte[] body) {
BytesMessage message = new BytesMessageImpl();
message.setBody(body);
message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue);
return message;
}
}
......@@ -16,18 +16,24 @@
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Future;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.ServiceLifeState;
import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.message.Message;
import io.openmessaging.Promise;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.producer.BatchMessageSender;
import io.openmessaging.producer.LocalTransactionExecutor;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import io.openmessaging.producer.TransactionalResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.List;
import java.util.Optional;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendStatus;
......@@ -39,42 +45,25 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
super(properties);
}
@Override
public KeyValue attributes() {
return properties;
}
@Override
public SendResult send(final Message message) {
return send(message, this.rocketmqProducer.getSendMsgTimeout());
}
@Override
public SendResult send(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return send(message, timeout);
}
@Override
public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) {
return null;
}
private SendResult send(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
try {
org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
log.error(String.format("Send message to RocketMQ failed, %s", message));
throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.");
}
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
message.header().setMessageId(rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult);
} catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s", message), e);
throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e);
throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e);
}
}
......@@ -83,22 +72,15 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
}
@Override
public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return sendAsync(message, timeout);
}
private Promise<SendResult> sendAsync(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
final Promise<SendResult> promise = new DefaultPromise<>();
try {
this.rocketmqProducer.send(rmqMessage, new SendCallback() {
@Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
message.header().setMessageId(rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult));
}
......@@ -116,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public void sendOneway(final Message message) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
try {
this.rocketmqProducer.sendOneway(rmqMessage);
} catch (Exception ignore) { //Ignore the oneway exception.
......@@ -124,22 +106,65 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
@Override
public void sendOneway(final Message message, final KeyValue properties) {
sendOneway(message);
public void send(List<Message> messages) {
if (messages == null || messages.isEmpty()) {
throw new OMSMessageFormatException(-1, "The messages collection is empty");
}
for (Message message : messages) {
sendOneway(messages);
}
}
@Override
public BatchMessageSender createBatchMessageSender() {
public Future<SendResult> sendAsync(List<Message> messages) {
return null;
}
@Override
public void addInterceptor(ProducerInterceptor interceptor) {
public void sendOneway(List<Message> messages) {
if (messages == null || messages.isEmpty()) {
throw new OMSMessageFormatException(-1, "The messages collection is empty");
}
for (Message message : messages) {
sendOneway(messages);
}
}
@Override
public void addInterceptor(ProducerInterceptor interceptor) {
}
@Override
public void removeInterceptor(ProducerInterceptor interceptor) {
}
@Override
public TransactionalResult prepare(Message message) {
return null;
}
@Override
public ServiceLifeState currentState() {
return null;
}
@Override
public Optional<Extension> getExtension() {
return null;
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
return null;
}
@Override
public Message createMessage(String queueName, byte[] body) {
Message message = new BytesMessageImpl();
message.setData(body);
message.header().setDestination(queueName);
return message;
}
}
......@@ -175,7 +175,7 @@ public class DefaultPromise<V> implements Promise<V> {
private V getValueOrThrowable() {
if (exception != null) {
Throwable e = exception.getCause() != null ? exception.getCause() : exception;
throw new OMSRuntimeException("-1", e);
throw new OMSRuntimeException(-1, e);
}
notifyListeners();
return result;
......
......@@ -16,18 +16,15 @@
*/
package io.openmessaging.rocketmq.utils;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message.BuiltinKeys;
import io.openmessaging.OMS;
import io.openmessaging.message.Header;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.RocketMQConstants;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.UtilAll;
......@@ -44,68 +41,55 @@ public class OMSUtil {
return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
}
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessageImpl omsMessage) {
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
rmqMessage.setBody(omsMessage.getBody(byte[].class));
rmqMessage.setBody(omsMessage.getData());
KeyValue sysHeaders = omsMessage.sysHeaders();
KeyValue userHeaders = omsMessage.userHeaders();
Header sysHeaders = omsMessage.header();
KeyValue userHeaders = omsMessage.properties();
//All destinations in RocketMQ use Topic
rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION));
rmqMessage.setTopic(sysHeaders.getDestination());
if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
if (deliverTime > 0) {
rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
}
long deliverTime = sysHeaders.getBornTimestamp();
if (deliverTime > 0) {
rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
}
for (String key : userHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
}
//System headers has a high priority
for (String key : sysHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key));
}
MessageAccessor.putProperty(rmqMessage, RocketMQConstants.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(sysHeaders.getDeliveryCount()));
return rmqMessage;
}
public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
BytesMessage omsMsg = new BytesMessageImpl();
omsMsg.setBody(rmqMsg.getBody());
KeyValue headers = omsMsg.sysHeaders();
KeyValue properties = omsMsg.userHeaders();
public static BytesMessageImpl msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
BytesMessageImpl omsMsg = new BytesMessageImpl();
omsMsg.setData(rmqMsg.getBody());
final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
for (final Map.Entry<String, String> entry : entries) {
if (isOMSHeader(entry.getKey())) {
headers.put(entry.getKey(), entry.getValue());
} else {
properties.put(entry.getKey(), entry.getValue());
if (!isOMSHeader(entry.getKey())) {
omsMsg.properties().put(entry.getKey(), entry.getValue());
}
}
omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId());
omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic());
omsMsg.header().setMessageId(rmqMsg.getMsgId());
omsMsg.header().setDestination(rmqMsg.getTopic());
omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost()));
omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp());
omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel());
omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
return omsMsg;
}
public static boolean isOMSHeader(String value) {
for (Field field : BuiltinKeys.class.getDeclaredFields()) {
for (Field field : Header.class.getDeclaredFields()) {
try {
if (field.get(BuiltinKeys.class).equals(value)) {
if (field.get(Header.class).equals(value)) {
return true;
}
} catch (IllegalAccessException e) {
......@@ -132,49 +116,4 @@ public class OMSUtil {
}
return keyValue;
}
/**
* Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
*/
public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
return new Iterator<T>() {
Iterator<T> iterator = new Iterator<T>() {
@Override
public synchronized boolean hasNext() {
return false;
}
@Override
public synchronized T next() {
throw new NoSuchElementException();
}
@Override
public synchronized void remove() {
//Ignore
}
};
@Override
public synchronized boolean hasNext() {
return iterator.hasNext() || iterable.iterator().hasNext();
}
@Override
public synchronized T next() {
if (!iterator.hasNext()) {
iterator = iterable.iterator();
if (!iterator.hasNext()) {
throw new NoSuchElementException();
}
}
return iterator.next();
}
@Override
public synchronized void remove() {
iterator.remove();
}
};
}
}
......@@ -56,9 +56,7 @@ public class ProducerImplTest {
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true);
field.set(producer, rocketmqProducer);
messagingAccessPoint.startup();
producer.startup();
producer.start();
}
@Test
......@@ -68,7 +66,7 @@ public class ProducerImplTest {
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
io.openmessaging.producer.SendResult omsResult =
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
}
......@@ -80,7 +78,7 @@ public class ProducerImplTest {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
try {
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
......@@ -91,7 +89,7 @@ public class ProducerImplTest {
public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
try {
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
......
......@@ -87,7 +87,7 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_WithException_ListenerAfterSet() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
final Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.setFailure(exception);
promise.addListener(new FutureListener<String>() {
@Override
......@@ -99,7 +99,7 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_WithException() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
final Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.addListener(new FutureListener<String>() {
@Override
public void operationComplete(Future<String> future) {
......@@ -112,7 +112,7 @@ public class DefaultPromiseTest {
@Test
public void getThrowable() throws Exception {
assertThat(promise.getThrowable()).isNull();
Throwable exception = new OMSRuntimeException("-1", "Test Error");
Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.setFailure(exception);
assertThat(promise.getThrowable()).isEqualTo(exception);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册