From abbc468d30e456d54f3fc3369a6998d224ea617d Mon Sep 17 00:00:00 2001 From: "shutian.lzh" Date: Sun, 15 Apr 2018 16:13:35 +0800 Subject: [PATCH] Make code compatible to OMS 0.3.0 --- .../example/openmessaging/SimpleProducer.java | 33 ++++--- .../openmessaging/SimplePullConsumer.java | 16 ++-- .../openmessaging/SimplePushConsumer.java | 13 +-- .../rocketmq/MessagingAccessPointImpl.java | 68 ++++--------- .../rocketmq/config/ClientConfig.java | 4 +- .../rocketmq/consumer/LocalMessageCache.java | 8 +- .../rocketmq/consumer/PullConsumerImpl.java | 38 +++++--- .../rocketmq/consumer/PushConsumerImpl.java | 56 +++++++---- .../rocketmq/domain/BytesMessageImpl.java | 48 +++++----- .../rocketmq/domain/SendResultImpl.java | 3 +- .../producer/AbstractOMSProducer.java | 23 ++--- .../rocketmq/producer/ProducerImpl.java | 45 ++++++--- .../producer/SequenceProducerImpl.java | 95 ------------------- .../rocketmq/promise/DefaultPromise.java | 15 ++- .../openmessaging/rocketmq/utils/OMSUtil.java | 56 +++++------ pom.xml | 2 +- 16 files changed, 216 insertions(+), 307 deletions(-) delete mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index 9d162ac1..2884797d 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -16,18 +16,18 @@ */ package org.apache.rocketmq.example.openmessaging; +import io.openmessaging.Future; +import io.openmessaging.FutureListener; import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; -import io.openmessaging.Producer; -import io.openmessaging.Promise; -import io.openmessaging.PromiseListener; -import io.openmessaging.SendResult; +import io.openmessaging.OMS; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.SendResult; import java.nio.charset.Charset; public class SimpleProducer { public static void main(String[] args) { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final Producer producer = messagingAccessPoint.createProducer(); @@ -47,29 +47,28 @@ public class SimpleProducer { })); { - Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); //final Void aVoid = result.get(3000L); System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); } { - final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); - result.addListener(new PromiseListener() { + final Future result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + result.addListener(new FutureListener() { @Override - public void operationCompleted(Promise promise) { - System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); - } - - @Override - public void operationFailed(Promise promise) { - System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); + public void operationComplete(Future future) { + if (future.getThrowable() != null) { + System.out.printf("Send async message Failed, error: %s%n", future.getThrowable().getMessage()); + } else { + System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId()); + } } }); } { - producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.printf("Send oneway message OK%n"); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 8e067724..56c3266f 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -17,19 +17,17 @@ package org.apache.rocketmq.example.openmessaging; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; -import io.openmessaging.PullConsumer; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.rocketmq.domain.NonStandardKeys; public class SimplePullConsumer { public static void main(String[] args) { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + final MessagingAccessPoint messagingAccessPoint = + OMS.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); - final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", + final PullConsumer consumer = messagingAccessPoint.createPullConsumer( OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); messagingAccessPoint.startup(); @@ -43,13 +41,15 @@ public class SimplePullConsumer { } })); + consumer.attachQueue("OMS_HELLO_TOPIC"); + consumer.startup(); System.out.printf("Consumer startup OK%n"); while (true) { - Message message = consumer.poll(); + Message message = consumer.receive(); if (message != null) { - String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); System.out.printf("Received one message: %s%n", msgId); consumer.ack(msgId); } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java index b0935d4c..18b7a4c2 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -17,18 +17,15 @@ package org.apache.rocketmq.example.openmessaging; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; -import io.openmessaging.MessageListener; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; -import io.openmessaging.PushConsumer; -import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.rocketmq.domain.NonStandardKeys; public class SimplePushConsumer { public static void main(String[] args) { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); final PushConsumer consumer = messagingAccessPoint. @@ -47,8 +44,8 @@ public class SimplePushConsumer { consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { @Override - public void onMessage(final Message message, final ReceivedMessageContext context) { - System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); + public void onReceived(Message message, Context context) { + System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)); context.ack(); } }); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index 65caf840..51388f9c 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -16,24 +16,21 @@ */ package io.openmessaging.rocketmq; -import io.openmessaging.IterableConsumer; import io.openmessaging.KeyValue; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.Producer; -import io.openmessaging.PullConsumer; -import io.openmessaging.PushConsumer; import io.openmessaging.ResourceManager; -import io.openmessaging.SequenceProducer; -import io.openmessaging.ServiceEndPoint; +import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.consumer.PushConsumer; +import io.openmessaging.consumer.StreamingConsumer; import io.openmessaging.exception.OMSNotSupportedException; -import io.openmessaging.observer.Observer; +import io.openmessaging.producer.Producer; import io.openmessaging.rocketmq.consumer.PullConsumerImpl; import io.openmessaging.rocketmq.consumer.PushConsumerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl; -import io.openmessaging.rocketmq.producer.SequenceProducerImpl; import io.openmessaging.rocketmq.utils.OMSUtil; public class MessagingAccessPointImpl implements MessagingAccessPoint { + private final KeyValue accessPointProperties; public MessagingAccessPointImpl(final KeyValue accessPointProperties) { @@ -41,10 +38,15 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { } @Override - public KeyValue properties() { + public KeyValue attributes() { return accessPointProperties; } + @Override + public String implVersion() { + return "0.3.0"; + } + @Override public Producer createProducer() { return new ProducerImpl(this.accessPointProperties); @@ -55,16 +57,6 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); } - @Override - public SequenceProducer createSequenceProducer() { - return new SequenceProducerImpl(this.accessPointProperties); - } - - @Override - public SequenceProducer createSequenceProducer(KeyValue properties) { - return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); - } - @Override public PushConsumer createPushConsumer() { return new PushConsumerImpl(accessPointProperties); @@ -76,50 +68,30 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { } @Override - public PullConsumer createPullConsumer(String queueName) { - return new PullConsumerImpl(queueName, accessPointProperties); + public PullConsumer createPullConsumer() { + return new PullConsumerImpl(accessPointProperties); } @Override - public PullConsumer createPullConsumer(String queueName, KeyValue properties) { - return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + public PullConsumer createPullConsumer(KeyValue attributes) { + return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes)); } @Override - public IterableConsumer createIterableConsumer(String queueName) { - throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version"); + public StreamingConsumer createStreamingConsumer() { + return null; } @Override - public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) { - throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version"); + public StreamingConsumer createStreamingConsumer(KeyValue attributes) { + return null; } @Override - public ResourceManager getResourceManager() { + public ResourceManager resourceManager() { throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version."); } - @Override - public ServiceEndPoint createServiceEndPoint() { - throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version."); - } - - @Override - public ServiceEndPoint createServiceEndPoint(KeyValue properties) { - throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version."); - } - - @Override - public void addObserver(Observer observer) { - //Ignore - } - - @Override - public void deleteObserver(Observer observer) { - //Ignore - } - @Override public void startup() { //Ignore diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java index 7077c6dc..774a7bc3 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java @@ -16,10 +16,10 @@ */ package io.openmessaging.rocketmq.config; -import io.openmessaging.PropertyKeys; +import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys; -public class ClientConfig implements PropertyKeys, NonStandardKeys { +public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { private String omsDriverImpl; private String omsAccessPoints; private String omsNamespace; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index cc1a5157..872d8fb5 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -17,7 +17,7 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; -import io.openmessaging.PropertyKeys; +import io.openmessaging.Message; import io.openmessaging.ServiceLifecycle; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; @@ -37,11 +37,11 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.logging.InternalLogger; class LocalMessageCache implements ServiceLifecycle { private final BlockingQueue consumeRequestCache; @@ -96,8 +96,8 @@ class LocalMessageCache implements ServiceLifecycle { MessageExt poll(final KeyValue properties) { int currentPollTimeout = clientConfig.getOmsOperationTimeout(); - if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { - currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); + if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) { + currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT); } return poll(currentPollTimeout); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index da4afdb7..c5bd24c7 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.Message; -import io.openmessaging.PropertyKeys; -import io.openmessaging.PullConsumer; +import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; @@ -42,17 +42,14 @@ public class PullConsumerImpl implements PullConsumer { private final DefaultMQPullConsumer rocketmqPullConsumer; private final KeyValue properties; private boolean started = false; - private String targetQueueName; private final MQPullConsumerScheduleService pullConsumerScheduleService; private final LocalMessageCache localMessageCache; private final ClientConfig clientConfig; final static InternalLogger log = ClientLogger.getLog(); - public PullConsumerImpl(final String queueName, final KeyValue properties) { + public PullConsumerImpl(final KeyValue properties) { this.properties = properties; - this.targetQueueName = queueName; - this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); String consumerGroup = clientConfig.getRmqConsumerGroup(); @@ -76,24 +73,42 @@ public class PullConsumerImpl implements PullConsumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); - properties.put(PropertyKeys.CONSUMER_ID, consumerId); + properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); } @Override - public KeyValue properties() { + public KeyValue attributes() { return properties; } @Override - public Message poll() { + public PullConsumer attachQueue(String queueName) { + registerPullTaskCallback(queueName); + return this; + } + + @Override + public PullConsumer attachQueue(String queueName, KeyValue attributes) { + registerPullTaskCallback(queueName); + return this; + } + + @Override + public PullConsumer detachQueue(String queueName) { + this.rocketmqPullConsumer.getRegisterTopics().remove(queueName); + return this; + } + + @Override + public Message receive() { MessageExt rmqMsg = localMessageCache.poll(); return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @Override - public Message poll(final KeyValue properties) { + public Message receive(final KeyValue properties) { MessageExt rmqMsg = localMessageCache.poll(properties); return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @@ -112,7 +127,6 @@ public class PullConsumerImpl implements PullConsumer { public synchronized void startup() { if (!started) { try { - registerPullTaskCallback(); this.pullConsumerScheduleService.start(); this.localMessageCache.startup(); } catch (MQClientException e) { @@ -122,7 +136,7 @@ public class PullConsumerImpl implements PullConsumer { this.started = true; } - private void registerPullTaskCallback() { + private void registerPullTaskCallback(final String targetQueueName) { this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() { @Override public void doPullTask(final MessageQueue mq, final PullTaskContext context) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index f9b8058e..6c0f3921 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -18,12 +18,12 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; -import io.openmessaging.MessageListener; import io.openmessaging.OMS; -import io.openmessaging.PropertyKeys; -import io.openmessaging.PushConsumer; -import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.interceptor.ConsumerInterceptor; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils; @@ -70,13 +70,13 @@ public class PushConsumerImpl implements PushConsumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPushConsumer.setInstanceName(consumerId); - properties.put(PropertyKeys.CONSUMER_ID, consumerId); + properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); } @Override - public KeyValue properties() { + public KeyValue attributes() { return properties; } @@ -90,6 +90,11 @@ public class PushConsumerImpl implements PushConsumer { this.rocketmqPushConsumer.suspend(); } + @Override + public void suspend(long timeout) { + + } + @Override public boolean isSuspended() { return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause(); @@ -106,6 +111,32 @@ public class PushConsumerImpl implements PushConsumer { return this; } + @Override + public PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes) { + return this.attachQueue(queueName, listener); + } + + @Override + public PushConsumer detachQueue(String queueName) { + this.subscribeTable.remove(queueName); + try { + this.rocketmqPushConsumer.unsubscribe(queueName); + } catch (Exception e) { + throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName)); + } + return null; + } + + @Override + public void addInterceptor(ConsumerInterceptor interceptor) { + + } + + @Override + public void removeInterceptor(ConsumerInterceptor interceptor) { + + } + @Override public synchronized void startup() { if (!started) { @@ -146,9 +177,9 @@ public class PushConsumerImpl implements PushConsumer { contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name()); - ReceivedMessageContext context = new ReceivedMessageContext() { + MessageListener.Context context = new MessageListener.Context() { @Override - public KeyValue properties() { + public KeyValue attributes() { return contextProperties; } @@ -158,16 +189,9 @@ public class PushConsumerImpl implements PushConsumer { contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); } - - @Override - public void ack(final KeyValue properties) { - sync.countDown(); - contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); - } }; long begin = System.currentTimeMillis(); - listener.onMessage(omsMsg, context); + listener.onReceived(omsMsg, context); long costs = System.currentTimeMillis() - begin; long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000; try { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java index 43f80ae5..702d561f 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -23,13 +23,13 @@ import io.openmessaging.OMS; import org.apache.commons.lang3.builder.ToStringBuilder; public class BytesMessageImpl implements BytesMessage { - private KeyValue headers; - private KeyValue properties; + private KeyValue sysHeaders; + private KeyValue userHeaders; private byte[] body; public BytesMessageImpl() { - this.headers = OMS.newKeyValue(); - this.properties = OMS.newKeyValue(); + this.sysHeaders = OMS.newKeyValue(); + this.userHeaders = OMS.newKeyValue(); } @Override @@ -44,60 +44,60 @@ public class BytesMessageImpl implements BytesMessage { } @Override - public KeyValue headers() { - return headers; + public KeyValue sysHeaders() { + return sysHeaders; } @Override - public KeyValue properties() { - return properties; + public KeyValue userHeaders() { + return userHeaders; } @Override - public Message putHeaders(final String key, final int value) { - headers.put(key, value); + public Message putSysHeaders(String key, int value) { + sysHeaders.put(key, value); return this; } @Override - public Message putHeaders(final String key, final long value) { - headers.put(key, value); + public Message putSysHeaders(String key, long value) { + sysHeaders.put(key, value); return this; } @Override - public Message putHeaders(final String key, final double value) { - headers.put(key, value); + public Message putSysHeaders(String key, double value) { + sysHeaders.put(key, value); return this; } @Override - public Message putHeaders(final String key, final String value) { - headers.put(key, value); + public Message putSysHeaders(String key, String value) { + sysHeaders.put(key, value); return this; } @Override - public Message putProperties(final String key, final int value) { - properties.put(key, value); + public Message putUserHeaders(String key, int value) { + userHeaders.put(key, value); return this; } @Override - public Message putProperties(final String key, final long value) { - properties.put(key, value); + public Message putUserHeaders(String key, long value) { + userHeaders.put(key, value); return this; } @Override - public Message putProperties(final String key, final double value) { - properties.put(key, value); + public Message putUserHeaders(String key, double value) { + userHeaders.put(key, value); return this; } @Override - public Message putProperties(final String key, final String value) { - properties.put(key, value); + public Message putUserHeaders(String key, String value) { + userHeaders.put(key, value); return this; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java index 228a9f0b..85bcd685 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java @@ -17,7 +17,7 @@ package io.openmessaging.rocketmq.domain; import io.openmessaging.KeyValue; -import io.openmessaging.SendResult; +import io.openmessaging.producer.SendResult; public class SendResultImpl implements SendResult { private String messageId; @@ -33,7 +33,6 @@ public class SendResultImpl implements SendResult { return messageId; } - @Override public KeyValue properties() { return properties; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index db25fc65..2e99fd6e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -20,8 +20,7 @@ import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; import io.openmessaging.Message; import io.openmessaging.MessageFactory; -import io.openmessaging.MessageHeader; -import io.openmessaging.PropertyKeys; +import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSMessageFormatException; import io.openmessaging.exception.OMSNotSupportedException; @@ -64,7 +63,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout()); this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); - properties.put(PropertyKeys.PRODUCER_ID, producerId); + properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); } @Override @@ -121,18 +120,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { } @Override - public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) { - BytesMessage bytesMessage = new BytesMessageImpl(); - bytesMessage.setBody(body); - bytesMessage.headers().put(MessageHeader.TOPIC, topic); - return bytesMessage; - } - - @Override - public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) { - BytesMessage bytesMessage = new BytesMessageImpl(); - bytesMessage.setBody(body); - bytesMessage.headers().put(MessageHeader.QUEUE, queue); - return bytesMessage; + public BytesMessage createBytesMessage(String queue, byte[] body) { + BytesMessage message = new BytesMessageImpl(); + message.setBody(body); + message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue); + return message; } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index 2c00c60e..c2b6d3e3 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -19,12 +19,13 @@ package io.openmessaging.rocketmq.producer; import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; -import io.openmessaging.Producer; import io.openmessaging.Promise; -import io.openmessaging.PropertyKeys; -import io.openmessaging.SendResult; import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.producer.BatchMessageSender; +import io.openmessaging.producer.LocalTransactionExecutor; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.SendResult; import io.openmessaging.rocketmq.promise.DefaultPromise; import io.openmessaging.rocketmq.utils.OMSUtil; import org.apache.rocketmq.client.producer.SendCallback; @@ -39,7 +40,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { } @Override - public KeyValue properties() { + public KeyValue attributes() { return properties; } @@ -50,11 +51,16 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { @Override public SendResult send(final Message message, final KeyValue properties) { - long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) - ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT) + ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); return send(message, timeout); } + @Override + public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) { + return null; + } + private SendResult send(final Message message, long timeout) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); @@ -64,11 +70,11 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { log.error(String.format("Send message to RocketMQ failed, %s", message)); throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed."); } - message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); return OMSUtil.sendResultConvert(rmqResult); } catch (Exception e) { log.error(String.format("Send message to RocketMQ failed, %s", message), e); - throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); + throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e); } } @@ -79,8 +85,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { @Override public Promise sendAsync(final Message message, final KeyValue properties) { - long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) - ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT) + ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); return sendAsync(message, timeout); } @@ -92,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { this.rocketmqProducer.send(rmqMessage, new SendCallback() { @Override public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { - message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); promise.set(OMSUtil.sendResultConvert(rmqResult)); } @@ -121,4 +127,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { public void sendOneway(final Message message, final KeyValue properties) { sendOneway(message); } + + @Override + public BatchMessageSender createBatchMessageSender() { + return null; + } + + @Override + public void addInterceptor(ProducerInterceptor interceptor) { + + } + + @Override + public void removeInterceptor(ProducerInterceptor interceptor) { + + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java deleted file mode 100644 index 05225cc5..00000000 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.openmessaging.rocketmq.producer; - -import io.openmessaging.BytesMessage; -import io.openmessaging.KeyValue; -import io.openmessaging.Message; -import io.openmessaging.MessageHeader; -import io.openmessaging.SequenceProducer; -import io.openmessaging.rocketmq.utils.OMSUtil; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.rocketmq.client.Validators; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.SendResult; - -public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer { - - private BlockingQueue msgCacheQueue; - - public SequenceProducerImpl(final KeyValue properties) { - super(properties); - this.msgCacheQueue = new LinkedBlockingQueue<>(); - } - - @Override - public KeyValue properties() { - return properties; - } - - @Override - public void send(final Message message) { - checkMessageType(message); - org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message); - try { - Validators.checkMessage(rmqMessage, this.rocketmqProducer); - } catch (MQClientException e) { - throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); - } - msgCacheQueue.add(message); - } - - @Override - public void send(final Message message, final KeyValue properties) { - send(message); - } - - @Override - public synchronized void commit() { - List messages = new ArrayList<>(); - msgCacheQueue.drainTo(messages); - - List rmqMessages = new ArrayList<>(); - - for (Message message : messages) { - rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); - } - - if (rmqMessages.size() == 0) { - return; - } - - try { - SendResult sendResult = this.rocketmqProducer.send(rmqMessages); - String[] msgIdArray = sendResult.getMsgId().split(","); - for (int i = 0; i < messages.size(); i++) { - Message message = messages.get(i); - message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); - } - } catch (Exception e) { - throw checkProducerException("", "", e); - } - } - - @Override - public synchronized void rollback() { - msgCacheQueue.clear(); - } -} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java index 453b665c..c1b59993 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -17,7 +17,7 @@ package io.openmessaging.rocketmq.promise; import io.openmessaging.Promise; -import io.openmessaging.PromiseListener; +import io.openmessaging.FutureListener; import io.openmessaging.exception.OMSRuntimeException; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -33,7 +33,7 @@ public class DefaultPromise implements Promise { private long timeout; private long createTime; private Throwable exception = null; - private List> promiseListenerList; + private List> promiseListenerList; public DefaultPromise() { createTime = System.currentTimeMillis(); @@ -121,7 +121,7 @@ public class DefaultPromise implements Promise { } @Override - public void addListener(final PromiseListener listener) { + public void addListener(final FutureListener listener) { if (listener == null) { throw new NullPointerException("FutureListener is null"); } @@ -150,7 +150,7 @@ public class DefaultPromise implements Promise { private void notifyListeners() { if (promiseListenerList != null) { - for (PromiseListener listener : promiseListenerList) { + for (FutureListener listener : promiseListenerList) { notifyListener(listener); } } @@ -199,12 +199,9 @@ public class DefaultPromise implements Promise { return true; } - private void notifyListener(final PromiseListener listener) { + private void notifyListener(final FutureListener listener) { try { - if (exception != null) - listener.operationFailed(this); - else - listener.operationCompleted(this); + listener.operationComplete(this); } catch (Throwable t) { LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java index 60c84081..0938b831 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -18,11 +18,10 @@ package io.openmessaging.rocketmq.utils; import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; -import io.openmessaging.MessageHeader; +import io.openmessaging.Message.BuiltinKeys; import io.openmessaging.OMS; -import io.openmessaging.SendResult; +import io.openmessaging.producer.SendResult; import io.openmessaging.rocketmq.domain.BytesMessageImpl; -import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.SendResultImpl; import java.lang.reflect.Field; import java.util.Iterator; @@ -48,25 +47,19 @@ public class OMSUtil { org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); rmqMessage.setBody(omsMessage.getBody()); - KeyValue headers = omsMessage.headers(); - KeyValue properties = omsMessage.properties(); + KeyValue sysHeaders = omsMessage.sysHeaders(); + KeyValue userHeaders = omsMessage.userHeaders(); //All destinations in RocketMQ use Topic - if (headers.containsKey(MessageHeader.TOPIC)) { - rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC)); - rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); - } else { - rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE)); - rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE"); - } + rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION)); - for (String key : properties.keySet()) { - MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); + for (String key : userHeaders.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key)); } - //Headers has a high priority - for (String key : headers.keySet()) { - MessageAccessor.putProperty(rmqMessage, key, headers.getString(key)); + //System headers has a high priority + for (String key : sysHeaders.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key)); } return rmqMessage; @@ -76,8 +69,8 @@ public class OMSUtil { BytesMessage omsMsg = new BytesMessageImpl(); omsMsg.setBody(rmqMsg.getBody()); - KeyValue headers = omsMsg.headers(); - KeyValue properties = omsMsg.properties(); + KeyValue headers = omsMsg.sysHeaders(); + KeyValue properties = omsMsg.userHeaders(); final Set> entries = rmqMsg.getProperties().entrySet(); @@ -89,25 +82,22 @@ public class OMSUtil { } } - omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId()); - if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) || - rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { - omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic()); - } else { - omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic()); - } - omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys()); - omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); - omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); - omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); - omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); + omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId()); + + omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic()); + + omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys()); + omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); + omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); + omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); + omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); return omsMsg; } public static boolean isOMSHeader(String value) { - for (Field field : MessageHeader.class.getDeclaredFields()) { + for (Field field : BuiltinKeys.class.getDeclaredFields()) { try { - if (field.get(MessageHeader.class).equals(value)) { + if (field.get(BuiltinKeys.class).equals(value)) { return true; } } catch (IllegalAccessException e) { diff --git a/pom.xml b/pom.xml index 6737ae41..f4184a5f 100644 --- a/pom.xml +++ b/pom.xml @@ -592,7 +592,7 @@ io.openmessaging openmessaging-api - 0.1.0-alpha + 0.3.0-alpha-SNAPSHOT log4j -- GitLab