提交 abbc468d 编写于 作者: S shutian.lzh

Make code compatible to OMS 0.3.0

上级 48476ae5
......@@ -16,18 +16,18 @@
*/
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.SendResult;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
public class SimpleProducer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final Producer producer = messagingAccessPoint.createProducer();
......@@ -47,29 +47,28 @@ public class SimpleProducer {
}));
{
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L);
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
}
{
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new PromiseListener<SendResult>() {
final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new FutureListener<SendResult>() {
@Override
public void operationCompleted(Promise<SendResult> promise) {
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
}
@Override
public void operationFailed(Promise<SendResult> promise) {
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
public void operationComplete(Future<SendResult> future) {
if (future.getThrowable() != null) {
System.out.printf("Send async message Failed, error: %s%n", future.getThrowable().getMessage());
} else {
System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId());
}
}
});
}
{
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
}
......
......@@ -17,19 +17,17 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PullConsumer;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
......@@ -43,13 +41,15 @@ public class SimplePullConsumer {
}
}));
consumer.attachQueue("OMS_HELLO_TOPIC");
consumer.startup();
System.out.printf("Consumer startup OK%n");
while (true) {
Message message = consumer.poll();
Message message = consumer.receive();
if (message != null) {
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
}
......
......@@ -17,18 +17,15 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePushConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PushConsumer consumer = messagingAccessPoint.
......@@ -47,8 +44,8 @@ public class SimplePushConsumer {
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
public void onReceived(Message message, Context context) {
System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
context.ack();
}
});
......
......@@ -16,24 +16,21 @@
*/
package io.openmessaging.rocketmq;
import io.openmessaging.IterableConsumer;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.Producer;
import io.openmessaging.PullConsumer;
import io.openmessaging.PushConsumer;
import io.openmessaging.ResourceManager;
import io.openmessaging.SequenceProducer;
import io.openmessaging.ServiceEndPoint;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.consumer.StreamingConsumer;
import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.observer.Observer;
import io.openmessaging.producer.Producer;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.producer.ProducerImpl;
import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
import io.openmessaging.rocketmq.utils.OMSUtil;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
private final KeyValue accessPointProperties;
public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
......@@ -41,10 +38,15 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
public KeyValue properties() {
public KeyValue attributes() {
return accessPointProperties;
}
@Override
public String implVersion() {
return "0.3.0";
}
@Override
public Producer createProducer() {
return new ProducerImpl(this.accessPointProperties);
......@@ -55,16 +57,6 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public SequenceProducer createSequenceProducer() {
return new SequenceProducerImpl(this.accessPointProperties);
}
@Override
public SequenceProducer createSequenceProducer(KeyValue properties) {
return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
......@@ -76,50 +68,30 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
public PullConsumer createPullConsumer(String queueName) {
return new PullConsumerImpl(queueName, accessPointProperties);
public PullConsumer createPullConsumer() {
return new PullConsumerImpl(accessPointProperties);
}
@Override
public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties));
public PullConsumer createPullConsumer(KeyValue attributes) {
return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
}
@Override
public IterableConsumer createIterableConsumer(String queueName) {
throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
public StreamingConsumer createStreamingConsumer() {
return null;
}
@Override
public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) {
throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
return null;
}
@Override
public ResourceManager getResourceManager() {
public ResourceManager resourceManager() {
throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
}
@Override
public ServiceEndPoint createServiceEndPoint() {
throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
}
@Override
public ServiceEndPoint createServiceEndPoint(KeyValue properties) {
throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
}
@Override
public void addObserver(Observer observer) {
//Ignore
}
@Override
public void deleteObserver(Observer observer) {
//Ignore
}
@Override
public void startup() {
//Ignore
......
......@@ -16,10 +16,10 @@
*/
package io.openmessaging.rocketmq.config;
import io.openmessaging.PropertyKeys;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class ClientConfig implements PropertyKeys, NonStandardKeys {
public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
private String omsDriverImpl;
private String omsAccessPoints;
private String omsNamespace;
......
......@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.PropertyKeys;
import io.openmessaging.Message;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
......@@ -37,11 +37,11 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
class LocalMessageCache implements ServiceLifecycle {
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
......@@ -96,8 +96,8 @@ class LocalMessageCache implements ServiceLifecycle {
MessageExt poll(final KeyValue properties) {
int currentPollTimeout = clientConfig.getOmsOperationTimeout();
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) {
currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT);
}
return poll(currentPollTimeout);
}
......
......@@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
......@@ -42,17 +42,14 @@ public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
private boolean started = false;
private String targetQueueName;
private final MQPullConsumerScheduleService pullConsumerScheduleService;
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
final static InternalLogger log = ClientLogger.getLog();
public PullConsumerImpl(final String queueName, final KeyValue properties) {
public PullConsumerImpl(final KeyValue properties) {
this.properties = properties;
this.targetQueueName = queueName;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String consumerGroup = clientConfig.getRmqConsumerGroup();
......@@ -76,24 +73,42 @@ public class PullConsumerImpl implements PullConsumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(PropertyKeys.CONSUMER_ID, consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
}
@Override
public KeyValue properties() {
public KeyValue attributes() {
return properties;
}
@Override
public Message poll() {
public PullConsumer attachQueue(String queueName) {
registerPullTaskCallback(queueName);
return this;
}
@Override
public PullConsumer attachQueue(String queueName, KeyValue attributes) {
registerPullTaskCallback(queueName);
return this;
}
@Override
public PullConsumer detachQueue(String queueName) {
this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
return this;
}
@Override
public Message receive() {
MessageExt rmqMsg = localMessageCache.poll();
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
@Override
public Message poll(final KeyValue properties) {
public Message receive(final KeyValue properties) {
MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
......@@ -112,7 +127,6 @@ public class PullConsumerImpl implements PullConsumer {
public synchronized void startup() {
if (!started) {
try {
registerPullTaskCallback();
this.pullConsumerScheduleService.start();
this.localMessageCache.startup();
} catch (MQClientException e) {
......@@ -122,7 +136,7 @@ public class PullConsumerImpl implements PullConsumer {
this.started = true;
}
private void registerPullTaskCallback() {
private void registerPullTaskCallback(final String targetQueueName) {
this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
@Override
public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
......
......@@ -18,12 +18,12 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.MessageListener;
import io.openmessaging.OMS;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ConsumerInterceptor;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
......@@ -70,13 +70,13 @@ public class PushConsumerImpl implements PushConsumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(PropertyKeys.CONSUMER_ID, consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
}
@Override
public KeyValue properties() {
public KeyValue attributes() {
return properties;
}
......@@ -90,6 +90,11 @@ public class PushConsumerImpl implements PushConsumer {
this.rocketmqPushConsumer.suspend();
}
@Override
public void suspend(long timeout) {
}
@Override
public boolean isSuspended() {
return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
......@@ -106,6 +111,32 @@ public class PushConsumerImpl implements PushConsumer {
return this;
}
@Override
public PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes) {
return this.attachQueue(queueName, listener);
}
@Override
public PushConsumer detachQueue(String queueName) {
this.subscribeTable.remove(queueName);
try {
this.rocketmqPushConsumer.unsubscribe(queueName);
} catch (Exception e) {
throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
}
return null;
}
@Override
public void addInterceptor(ConsumerInterceptor interceptor) {
}
@Override
public void removeInterceptor(ConsumerInterceptor interceptor) {
}
@Override
public synchronized void startup() {
if (!started) {
......@@ -146,9 +177,9 @@ public class PushConsumerImpl implements PushConsumer {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
ReceivedMessageContext context = new ReceivedMessageContext() {
MessageListener.Context context = new MessageListener.Context() {
@Override
public KeyValue properties() {
public KeyValue attributes() {
return contextProperties;
}
......@@ -158,16 +189,9 @@ public class PushConsumerImpl implements PushConsumer {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
@Override
public void ack(final KeyValue properties) {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
};
long begin = System.currentTimeMillis();
listener.onMessage(omsMsg, context);
listener.onReceived(omsMsg, context);
long costs = System.currentTimeMillis() - begin;
long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
try {
......
......@@ -23,13 +23,13 @@ import io.openmessaging.OMS;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class BytesMessageImpl implements BytesMessage {
private KeyValue headers;
private KeyValue properties;
private KeyValue sysHeaders;
private KeyValue userHeaders;
private byte[] body;
public BytesMessageImpl() {
this.headers = OMS.newKeyValue();
this.properties = OMS.newKeyValue();
this.sysHeaders = OMS.newKeyValue();
this.userHeaders = OMS.newKeyValue();
}
@Override
......@@ -44,60 +44,60 @@ public class BytesMessageImpl implements BytesMessage {
}
@Override
public KeyValue headers() {
return headers;
public KeyValue sysHeaders() {
return sysHeaders;
}
@Override
public KeyValue properties() {
return properties;
public KeyValue userHeaders() {
return userHeaders;
}
@Override
public Message putHeaders(final String key, final int value) {
headers.put(key, value);
public Message putSysHeaders(String key, int value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final long value) {
headers.put(key, value);
public Message putSysHeaders(String key, long value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final double value) {
headers.put(key, value);
public Message putSysHeaders(String key, double value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final String value) {
headers.put(key, value);
public Message putSysHeaders(String key, String value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final int value) {
properties.put(key, value);
public Message putUserHeaders(String key, int value) {
userHeaders.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final long value) {
properties.put(key, value);
public Message putUserHeaders(String key, long value) {
userHeaders.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final double value) {
properties.put(key, value);
public Message putUserHeaders(String key, double value) {
userHeaders.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final String value) {
properties.put(key, value);
public Message putUserHeaders(String key, String value) {
userHeaders.put(key, value);
return this;
}
......
......@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.domain;
import io.openmessaging.KeyValue;
import io.openmessaging.SendResult;
import io.openmessaging.producer.SendResult;
public class SendResultImpl implements SendResult {
private String messageId;
......@@ -33,7 +33,6 @@ public class SendResultImpl implements SendResult {
return messageId;
}
@Override
public KeyValue properties() {
return properties;
}
......
......@@ -20,8 +20,7 @@ import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageFactory;
import io.openmessaging.MessageHeader;
import io.openmessaging.PropertyKeys;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.exception.OMSNotSupportedException;
......@@ -64,7 +63,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
properties.put(PropertyKeys.PRODUCER_ID, producerId);
properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
}
@Override
......@@ -121,18 +120,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
}
@Override
public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
BytesMessage bytesMessage = new BytesMessageImpl();
bytesMessage.setBody(body);
bytesMessage.headers().put(MessageHeader.TOPIC, topic);
return bytesMessage;
}
@Override
public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
BytesMessage bytesMessage = new BytesMessageImpl();
bytesMessage.setBody(body);
bytesMessage.headers().put(MessageHeader.QUEUE, queue);
return bytesMessage;
public BytesMessage createBytesMessage(String queue, byte[] body) {
BytesMessage message = new BytesMessageImpl();
message.setBody(body);
message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue);
return message;
}
}
......@@ -19,12 +19,13 @@ package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.Producer;
import io.openmessaging.Promise;
import io.openmessaging.PropertyKeys;
import io.openmessaging.SendResult;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.producer.BatchMessageSender;
import io.openmessaging.producer.LocalTransactionExecutor;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.producer.SendCallback;
......@@ -39,7 +40,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
@Override
public KeyValue properties() {
public KeyValue attributes() {
return properties;
}
......@@ -50,11 +51,16 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public SendResult send(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return send(message, timeout);
}
@Override
public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) {
return null;
}
private SendResult send(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
......@@ -64,11 +70,11 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
log.error(String.format("Send message to RocketMQ failed, %s", message));
throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
}
message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult);
} catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s", message), e);
throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e);
}
}
......@@ -79,8 +85,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return sendAsync(message, timeout);
}
......@@ -92,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
this.rocketmqProducer.send(rmqMessage, new SendCallback() {
@Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult));
}
......@@ -121,4 +127,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
public void sendOneway(final Message message, final KeyValue properties) {
sendOneway(message);
}
@Override
public BatchMessageSender createBatchMessageSender() {
return null;
}
@Override
public void addInterceptor(ProducerInterceptor interceptor) {
}
@Override
public void removeInterceptor(ProducerInterceptor interceptor) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.SequenceProducer;
import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {
private BlockingQueue<Message> msgCacheQueue;
public SequenceProducerImpl(final KeyValue properties) {
super(properties);
this.msgCacheQueue = new LinkedBlockingQueue<>();
}
@Override
public KeyValue properties() {
return properties;
}
@Override
public void send(final Message message) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
try {
Validators.checkMessage(rmqMessage, this.rocketmqProducer);
} catch (MQClientException e) {
throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
}
msgCacheQueue.add(message);
}
@Override
public void send(final Message message, final KeyValue properties) {
send(message);
}
@Override
public synchronized void commit() {
List<Message> messages = new ArrayList<>();
msgCacheQueue.drainTo(messages);
List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();
for (Message message : messages) {
rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
}
if (rmqMessages.size() == 0) {
return;
}
try {
SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
String[] msgIdArray = sendResult.getMsgId().split(",");
for (int i = 0; i < messages.size(); i++) {
Message message = messages.get(i);
message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
}
} catch (Exception e) {
throw checkProducerException("", "", e);
}
}
@Override
public synchronized void rollback() {
msgCacheQueue.clear();
}
}
......@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.promise;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.FutureListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -33,7 +33,7 @@ public class DefaultPromise<V> implements Promise<V> {
private long timeout;
private long createTime;
private Throwable exception = null;
private List<PromiseListener<V>> promiseListenerList;
private List<FutureListener<V>> promiseListenerList;
public DefaultPromise() {
createTime = System.currentTimeMillis();
......@@ -121,7 +121,7 @@ public class DefaultPromise<V> implements Promise<V> {
}
@Override
public void addListener(final PromiseListener<V> listener) {
public void addListener(final FutureListener<V> listener) {
if (listener == null) {
throw new NullPointerException("FutureListener is null");
}
......@@ -150,7 +150,7 @@ public class DefaultPromise<V> implements Promise<V> {
private void notifyListeners() {
if (promiseListenerList != null) {
for (PromiseListener<V> listener : promiseListenerList) {
for (FutureListener<V> listener : promiseListenerList) {
notifyListener(listener);
}
}
......@@ -199,12 +199,9 @@ public class DefaultPromise<V> implements Promise<V> {
return true;
}
private void notifyListener(final PromiseListener<V> listener) {
private void notifyListener(final FutureListener<V> listener) {
try {
if (exception != null)
listener.operationFailed(this);
else
listener.operationCompleted(this);
listener.operationComplete(this);
} catch (Throwable t) {
LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
}
......
......@@ -18,11 +18,10 @@ package io.openmessaging.rocketmq.utils;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.MessageHeader;
import io.openmessaging.Message.BuiltinKeys;
import io.openmessaging.OMS;
import io.openmessaging.SendResult;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
import java.util.Iterator;
......@@ -48,25 +47,19 @@ public class OMSUtil {
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
rmqMessage.setBody(omsMessage.getBody());
KeyValue headers = omsMessage.headers();
KeyValue properties = omsMessage.properties();
KeyValue sysHeaders = omsMessage.sysHeaders();
KeyValue userHeaders = omsMessage.userHeaders();
//All destinations in RocketMQ use Topic
if (headers.containsKey(MessageHeader.TOPIC)) {
rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
} else {
rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
}
rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION));
for (String key : properties.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
for (String key : userHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
}
//Headers has a high priority
for (String key : headers.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
//System headers has a high priority
for (String key : sysHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key));
}
return rmqMessage;
......@@ -76,8 +69,8 @@ public class OMSUtil {
BytesMessage omsMsg = new BytesMessageImpl();
omsMsg.setBody(rmqMsg.getBody());
KeyValue headers = omsMsg.headers();
KeyValue properties = omsMsg.properties();
KeyValue headers = omsMsg.sysHeaders();
KeyValue properties = omsMsg.userHeaders();
final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
......@@ -89,25 +82,22 @@ public class OMSUtil {
}
}
omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
} else {
omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
}
omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId());
omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic());
omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
return omsMsg;
}
public static boolean isOMSHeader(String value) {
for (Field field : MessageHeader.class.getDeclaredFields()) {
for (Field field : BuiltinKeys.class.getDeclaredFields()) {
try {
if (field.get(MessageHeader.class).equals(value)) {
if (field.get(BuiltinKeys.class).equals(value)) {
return true;
}
} catch (IllegalAccessException e) {
......
......@@ -592,7 +592,7 @@
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
<version>0.1.0-alpha</version>
<version>0.3.0-alpha-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册