提交 3180e55f 编写于 作者: H huzongtang

[issue#1198]Implement the 1.0.0 openmessaging producer API for rocketmq oms module.

上级 84d2260b
...@@ -18,7 +18,7 @@ package org.apache.rocketmq.example.openmessaging; ...@@ -18,7 +18,7 @@ package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Future; import io.openmessaging.Future;
import io.openmessaging.FutureListener; import io.openmessaging.FutureListener;
import io.openmessaging.Message; import io.openmessaging.message.Message;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.producer.Producer; import io.openmessaging.producer.Producer;
...@@ -32,15 +32,11 @@ public class SimpleProducer { ...@@ -32,15 +32,11 @@ public class SimpleProducer {
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final Producer producer = messagingAccessPoint.createProducer(); final Producer producer = messagingAccessPoint.createProducer();
producer.start();
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
producer.startup();
System.out.printf("Producer startup OK%n"); System.out.printf("Producer startup OK%n");
{ {
Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); Message message = producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message); SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L); //final Void aVoid = result.get(3000L);
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
...@@ -48,7 +44,7 @@ public class SimpleProducer { ...@@ -48,7 +44,7 @@ public class SimpleProducer {
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
{ {
final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); final Future<SendResult> result = producer.sendAsync(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new FutureListener<SendResult>() { result.addListener(new FutureListener<SendResult>() {
@Override @Override
public void operationComplete(Future<SendResult> future) { public void operationComplete(Future<SendResult> future) {
...@@ -63,7 +59,7 @@ public class SimpleProducer { ...@@ -63,7 +59,7 @@ public class SimpleProducer {
} }
{ {
producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); producer.sendOneway(producer.createMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n"); System.out.printf("Send oneway message OK%n");
} }
...@@ -73,6 +69,6 @@ public class SimpleProducer { ...@@ -73,6 +69,6 @@ public class SimpleProducer {
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
} }
producer.shutdown(); producer.stop();
} }
} }
...@@ -18,16 +18,13 @@ package io.openmessaging.rocketmq; ...@@ -18,16 +18,13 @@ package io.openmessaging.rocketmq;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.ResourceManager; import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.PullConsumer; import io.openmessaging.exception.OMSUnsupportException;
import io.openmessaging.consumer.PushConsumer; import io.openmessaging.manager.ResourceManager;
import io.openmessaging.consumer.StreamingConsumer; import io.openmessaging.message.MessageFactory;
import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.producer.Producer; import io.openmessaging.producer.Producer;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl; import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.producer.ProducerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl;
import io.openmessaging.rocketmq.utils.OMSUtil;
public class MessagingAccessPointImpl implements MessagingAccessPoint { public class MessagingAccessPointImpl implements MessagingAccessPoint {
...@@ -43,8 +40,8 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { ...@@ -43,8 +40,8 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
} }
@Override @Override
public String implVersion() { public String version() {
return "0.3.0"; return "1.0.0";
} }
@Override @Override
...@@ -52,53 +49,20 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { ...@@ -52,53 +49,20 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
return new ProducerImpl(this.accessPointProperties); return new ProducerImpl(this.accessPointProperties);
} }
@Override @Override public Producer createProducer(TransactionStateCheckListener transactionStateCheckListener) {
public Producer createProducer(KeyValue properties) {
return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
}
@Override
public PushConsumer createPushConsumer(KeyValue properties) {
return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public PullConsumer createPullConsumer() {
return new PullConsumerImpl(accessPointProperties);
}
@Override
public PullConsumer createPullConsumer(KeyValue attributes) {
return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
}
@Override
public StreamingConsumer createStreamingConsumer() {
return null; return null;
} }
@Override @Override public Consumer createConsumer() {
public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
return null; return null;
} }
@Override @Override
public ResourceManager resourceManager() { public ResourceManager resourceManager() {
throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version."); throw new OMSUnsupportException(-1, "ResourceManager is not supported in current version.");
} }
@Override @Override public MessageFactory messageFactory() {
public void startup() { return null;
//Ignore
}
@Override
public void shutdown() {
//Ignore
} }
} }
...@@ -16,98 +16,52 @@ ...@@ -16,98 +16,52 @@
*/ */
package io.openmessaging.rocketmq.domain; package io.openmessaging.rocketmq.domain;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.Message; import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.extension.ExtensionHeader;
import io.openmessaging.message.Header;
import io.openmessaging.message.Message;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.exception.OMSMessageFormatException; import java.util.Optional;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class BytesMessageImpl implements BytesMessage { public class BytesMessageImpl implements Message {
private KeyValue sysHeaders;
private KeyValue userHeaders;
private byte[] body;
public BytesMessageImpl() { private Header sysHeaders;
this.sysHeaders = OMS.newKeyValue(); private KeyValue userProperties;
this.userHeaders = OMS.newKeyValue(); private byte[] data;
}
@Override
public <T> T getBody(Class<T> type) throws OMSMessageFormatException {
if (type == byte[].class) {
return (T)body;
}
throw new OMSMessageFormatException("", "Cannot assign byte[] to " + type.getName());
}
@Override public BytesMessageImpl() {
public BytesMessage setBody(final byte[] body) { this.sysHeaders = new MessageHeader();
this.body = body; this.userProperties = OMS.newKeyValue();
return this;
} }
@Override @Override
public KeyValue sysHeaders() { public Header header() {
return sysHeaders; return sysHeaders;
} }
@Override @Override
public KeyValue userHeaders() { public Optional<ExtensionHeader> extensionHeader() {
return userHeaders; return null;
}
@Override
public Message putSysHeaders(String key, int value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putSysHeaders(String key, long value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putSysHeaders(String key, double value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putSysHeaders(String key, String value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putUserHeaders(String key, int value) {
userHeaders.put(key, value);
return this;
} }
@Override @Override
public Message putUserHeaders(String key, long value) { public KeyValue properties() {
userHeaders.put(key, value); return userProperties;
return this;
} }
@Override @Override
public Message putUserHeaders(String key, double value) { public byte[] getData() {
userHeaders.put(key, value); return this.data;
return this;
} }
@Override @Override
public Message putUserHeaders(String key, String value) { public void setData(byte[] data) {
userHeaders.put(key, value); this.data = data;
return this;
} }
@Override @Override
public String toString() { public MessageReceipt getMessageReceipt() {
return ToStringBuilder.reflectionToString(this); return null;
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.message.Header;
public class MessageHeader implements Header{
private String destination;
private String messageId;
private long bornTimestamp;
private String bornHost;
private short priority;
private int deliveryCount;
private short compression;
private short durability;
public MessageHeader() {
}
@Override public Header setDestination(String destination) {
this.destination = destination;
return this;
}
@Override public Header setMessageId(String messageId) {
this.messageId = messageId;
return this;
}
@Override public Header setBornTimestamp(long bornTimestamp) {
this.bornTimestamp = bornTimestamp;
return this;
}
@Override public Header setBornHost(String bornHost) {
this.bornHost = bornHost;
return this;
}
@Override public Header setPriority(short priority) {
this.priority = priority;
return this;
}
@Override public Header setDurability(short durability) {
this.durability = durability;
return this;
}
@Override public Header setDeliveryCount(int deliveryCount) {
this.deliveryCount = deliveryCount;
return this;
}
@Override public Header setCompression(short compression) {
this.compression = compression;
return this;
}
@Override public String getDestination() {
return this.destination;
}
@Override public String getMessageId() {
return this.messageId;
}
@Override public long getBornTimestamp() {
return this.bornTimestamp;
}
@Override public String getBornHost() {
return this.bornHost;
}
@Override public short getPriority() {
return this.priority;
}
@Override public short getDurability() {
return this.durability;
}
@Override public int getDeliveryCount() {
return this.deliveryCount;
}
@Override public short getCompression() {
return this.compression;
}
}
...@@ -27,4 +27,6 @@ public interface NonStandardKeys { ...@@ -27,4 +27,6 @@ public interface NonStandardKeys {
String MESSAGE_DESTINATION = "rmq.message.destination"; String MESSAGE_DESTINATION = "rmq.message.destination";
String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums"; String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity"; String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
String PRODUCER_ID = "PRODUCER_ID";
String CONSUMER_ID ="CONSUMER_ID";
} }
...@@ -23,4 +23,6 @@ public interface RocketMQConstants { ...@@ -23,4 +23,6 @@ public interface RocketMQConstants {
*/ */
String START_DELIVER_TIME = "__STARTDELIVERTIME"; String START_DELIVER_TIME = "__STARTDELIVERTIME";
String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
} }
...@@ -16,18 +16,17 @@ ...@@ -16,18 +16,17 @@
*/ */
package io.openmessaging.rocketmq.producer; package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.Message; import io.openmessaging.message.Message;
import io.openmessaging.MessageFactory; import io.openmessaging.message.MessageFactory;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.ServiceLifecycle; import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSMessageFormatException; import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.exception.OMSNotSupportedException; import io.openmessaging.exception.OMSUnsupportException;
import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.exception.OMSTimeOutException; import io.openmessaging.exception.OMSTimeOutException;
import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils; import io.openmessaging.rocketmq.utils.BeanUtils;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
...@@ -56,7 +55,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { ...@@ -56,7 +55,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) { if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) {
String accessPoints = clientConfig.getAccessPoints(); String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException(-1, "OMS AccessPoints is null or empty.");
} }
this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
...@@ -69,23 +68,23 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { ...@@ -69,23 +68,23 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
this.rocketmqProducer.setLanguage(LanguageCode.OMS); this.rocketmqProducer.setLanguage(LanguageCode.OMS);
properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); properties.put(NonStandardKeys.PRODUCER_ID, producerId);
} }
@Override @Override
public synchronized void startup() { public synchronized void start() {
if (!started) { if (!started) {
try { try {
this.rocketmqProducer.start(); this.rocketmqProducer.start();
} catch (MQClientException e) { } catch (MQClientException e) {
throw new OMSRuntimeException("-1", e); throw new OMSRuntimeException(-1, e);
} }
} }
this.started = true; this.started = true;
} }
@Override @Override
public synchronized void shutdown() { public synchronized void stop() {
if (this.started) { if (this.started) {
this.rocketmqProducer.shutdown(); this.rocketmqProducer.shutdown();
} }
...@@ -96,21 +95,20 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { ...@@ -96,21 +95,20 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
if (e instanceof MQClientException) { if (e instanceof MQClientException) {
if (e.getCause() != null) { if (e.getCause() != null) {
if (e.getCause() instanceof RemotingTimeoutException) { if (e.getCause() instanceof RemotingTimeoutException) {
return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s", return new OMSTimeOutException(-1, String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e); this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
} else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) { } else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
if (e.getCause() instanceof MQBrokerException) { if (e.getCause() instanceof MQBrokerException) {
MQBrokerException brokerException = (MQBrokerException) e.getCause(); MQBrokerException brokerException = (MQBrokerException) e.getCause();
return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s", return new OMSRuntimeException(-1, String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
topic, msgId, brokerException.getErrorMessage()), e); topic, msgId, brokerException.getErrorMessage()), e);
} }
if (e.getCause() instanceof RemotingConnectException) { if (e.getCause() instanceof RemotingConnectException) {
RemotingConnectException connectException = (RemotingConnectException)e.getCause(); RemotingConnectException connectException = (RemotingConnectException)e.getCause();
return new OMSRuntimeException("-1", return new OMSRuntimeException(-1,
String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s", String.format("Network connection experiences failures. Topic=%s, msgId=%s, %s",
topic, msgId, connectException.getMessage()), topic, msgId, connectException.getMessage()), e);
e);
} }
} }
} }
...@@ -118,28 +116,21 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { ...@@ -118,28 +116,21 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
else { else {
MQClientException clientException = (MQClientException) e; MQClientException clientException = (MQClientException) e;
if (-1 == clientException.getResponseCode()) { if (-1 == clientException.getResponseCode()) {
return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s", return new OMSRuntimeException(-1, String.format("Topic does not exist, Topic=%s, msgId=%s",
topic, msgId), e); topic, msgId), e);
} else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) { } else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s", return new OMSMessageFormatException(-1, String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
topic, msgId), e); topic, msgId), e);
} }
} }
} }
return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e); return new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.", e);
} }
protected void checkMessageType(Message message) { protected void checkMessageType(Message message) {
if (!(message instanceof BytesMessage)) { if (!(message instanceof BytesMessageImpl)) {
throw new OMSNotSupportedException("-1", "Only BytesMessage is supported."); throw new OMSUnsupportException(-1, "Only BytesMessage is supported.");
} }
} }
@Override
public BytesMessage createBytesMessage(String queue, byte[] body) {
BytesMessage message = new BytesMessageImpl();
message.setBody(body);
message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue);
return message;
}
} }
...@@ -16,18 +16,24 @@ ...@@ -16,18 +16,24 @@
*/ */
package io.openmessaging.rocketmq.producer; package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage; import io.openmessaging.Future;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.Message; import io.openmessaging.ServiceLifeState;
import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.message.Message;
import io.openmessaging.Promise; import io.openmessaging.Promise;
import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor; import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.producer.BatchMessageSender;
import io.openmessaging.producer.LocalTransactionExecutor;
import io.openmessaging.producer.Producer; import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult; import io.openmessaging.producer.SendResult;
import io.openmessaging.producer.TransactionalResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.promise.DefaultPromise; import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.List;
import java.util.Optional;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
...@@ -39,42 +45,25 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { ...@@ -39,42 +45,25 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
super(properties); super(properties);
} }
@Override
public KeyValue attributes() {
return properties;
}
@Override @Override
public SendResult send(final Message message) { public SendResult send(final Message message) {
return send(message, this.rocketmqProducer.getSendMsgTimeout()); return send(message, this.rocketmqProducer.getSendMsgTimeout());
} }
@Override
public SendResult send(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return send(message, timeout);
}
@Override
public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) {
return null;
}
private SendResult send(final Message message, long timeout) { private SendResult send(final Message message, long timeout) {
checkMessageType(message); checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
try { try {
org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout); org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) { if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
log.error(String.format("Send message to RocketMQ failed, %s", message)); log.error(String.format("Send message to RocketMQ failed, %s", message));
throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed."); throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.");
} }
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); message.header().setMessageId(rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult); return OMSUtil.sendResultConvert(rmqResult);
} catch (Exception e) { } catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s", message), e); log.error(String.format("Send message to RocketMQ failed, %s", message), e);
throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e); throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e);
} }
} }
...@@ -83,22 +72,15 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { ...@@ -83,22 +72,15 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout()); return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
} }
@Override
public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return sendAsync(message, timeout);
}
private Promise<SendResult> sendAsync(final Message message, long timeout) { private Promise<SendResult> sendAsync(final Message message, long timeout) {
checkMessageType(message); checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
final Promise<SendResult> promise = new DefaultPromise<>(); final Promise<SendResult> promise = new DefaultPromise<>();
try { try {
this.rocketmqProducer.send(rmqMessage, new SendCallback() { this.rocketmqProducer.send(rmqMessage, new SendCallback() {
@Override @Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); message.header().setMessageId(rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult)); promise.set(OMSUtil.sendResultConvert(rmqResult));
} }
...@@ -116,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { ...@@ -116,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override @Override
public void sendOneway(final Message message) { public void sendOneway(final Message message) {
checkMessageType(message); checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessageImpl) message);
try { try {
this.rocketmqProducer.sendOneway(rmqMessage); this.rocketmqProducer.sendOneway(rmqMessage);
} catch (Exception ignore) { //Ignore the oneway exception. } catch (Exception ignore) { //Ignore the oneway exception.
...@@ -124,22 +106,65 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { ...@@ -124,22 +106,65 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
} }
@Override @Override
public void sendOneway(final Message message, final KeyValue properties) { public void send(List<Message> messages) {
sendOneway(message); if (messages == null || messages.isEmpty()) {
throw new OMSMessageFormatException(-1, "The messages collection is empty");
}
for (Message message : messages) {
sendOneway(messages);
}
} }
@Override @Override
public BatchMessageSender createBatchMessageSender() { public Future<SendResult> sendAsync(List<Message> messages) {
return null; return null;
} }
@Override @Override
public void addInterceptor(ProducerInterceptor interceptor) { public void sendOneway(List<Message> messages) {
if (messages == null || messages.isEmpty()) {
throw new OMSMessageFormatException(-1, "The messages collection is empty");
}
for (Message message : messages) {
sendOneway(messages);
}
}
@Override
public void addInterceptor(ProducerInterceptor interceptor) {
} }
@Override @Override
public void removeInterceptor(ProducerInterceptor interceptor) { public void removeInterceptor(ProducerInterceptor interceptor) {
}
@Override
public TransactionalResult prepare(Message message) {
return null;
}
@Override
public ServiceLifeState currentState() {
return null;
}
@Override
public Optional<Extension> getExtension() {
return null;
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
return null;
}
@Override
public Message createMessage(String queueName, byte[] body) {
Message message = new BytesMessageImpl();
message.setData(body);
message.header().setDestination(queueName);
return message;
} }
} }
...@@ -175,7 +175,7 @@ public class DefaultPromise<V> implements Promise<V> { ...@@ -175,7 +175,7 @@ public class DefaultPromise<V> implements Promise<V> {
private V getValueOrThrowable() { private V getValueOrThrowable() {
if (exception != null) { if (exception != null) {
Throwable e = exception.getCause() != null ? exception.getCause() : exception; Throwable e = exception.getCause() != null ? exception.getCause() : exception;
throw new OMSRuntimeException("-1", e); throw new OMSRuntimeException(-1, e);
} }
notifyListeners(); notifyListeners();
return result; return result;
......
...@@ -16,18 +16,15 @@ ...@@ -16,18 +16,15 @@
*/ */
package io.openmessaging.rocketmq.utils; package io.openmessaging.rocketmq.utils;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.Message.BuiltinKeys;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.message.Header;
import io.openmessaging.producer.SendResult; import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.RocketMQConstants; import io.openmessaging.rocketmq.domain.RocketMQConstants;
import io.openmessaging.rocketmq.domain.SendResultImpl; import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
...@@ -44,68 +41,55 @@ public class OMSUtil { ...@@ -44,68 +41,55 @@ public class OMSUtil {
return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime(); return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
} }
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) { public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessageImpl omsMessage) {
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
rmqMessage.setBody(omsMessage.getBody(byte[].class)); rmqMessage.setBody(omsMessage.getData());
KeyValue sysHeaders = omsMessage.sysHeaders(); Header sysHeaders = omsMessage.header();
KeyValue userHeaders = omsMessage.userHeaders(); KeyValue userHeaders = omsMessage.properties();
//All destinations in RocketMQ use Topic //All destinations in RocketMQ use Topic
rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION)); rmqMessage.setTopic(sysHeaders.getDestination());
if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { long deliverTime = sysHeaders.getBornTimestamp();
long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); if (deliverTime > 0) {
if (deliverTime > 0) { rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
}
} }
for (String key : userHeaders.keySet()) { for (String key : userHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key)); MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
} }
//System headers has a high priority MessageAccessor.putProperty(rmqMessage, RocketMQConstants.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(sysHeaders.getDeliveryCount()));
for (String key : sysHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key));
}
return rmqMessage; return rmqMessage;
} }
public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { public static BytesMessageImpl msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
BytesMessage omsMsg = new BytesMessageImpl(); BytesMessageImpl omsMsg = new BytesMessageImpl();
omsMsg.setBody(rmqMsg.getBody()); omsMsg.setData(rmqMsg.getBody());
KeyValue headers = omsMsg.sysHeaders();
KeyValue properties = omsMsg.userHeaders();
final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet(); final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
for (final Map.Entry<String, String> entry : entries) { for (final Map.Entry<String, String> entry : entries) {
if (isOMSHeader(entry.getKey())) { if (!isOMSHeader(entry.getKey())) {
headers.put(entry.getKey(), entry.getValue()); omsMsg.properties().put(entry.getKey(), entry.getValue());
} else {
properties.put(entry.getKey(), entry.getValue());
} }
} }
omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId()); omsMsg.header().setMessageId(rmqMsg.getMsgId());
omsMsg.header().setDestination(rmqMsg.getTopic());
omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic()); omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost()));
omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp());
omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel());
omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
return omsMsg; return omsMsg;
} }
public static boolean isOMSHeader(String value) { public static boolean isOMSHeader(String value) {
for (Field field : BuiltinKeys.class.getDeclaredFields()) { for (Field field : Header.class.getDeclaredFields()) {
try { try {
if (field.get(BuiltinKeys.class).equals(value)) { if (field.get(Header.class).equals(value)) {
return true; return true;
} }
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
...@@ -132,49 +116,4 @@ public class OMSUtil { ...@@ -132,49 +116,4 @@ public class OMSUtil {
} }
return keyValue; return keyValue;
} }
/**
* Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
*/
public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
return new Iterator<T>() {
Iterator<T> iterator = new Iterator<T>() {
@Override
public synchronized boolean hasNext() {
return false;
}
@Override
public synchronized T next() {
throw new NoSuchElementException();
}
@Override
public synchronized void remove() {
//Ignore
}
};
@Override
public synchronized boolean hasNext() {
return iterator.hasNext() || iterable.iterator().hasNext();
}
@Override
public synchronized T next() {
if (!iterator.hasNext()) {
iterator = iterable.iterator();
if (!iterator.hasNext()) {
throw new NoSuchElementException();
}
}
return iterator.next();
}
@Override
public synchronized void remove() {
iterator.remove();
}
};
}
} }
...@@ -56,9 +56,7 @@ public class ProducerImplTest { ...@@ -56,9 +56,7 @@ public class ProducerImplTest {
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true); field.setAccessible(true);
field.set(producer, rocketmqProducer); field.set(producer, rocketmqProducer);
producer.start();
messagingAccessPoint.startup();
producer.startup();
} }
@Test @Test
...@@ -68,7 +66,7 @@ public class ProducerImplTest { ...@@ -68,7 +66,7 @@ public class ProducerImplTest {
sendResult.setSendStatus(SendStatus.SEND_OK); sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
io.openmessaging.producer.SendResult omsResult = io.openmessaging.producer.SendResult omsResult =
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID"); assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
} }
...@@ -80,7 +78,7 @@ public class ProducerImplTest { ...@@ -80,7 +78,7 @@ public class ProducerImplTest {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
try { try {
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class); failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) { } catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
...@@ -91,7 +89,7 @@ public class ProducerImplTest { ...@@ -91,7 +89,7 @@ public class ProducerImplTest {
public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class); when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
try { try {
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); producer.send(producer.createMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class); failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) { } catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
......
...@@ -87,7 +87,7 @@ public class DefaultPromiseTest { ...@@ -87,7 +87,7 @@ public class DefaultPromiseTest {
@Test @Test
public void testAddListener_WithException_ListenerAfterSet() throws Exception { public void testAddListener_WithException_ListenerAfterSet() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error"); final Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.setFailure(exception); promise.setFailure(exception);
promise.addListener(new FutureListener<String>() { promise.addListener(new FutureListener<String>() {
@Override @Override
...@@ -99,7 +99,7 @@ public class DefaultPromiseTest { ...@@ -99,7 +99,7 @@ public class DefaultPromiseTest {
@Test @Test
public void testAddListener_WithException() throws Exception { public void testAddListener_WithException() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error"); final Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.addListener(new FutureListener<String>() { promise.addListener(new FutureListener<String>() {
@Override @Override
public void operationComplete(Future<String> future) { public void operationComplete(Future<String> future) {
...@@ -112,7 +112,7 @@ public class DefaultPromiseTest { ...@@ -112,7 +112,7 @@ public class DefaultPromiseTest {
@Test @Test
public void getThrowable() throws Exception { public void getThrowable() throws Exception {
assertThat(promise.getThrowable()).isNull(); assertThat(promise.getThrowable()).isNull();
Throwable exception = new OMSRuntimeException("-1", "Test Error"); Throwable exception = new OMSRuntimeException(-1, "Test Error");
promise.setFailure(exception); promise.setFailure(exception);
assertThat(promise.getThrowable()).isEqualTo(exception); assertThat(promise.getThrowable()).isEqualTo(exception);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册