From 2169e3c075e48f8f016337c9b81eafc258b9b9b1 Mon Sep 17 00:00:00 2001 From: "shutian.lzh" Date: Sun, 15 Apr 2018 16:13:35 +0800 Subject: [PATCH] Make code compatible to OMS 0.3.0 --- .../apache/rocketmq/broker/BrokerStartup.java | 1 + .../rocketmq/client/log/ClientLoggerTest.java | 5 +- .../example/openmessaging/SimpleProducer.java | 54 ++++---- .../openmessaging/SimplePullConsumer.java | 54 +++++--- .../openmessaging/SimplePushConsumer.java | 19 ++- .../rocketmq/example/simple/Producer.java | 2 +- .../example/simple/PullScheduleService.java | 2 +- .../rocketmq/MessagingAccessPointImpl.java | 68 +++------- .../rocketmq/config/ClientConfig.java | 128 +++++++++--------- .../rocketmq/consumer/LocalMessageCache.java | 12 +- .../rocketmq/consumer/PullConsumerImpl.java | 44 ++++-- .../rocketmq/consumer/PushConsumerImpl.java | 60 +++++--- .../rocketmq/domain/BytesMessageImpl.java | 48 +++---- .../rocketmq/domain/RocketMQConstants.java | 7 + .../rocketmq/domain/SendResultImpl.java | 3 +- .../producer/AbstractOMSProducer.java | 27 ++-- .../rocketmq/producer/ProducerImpl.java | 45 ++++-- .../producer/SequenceProducerImpl.java | 95 ------------- .../rocketmq/promise/DefaultPromise.java | 15 +- .../rocketmq/utils/BeanUtils.java | 2 +- .../openmessaging/rocketmq/utils/OMSUtil.java | 62 ++++----- .../consumer/PullConsumerImplTest.java | 24 ++-- .../consumer/PushConsumerImplTest.java | 18 ++- .../rocketmq/producer/ProducerImplTest.java | 16 +-- .../producer/SequenceProducerImplTest.java | 86 ------------ .../rocketmq/promise/DefaultPromiseTest.java | 38 ++---- .../rocketmq/utils/BeanUtilsTest.java | 4 +- pom.xml | 2 +- 28 files changed, 392 insertions(+), 549 deletions(-) create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java delete mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java delete mode 100644 openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index f0a11501..1fc1b3b8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -70,6 +70,7 @@ public class BrokerStartup { } log.info(tip); + System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); diff --git a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java index 9fe0d8b9..4888186b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java @@ -49,7 +49,10 @@ public class ClientLoggerTest { rocketmqCommon.info("common message {}", i, new RuntimeException()); rocketmqRemoting.info("remoting message {}", i, new RuntimeException()); } - + try { + Thread.sleep(10); + } catch (InterruptedException ignore) { + } String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log"); Assert.assertTrue(content.contains("testClientlog")); Assert.assertTrue(content.contains("RocketmqClient")); diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index 9d162ac1..dbe1d105 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -16,19 +16,20 @@ */ package org.apache.rocketmq.example.openmessaging; +import io.openmessaging.Future; +import io.openmessaging.FutureListener; import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; -import io.openmessaging.Producer; -import io.openmessaging.Promise; -import io.openmessaging.PromiseListener; -import io.openmessaging.SendResult; +import io.openmessaging.OMS; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.SendResult; import java.nio.charset.Charset; +import java.util.concurrent.CountDownLatch; public class SimpleProducer { public static void main(String[] args) { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + final MessagingAccessPoint messagingAccessPoint = + OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); final Producer producer = messagingAccessPoint.createProducer(); @@ -38,39 +39,40 @@ public class SimpleProducer { producer.startup(); System.out.printf("Producer startup OK%n"); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - producer.shutdown(); - messagingAccessPoint.shutdown(); - } - })); - { - Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); //final Void aVoid = result.get(3000L); System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId()); } + final CountDownLatch countDownLatch = new CountDownLatch(1); { - final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); - result.addListener(new PromiseListener() { - @Override - public void operationCompleted(Promise promise) { - System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId()); - } - + final Future result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + result.addListener(new FutureListener() { @Override - public void operationFailed(Promise promise) { - System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage()); + public void operationComplete(Future future) { + if (future.getThrowable() != null) { + System.out.printf("Send async message Failed, error: %s%n", future.getThrowable().getMessage()); + } else { + System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId()); + } + countDownLatch.countDown(); } }); } { - producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.printf("Send oneway message OK%n"); } + + try { + countDownLatch.await(); + Thread.sleep(500); // Wait some time for one-way delivery. + } catch (InterruptedException ignore) { + } + + producer.shutdown(); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 8e067724..86aba410 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -17,42 +17,60 @@ package org.apache.rocketmq.example.openmessaging; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; -import io.openmessaging.PullConsumer; -import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.SendResult; public class SimplePullConsumer { public static void main(String[] args) { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + final MessagingAccessPoint messagingAccessPoint = + OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); - final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", - OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + messagingAccessPoint.startup(); + + final Producer producer = messagingAccessPoint.createProducer(); + + final PullConsumer consumer = messagingAccessPoint.createPullConsumer( + OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER")); messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - consumer.shutdown(); - messagingAccessPoint.shutdown(); - } - })); + final String queueName = "TopicTest"; + + producer.startup(); + Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes()); + SendResult sendResult = producer.send(msg); + System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId()); + producer.shutdown(); + + consumer.attachQueue(queueName); consumer.startup(); System.out.printf("Consumer startup OK%n"); - while (true) { - Message message = consumer.poll(); + // Keep running until we find the one that has just been sent + boolean stop = false; + while (!stop) { + Message message = consumer.receive(); if (message != null) { - String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); System.out.printf("Received one message: %s%n", msgId); consumer.ack(msgId); + + if (!stop) { + stop = msgId.equalsIgnoreCase(sendResult.messageId()); + } + + } else { + System.out.printf("Return without any message%n"); } } + + consumer.shutdown(); + messagingAccessPoint.shutdown(); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java index b0935d4c..220c1323 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -17,22 +17,19 @@ package org.apache.rocketmq.example.openmessaging; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; -import io.openmessaging.MessageListener; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; -import io.openmessaging.PushConsumer; -import io.openmessaging.ReceivedMessageContext; -import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.PushConsumer; public class SimplePushConsumer { public static void main(String[] args) { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + final MessagingAccessPoint messagingAccessPoint = OMS + .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); final PushConsumer consumer = messagingAccessPoint. - createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER")); messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); @@ -47,8 +44,8 @@ public class SimplePushConsumer { consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { @Override - public void onMessage(final Message message, final ReceivedMessageContext context) { - System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID)); + public void onReceived(Message message, Context context) { + System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)); context.ack(); } }); diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java index 5751d22c..7b504dd2 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java @@ -29,7 +29,7 @@ public class Producer { producer.start(); - for (int i = 0; i < 10000000; i++) + for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest", diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java index 151628f9..8cfdd9bb 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java @@ -32,7 +32,7 @@ public class PullScheduleService { final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); scheduleService.setMessageModel(MessageModel.CLUSTERING); - scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { + scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() { @Override public void doPullTask(MessageQueue mq, PullTaskContext context) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index 65caf840..51388f9c 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -16,24 +16,21 @@ */ package io.openmessaging.rocketmq; -import io.openmessaging.IterableConsumer; import io.openmessaging.KeyValue; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.Producer; -import io.openmessaging.PullConsumer; -import io.openmessaging.PushConsumer; import io.openmessaging.ResourceManager; -import io.openmessaging.SequenceProducer; -import io.openmessaging.ServiceEndPoint; +import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.consumer.PushConsumer; +import io.openmessaging.consumer.StreamingConsumer; import io.openmessaging.exception.OMSNotSupportedException; -import io.openmessaging.observer.Observer; +import io.openmessaging.producer.Producer; import io.openmessaging.rocketmq.consumer.PullConsumerImpl; import io.openmessaging.rocketmq.consumer.PushConsumerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl; -import io.openmessaging.rocketmq.producer.SequenceProducerImpl; import io.openmessaging.rocketmq.utils.OMSUtil; public class MessagingAccessPointImpl implements MessagingAccessPoint { + private final KeyValue accessPointProperties; public MessagingAccessPointImpl(final KeyValue accessPointProperties) { @@ -41,10 +38,15 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { } @Override - public KeyValue properties() { + public KeyValue attributes() { return accessPointProperties; } + @Override + public String implVersion() { + return "0.3.0"; + } + @Override public Producer createProducer() { return new ProducerImpl(this.accessPointProperties); @@ -55,16 +57,6 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); } - @Override - public SequenceProducer createSequenceProducer() { - return new SequenceProducerImpl(this.accessPointProperties); - } - - @Override - public SequenceProducer createSequenceProducer(KeyValue properties) { - return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); - } - @Override public PushConsumer createPushConsumer() { return new PushConsumerImpl(accessPointProperties); @@ -76,50 +68,30 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { } @Override - public PullConsumer createPullConsumer(String queueName) { - return new PullConsumerImpl(queueName, accessPointProperties); + public PullConsumer createPullConsumer() { + return new PullConsumerImpl(accessPointProperties); } @Override - public PullConsumer createPullConsumer(String queueName, KeyValue properties) { - return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + public PullConsumer createPullConsumer(KeyValue attributes) { + return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes)); } @Override - public IterableConsumer createIterableConsumer(String queueName) { - throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version"); + public StreamingConsumer createStreamingConsumer() { + return null; } @Override - public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) { - throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version"); + public StreamingConsumer createStreamingConsumer(KeyValue attributes) { + return null; } @Override - public ResourceManager getResourceManager() { + public ResourceManager resourceManager() { throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version."); } - @Override - public ServiceEndPoint createServiceEndPoint() { - throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version."); - } - - @Override - public ServiceEndPoint createServiceEndPoint(KeyValue properties) { - throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version."); - } - - @Override - public void addObserver(Observer observer) { - //Ignore - } - - @Override - public void deleteObserver(Observer observer) { - //Ignore - } - @Override public void startup() { //Ignore diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java index 7077c6dc..a5dfe494 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java @@ -16,20 +16,20 @@ */ package io.openmessaging.rocketmq.config; -import io.openmessaging.PropertyKeys; +import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys; -public class ClientConfig implements PropertyKeys, NonStandardKeys { - private String omsDriverImpl; - private String omsAccessPoints; - private String omsNamespace; - private String omsProducerId; - private String omsConsumerId; - private int omsOperationTimeout = 5000; - private String omsRoutingName; - private String omsOperatorName; - private String omsDstQueue; - private String omsSrcTopic; +public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { + private String driverImpl; + private String accessPoints; + private String namespace; + private String producerId; + private String consumerId; + private int operationTimeout = 5000; + private String region; + private String routingSource; + private String routingDestination; + private String routingExpression; private String rmqConsumerGroup; private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; private int rmqMaxRedeliveryTimes = 16; @@ -40,84 +40,60 @@ public class ClientConfig implements PropertyKeys, NonStandardKeys { private int rmqPullMessageBatchNums = 32; private int rmqPullMessageCacheCapacity = 1000; - public String getOmsDriverImpl() { - return omsDriverImpl; + public String getDriverImpl() { + return driverImpl; } - public void setOmsDriverImpl(final String omsDriverImpl) { - this.omsDriverImpl = omsDriverImpl; + public void setDriverImpl(final String driverImpl) { + this.driverImpl = driverImpl; } - public String getOmsAccessPoints() { - return omsAccessPoints; + public String getAccessPoints() { + return accessPoints; } - public void setOmsAccessPoints(final String omsAccessPoints) { - this.omsAccessPoints = omsAccessPoints; + public void setAccessPoints(final String accessPoints) { + this.accessPoints = accessPoints; } - public String getOmsNamespace() { - return omsNamespace; + public String getNamespace() { + return namespace; } - public void setOmsNamespace(final String omsNamespace) { - this.omsNamespace = omsNamespace; + public void setNamespace(final String namespace) { + this.namespace = namespace; } - public String getOmsProducerId() { - return omsProducerId; + public String getProducerId() { + return producerId; } - public void setOmsProducerId(final String omsProducerId) { - this.omsProducerId = omsProducerId; + public void setProducerId(final String producerId) { + this.producerId = producerId; } - public String getOmsConsumerId() { - return omsConsumerId; + public String getConsumerId() { + return consumerId; } - public void setOmsConsumerId(final String omsConsumerId) { - this.omsConsumerId = omsConsumerId; + public void setConsumerId(final String consumerId) { + this.consumerId = consumerId; } - public int getOmsOperationTimeout() { - return omsOperationTimeout; + public int getOperationTimeout() { + return operationTimeout; } - public void setOmsOperationTimeout(final int omsOperationTimeout) { - this.omsOperationTimeout = omsOperationTimeout; + public void setOperationTimeout(final int operationTimeout) { + this.operationTimeout = operationTimeout; } - public String getOmsRoutingName() { - return omsRoutingName; + public String getRoutingSource() { + return routingSource; } - public void setOmsRoutingName(final String omsRoutingName) { - this.omsRoutingName = omsRoutingName; - } - - public String getOmsOperatorName() { - return omsOperatorName; - } - - public void setOmsOperatorName(final String omsOperatorName) { - this.omsOperatorName = omsOperatorName; - } - - public String getOmsDstQueue() { - return omsDstQueue; - } - - public void setOmsDstQueue(final String omsDstQueue) { - this.omsDstQueue = omsDstQueue; - } - - public String getOmsSrcTopic() { - return omsSrcTopic; - } - - public void setOmsSrcTopic(final String omsSrcTopic) { - this.omsSrcTopic = omsSrcTopic; + public void setRoutingSource(final String routingSource) { + this.routingSource = routingSource; } public String getRmqConsumerGroup() { @@ -191,4 +167,28 @@ public class ClientConfig implements PropertyKeys, NonStandardKeys { public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) { this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity; } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getRoutingDestination() { + return routingDestination; + } + + public void setRoutingDestination(String routingDestination) { + this.routingDestination = routingDestination; + } + + public String getRoutingExpression() { + return routingExpression; + } + + public void setRoutingExpression(String routingExpression) { + this.routingExpression = routingExpression; + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index cc1a5157..93e60a73 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -17,7 +17,7 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; -import io.openmessaging.PropertyKeys; +import io.openmessaging.Message; import io.openmessaging.ServiceLifecycle; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; @@ -37,11 +37,11 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.ThreadFactoryImpl; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.utils.ThreadUtils; +import org.apache.rocketmq.logging.InternalLogger; class LocalMessageCache implements ServiceLifecycle { private final BlockingQueue consumeRequestCache; @@ -91,13 +91,13 @@ class LocalMessageCache implements ServiceLifecycle { } MessageExt poll() { - return poll(clientConfig.getOmsOperationTimeout()); + return poll(clientConfig.getOperationTimeout()); } MessageExt poll(final KeyValue properties) { - int currentPollTimeout = clientConfig.getOmsOperationTimeout(); - if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { - currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); + int currentPollTimeout = clientConfig.getOperationTimeout(); + if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) { + currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT); } return poll(currentPollTimeout); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index da4afdb7..2e22509a 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.Message; -import io.openmessaging.PropertyKeys; -import io.openmessaging.PullConsumer; +import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; @@ -34,28 +34,25 @@ import org.apache.rocketmq.client.consumer.PullTaskContext; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; public class PullConsumerImpl implements PullConsumer { private final DefaultMQPullConsumer rocketmqPullConsumer; private final KeyValue properties; private boolean started = false; - private String targetQueueName; private final MQPullConsumerScheduleService pullConsumerScheduleService; private final LocalMessageCache localMessageCache; private final ClientConfig clientConfig; final static InternalLogger log = ClientLogger.getLog(); - public PullConsumerImpl(final String queueName, final KeyValue properties) { + public PullConsumerImpl(final KeyValue properties) { this.properties = properties; - this.targetQueueName = queueName; - this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String consumerGroup = clientConfig.getRmqConsumerGroup(); + String consumerGroup = clientConfig.getConsumerId(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } @@ -63,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer { this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } @@ -76,24 +73,42 @@ public class PullConsumerImpl implements PullConsumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); - properties.put(PropertyKeys.CONSUMER_ID, consumerId); + properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); } @Override - public KeyValue properties() { + public KeyValue attributes() { return properties; } @Override - public Message poll() { + public PullConsumer attachQueue(String queueName) { + registerPullTaskCallback(queueName); + return this; + } + + @Override + public PullConsumer attachQueue(String queueName, KeyValue attributes) { + registerPullTaskCallback(queueName); + return this; + } + + @Override + public PullConsumer detachQueue(String queueName) { + this.rocketmqPullConsumer.getRegisterTopics().remove(queueName); + return this; + } + + @Override + public Message receive() { MessageExt rmqMsg = localMessageCache.poll(); return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @Override - public Message poll(final KeyValue properties) { + public Message receive(final KeyValue properties) { MessageExt rmqMsg = localMessageCache.poll(properties); return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @@ -112,7 +127,6 @@ public class PullConsumerImpl implements PullConsumer { public synchronized void startup() { if (!started) { try { - registerPullTaskCallback(); this.pullConsumerScheduleService.start(); this.localMessageCache.startup(); } catch (MQClientException e) { @@ -122,7 +136,7 @@ public class PullConsumerImpl implements PullConsumer { this.started = true; } - private void registerPullTaskCallback() { + private void registerPullTaskCallback(final String targetQueueName) { this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() { @Override public void doPullTask(final MessageQueue mq, final PullTaskContext context) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index f9b8058e..9bfd7c8b 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -18,12 +18,12 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; -import io.openmessaging.MessageListener; import io.openmessaging.OMS; -import io.openmessaging.PropertyKeys; -import io.openmessaging.PushConsumer; -import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.consumer.MessageListener; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.interceptor.ConsumerInterceptor; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils; @@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer { this.properties = properties; this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); - String consumerGroup = clientConfig.getRmqConsumerGroup(); + String consumerGroup = clientConfig.getConsumerId(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } @@ -70,13 +70,13 @@ public class PushConsumerImpl implements PushConsumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPushConsumer.setInstanceName(consumerId); - properties.put(PropertyKeys.CONSUMER_ID, consumerId); + properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); } @Override - public KeyValue properties() { + public KeyValue attributes() { return properties; } @@ -90,6 +90,11 @@ public class PushConsumerImpl implements PushConsumer { this.rocketmqPushConsumer.suspend(); } + @Override + public void suspend(long timeout) { + + } + @Override public boolean isSuspended() { return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause(); @@ -106,6 +111,32 @@ public class PushConsumerImpl implements PushConsumer { return this; } + @Override + public PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes) { + return this.attachQueue(queueName, listener); + } + + @Override + public PushConsumer detachQueue(String queueName) { + this.subscribeTable.remove(queueName); + try { + this.rocketmqPushConsumer.unsubscribe(queueName); + } catch (Exception e) { + throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName)); + } + return null; + } + + @Override + public void addInterceptor(ConsumerInterceptor interceptor) { + + } + + @Override + public void removeInterceptor(ConsumerInterceptor interceptor) { + + } + @Override public synchronized void startup() { if (!started) { @@ -146,9 +177,9 @@ public class PushConsumerImpl implements PushConsumer { contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name()); - ReceivedMessageContext context = new ReceivedMessageContext() { + MessageListener.Context context = new MessageListener.Context() { @Override - public KeyValue properties() { + public KeyValue attributes() { return contextProperties; } @@ -158,16 +189,9 @@ public class PushConsumerImpl implements PushConsumer { contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); } - - @Override - public void ack(final KeyValue properties) { - sync.countDown(); - contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); - } }; long begin = System.currentTimeMillis(); - listener.onMessage(omsMsg, context); + listener.onReceived(omsMsg, context); long costs = System.currentTimeMillis() - begin; long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000; try { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java index 43f80ae5..702d561f 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -23,13 +23,13 @@ import io.openmessaging.OMS; import org.apache.commons.lang3.builder.ToStringBuilder; public class BytesMessageImpl implements BytesMessage { - private KeyValue headers; - private KeyValue properties; + private KeyValue sysHeaders; + private KeyValue userHeaders; private byte[] body; public BytesMessageImpl() { - this.headers = OMS.newKeyValue(); - this.properties = OMS.newKeyValue(); + this.sysHeaders = OMS.newKeyValue(); + this.userHeaders = OMS.newKeyValue(); } @Override @@ -44,60 +44,60 @@ public class BytesMessageImpl implements BytesMessage { } @Override - public KeyValue headers() { - return headers; + public KeyValue sysHeaders() { + return sysHeaders; } @Override - public KeyValue properties() { - return properties; + public KeyValue userHeaders() { + return userHeaders; } @Override - public Message putHeaders(final String key, final int value) { - headers.put(key, value); + public Message putSysHeaders(String key, int value) { + sysHeaders.put(key, value); return this; } @Override - public Message putHeaders(final String key, final long value) { - headers.put(key, value); + public Message putSysHeaders(String key, long value) { + sysHeaders.put(key, value); return this; } @Override - public Message putHeaders(final String key, final double value) { - headers.put(key, value); + public Message putSysHeaders(String key, double value) { + sysHeaders.put(key, value); return this; } @Override - public Message putHeaders(final String key, final String value) { - headers.put(key, value); + public Message putSysHeaders(String key, String value) { + sysHeaders.put(key, value); return this; } @Override - public Message putProperties(final String key, final int value) { - properties.put(key, value); + public Message putUserHeaders(String key, int value) { + userHeaders.put(key, value); return this; } @Override - public Message putProperties(final String key, final long value) { - properties.put(key, value); + public Message putUserHeaders(String key, long value) { + userHeaders.put(key, value); return this; } @Override - public Message putProperties(final String key, final double value) { - properties.put(key, value); + public Message putUserHeaders(String key, double value) { + userHeaders.put(key, value); return this; } @Override - public Message putProperties(final String key, final String value) { - properties.put(key, value); + public Message putUserHeaders(String key, String value) { + userHeaders.put(key, value); return this; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java new file mode 100644 index 00000000..4c6568ae --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/RocketMQConstants.java @@ -0,0 +1,7 @@ +package io.openmessaging.rocketmq.domain; + +public interface RocketMQConstants { + + String START_DELIVER_TIME = "__STARTDELIVERTIME"; + +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java index 228a9f0b..85bcd685 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/SendResultImpl.java @@ -17,7 +17,7 @@ package io.openmessaging.rocketmq.domain; import io.openmessaging.KeyValue; -import io.openmessaging.SendResult; +import io.openmessaging.producer.SendResult; public class SendResultImpl implements SendResult { private String messageId; @@ -33,7 +33,6 @@ public class SendResultImpl implements SendResult { return messageId; } - @Override public KeyValue properties() { return properties; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index db25fc65..f7337566 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -20,8 +20,7 @@ import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; import io.openmessaging.Message; import io.openmessaging.MessageFactory; -import io.openmessaging.MessageHeader; -import io.openmessaging.PropertyKeys; +import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.ServiceLifecycle; import io.openmessaging.exception.OMSMessageFormatException; import io.openmessaging.exception.OMSNotSupportedException; @@ -53,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer = new DefaultMQProducer(); this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } @@ -61,10 +60,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup()); String producerId = buildInstanceName(); - this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout()); + this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout()); this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); - properties.put(PropertyKeys.PRODUCER_ID, producerId); + properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); } @Override @@ -121,18 +120,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { } @Override - public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) { - BytesMessage bytesMessage = new BytesMessageImpl(); - bytesMessage.setBody(body); - bytesMessage.headers().put(MessageHeader.TOPIC, topic); - return bytesMessage; - } - - @Override - public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) { - BytesMessage bytesMessage = new BytesMessageImpl(); - bytesMessage.setBody(body); - bytesMessage.headers().put(MessageHeader.QUEUE, queue); - return bytesMessage; + public BytesMessage createBytesMessage(String queue, byte[] body) { + BytesMessage message = new BytesMessageImpl(); + message.setBody(body); + message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue); + return message; } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index 2c00c60e..c2b6d3e3 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -19,12 +19,13 @@ package io.openmessaging.rocketmq.producer; import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; -import io.openmessaging.Producer; import io.openmessaging.Promise; -import io.openmessaging.PropertyKeys; -import io.openmessaging.SendResult; import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.producer.BatchMessageSender; +import io.openmessaging.producer.LocalTransactionExecutor; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.SendResult; import io.openmessaging.rocketmq.promise.DefaultPromise; import io.openmessaging.rocketmq.utils.OMSUtil; import org.apache.rocketmq.client.producer.SendCallback; @@ -39,7 +40,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { } @Override - public KeyValue properties() { + public KeyValue attributes() { return properties; } @@ -50,11 +51,16 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { @Override public SendResult send(final Message message, final KeyValue properties) { - long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) - ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT) + ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); return send(message, timeout); } + @Override + public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) { + return null; + } + private SendResult send(final Message message, long timeout) { checkMessageType(message); org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message); @@ -64,11 +70,11 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { log.error(String.format("Send message to RocketMQ failed, %s", message)); throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed."); } - message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); return OMSUtil.sendResultConvert(rmqResult); } catch (Exception e) { log.error(String.format("Send message to RocketMQ failed, %s", message), e); - throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); + throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e); } } @@ -79,8 +85,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { @Override public Promise sendAsync(final Message message, final KeyValue properties) { - long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT) - ? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); + long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT) + ? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout(); return sendAsync(message, timeout); } @@ -92,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { this.rocketmqProducer.send(rmqMessage, new SendCallback() { @Override public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { - message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId()); + message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId()); promise.set(OMSUtil.sendResultConvert(rmqResult)); } @@ -121,4 +127,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { public void sendOneway(final Message message, final KeyValue properties) { sendOneway(message); } + + @Override + public BatchMessageSender createBatchMessageSender() { + return null; + } + + @Override + public void addInterceptor(ProducerInterceptor interceptor) { + + } + + @Override + public void removeInterceptor(ProducerInterceptor interceptor) { + + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java deleted file mode 100644 index 05225cc5..00000000 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.openmessaging.rocketmq.producer; - -import io.openmessaging.BytesMessage; -import io.openmessaging.KeyValue; -import io.openmessaging.Message; -import io.openmessaging.MessageHeader; -import io.openmessaging.SequenceProducer; -import io.openmessaging.rocketmq.utils.OMSUtil; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.rocketmq.client.Validators; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.SendResult; - -public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer { - - private BlockingQueue msgCacheQueue; - - public SequenceProducerImpl(final KeyValue properties) { - super(properties); - this.msgCacheQueue = new LinkedBlockingQueue<>(); - } - - @Override - public KeyValue properties() { - return properties; - } - - @Override - public void send(final Message message) { - checkMessageType(message); - org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message); - try { - Validators.checkMessage(rmqMessage, this.rocketmqProducer); - } catch (MQClientException e) { - throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e); - } - msgCacheQueue.add(message); - } - - @Override - public void send(final Message message, final KeyValue properties) { - send(message); - } - - @Override - public synchronized void commit() { - List messages = new ArrayList<>(); - msgCacheQueue.drainTo(messages); - - List rmqMessages = new ArrayList<>(); - - for (Message message : messages) { - rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message)); - } - - if (rmqMessages.size() == 0) { - return; - } - - try { - SendResult sendResult = this.rocketmqProducer.send(rmqMessages); - String[] msgIdArray = sendResult.getMsgId().split(","); - for (int i = 0; i < messages.size(); i++) { - Message message = messages.get(i); - message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]); - } - } catch (Exception e) { - throw checkProducerException("", "", e); - } - } - - @Override - public synchronized void rollback() { - msgCacheQueue.clear(); - } -} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java index 453b665c..c1b59993 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/promise/DefaultPromise.java @@ -17,7 +17,7 @@ package io.openmessaging.rocketmq.promise; import io.openmessaging.Promise; -import io.openmessaging.PromiseListener; +import io.openmessaging.FutureListener; import io.openmessaging.exception.OMSRuntimeException; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -33,7 +33,7 @@ public class DefaultPromise implements Promise { private long timeout; private long createTime; private Throwable exception = null; - private List> promiseListenerList; + private List> promiseListenerList; public DefaultPromise() { createTime = System.currentTimeMillis(); @@ -121,7 +121,7 @@ public class DefaultPromise implements Promise { } @Override - public void addListener(final PromiseListener listener) { + public void addListener(final FutureListener listener) { if (listener == null) { throw new NullPointerException("FutureListener is null"); } @@ -150,7 +150,7 @@ public class DefaultPromise implements Promise { private void notifyListeners() { if (promiseListenerList != null) { - for (PromiseListener listener : promiseListenerList) { + for (FutureListener listener : promiseListenerList) { notifyListener(listener); } } @@ -199,12 +199,9 @@ public class DefaultPromise implements Promise { return true; } - private void notifyListener(final PromiseListener listener) { + private void notifyListener(final FutureListener listener) { try { - if (exception != null) - listener.operationFailed(this); - else - listener.operationCompleted(this); + listener.operationComplete(this); } catch (Throwable t) { LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java index ba7cd597..ef9236f0 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -164,7 +164,7 @@ public final class BeanUtils { final Set keySet = properties.keySet(); for (String key : keySet) { - String[] keyGroup = key.split("\\."); + String[] keyGroup = key.split("[\\._]"); for (int i = 0; i < keyGroup.length; i++) { keyGroup[i] = keyGroup[i].toLowerCase(); keyGroup[i] = StringUtils.capitalize(keyGroup[i]); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java index 60c84081..23021413 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -18,11 +18,11 @@ package io.openmessaging.rocketmq.utils; import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; -import io.openmessaging.MessageHeader; +import io.openmessaging.Message.BuiltinKeys; import io.openmessaging.OMS; -import io.openmessaging.SendResult; +import io.openmessaging.producer.SendResult; import io.openmessaging.rocketmq.domain.BytesMessageImpl; -import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.domain.RocketMQConstants; import io.openmessaging.rocketmq.domain.SendResultImpl; import java.lang.reflect.Field; import java.util.Iterator; @@ -48,25 +48,26 @@ public class OMSUtil { org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message(); rmqMessage.setBody(omsMessage.getBody()); - KeyValue headers = omsMessage.headers(); - KeyValue properties = omsMessage.properties(); + KeyValue sysHeaders = omsMessage.sysHeaders(); + KeyValue userHeaders = omsMessage.userHeaders(); //All destinations in RocketMQ use Topic - if (headers.containsKey(MessageHeader.TOPIC)) { - rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC)); - rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); - } else { - rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE)); - rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE"); + rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION)); + + if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) { + long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0); + if (deliverTime > 0) { + rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); + } } - for (String key : properties.keySet()) { - MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); + for (String key : userHeaders.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key)); } - //Headers has a high priority - for (String key : headers.keySet()) { - MessageAccessor.putProperty(rmqMessage, key, headers.getString(key)); + //System headers has a high priority + for (String key : sysHeaders.keySet()) { + MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key)); } return rmqMessage; @@ -76,8 +77,8 @@ public class OMSUtil { BytesMessage omsMsg = new BytesMessageImpl(); omsMsg.setBody(rmqMsg.getBody()); - KeyValue headers = omsMsg.headers(); - KeyValue properties = omsMsg.properties(); + KeyValue headers = omsMsg.sysHeaders(); + KeyValue properties = omsMsg.userHeaders(); final Set> entries = rmqMsg.getProperties().entrySet(); @@ -89,25 +90,22 @@ public class OMSUtil { } } - omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId()); - if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) || - rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { - omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic()); - } else { - omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic()); - } - omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys()); - omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); - omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); - omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); - omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); + omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId()); + + omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic()); + + omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys()); + omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); + omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); + omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); + omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); return omsMsg; } public static boolean isOMSHeader(String value) { - for (Field field : MessageHeader.class.getDeclaredFields()) { + for (Field field : BuiltinKeys.class.getDeclaredFields()) { try { - if (field.get(MessageHeader.class).equals(value)) { + if (field.get(BuiltinKeys.class).equals(value)) { return true; } } catch (IllegalAccessException e) { diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java index 843ddb78..da2e8a08 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -18,12 +18,10 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.BytesMessage; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; -import io.openmessaging.PropertyKeys; -import io.openmessaging.PullConsumer; +import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.lang.reflect.Field; @@ -50,18 +48,18 @@ public class PullConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + final MessagingAccessPoint messagingAccessPoint = OMS + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); - consumer = messagingAccessPoint.createPullConsumer(queueName, - OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); + consumer.attachQueue(queueName); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); field.setAccessible(true); field.set(consumer, rocketmqPullConsumer); //Replace ClientConfig clientConfig = new ClientConfig(); - clientConfig.setOmsOperationTimeout(200); + clientConfig.setOperationTimeout(200); localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, clientConfig)); field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); @@ -83,18 +81,18 @@ public class PullConsumerImplTest { when(localMessageCache.poll()).thenReturn(consumedMsg); - Message message = consumer.poll(); - assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + Message message = consumer.receive(); + assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); } @Test public void testPoll_WithTimeout() { //There is a default timeout value, @see ClientConfig#omsOperationTimeout. - Message message = consumer.poll(); + Message message = consumer.receive(); assertThat(message).isNull(); - message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100)); + message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100)); assertThat(message).isNull(); } } \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java index 882e57ea..b55816b8 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -18,13 +18,11 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.BytesMessage; import io.openmessaging.Message; -import io.openmessaging.MessageHeader; -import io.openmessaging.MessageListener; +import io.openmessaging.OMSBuiltinKeys; +import io.openmessaging.consumer.MessageListener; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; import io.openmessaging.OMS; -import io.openmessaging.PushConsumer; -import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.lang.reflect.Field; import java.util.Collections; @@ -49,10 +47,10 @@ public class PushConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + final MessagingAccessPoint messagingAccessPoint = OMS + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); consumer = messagingAccessPoint.createPushConsumer( - OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); field.setAccessible(true); @@ -75,8 +73,8 @@ public class PushConsumerImplTest { consumedMsg.setTopic("HELLO_QUEUE"); consumer.attachQueue("HELLO_QUEUE", new MessageListener() { @Override - public void onMessage(final Message message, final ReceivedMessageContext context) { - assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId"); + public void onReceived(Message message, Context context) { + assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody); context.ack(); } diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java index 1db80c3e..fc6515ea 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java @@ -17,9 +17,9 @@ package io.openmessaging.rocketmq.producer; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; -import io.openmessaging.Producer; +import io.openmessaging.OMS; import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.producer.Producer; import java.lang.reflect.Field; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -49,8 +49,8 @@ public class ProducerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + final MessagingAccessPoint messagingAccessPoint = OMS + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); producer = messagingAccessPoint.createProducer(); Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); @@ -67,8 +67,8 @@ public class ProducerImplTest { sendResult.setMsgId("TestMsgID"); sendResult.setSendStatus(SendStatus.SEND_OK); when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); - io.openmessaging.SendResult omsResult = - producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + io.openmessaging.producer.SendResult omsResult = + producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); assertThat(omsResult.messageId()).isEqualTo("TestMsgID"); } @@ -80,7 +80,7 @@ public class ProducerImplTest { when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult); try { - producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); failBecauseExceptionWasNotThrown(OMSRuntimeException.class); } catch (Exception e) { assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); @@ -91,7 +91,7 @@ public class ProducerImplTest { public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class); try { - producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'})); + producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'})); failBecauseExceptionWasNotThrown(OMSRuntimeException.class); } catch (Exception e) { assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed."); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java deleted file mode 100644 index 823fe015..00000000 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/SequenceProducerImplTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.openmessaging.rocketmq.producer; - -import io.openmessaging.BytesMessage; -import io.openmessaging.MessageHeader; -import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.MessagingAccessPointFactory; -import io.openmessaging.SequenceProducer; -import java.lang.reflect.Field; -import org.apache.rocketmq.client.exception.MQBrokerException; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.exception.RemotingException; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnitRunner; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.when; - -@RunWith(MockitoJUnitRunner.class) -public class SequenceProducerImplTest { - - private SequenceProducer producer; - - @Mock - private DefaultMQProducer rocketmqProducer; - - @Before - public void init() throws NoSuchFieldException, IllegalAccessException { - final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); - producer = messagingAccessPoint.createSequenceProducer(); - - Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); - field.setAccessible(true); - field.set(producer, rocketmqProducer); - - messagingAccessPoint.startup(); - producer.startup(); - } - - @Test - public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { - SendResult sendResult = new SendResult(); - sendResult.setMsgId("TestMsgID"); - sendResult.setSendStatus(SendStatus.SEND_OK); - when(rocketmqProducer.send(ArgumentMatchers.anyList())).thenReturn(sendResult); - when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); - final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); - producer.send(message); - producer.commit(); - assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID"); - } - - @Test - public void testRollback() { - when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024); - final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}); - producer.send(message); - producer.rollback(); - producer.commit(); //Commit nothing. - assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null); - } -} \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java index 2240ff2d..f226edef 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/promise/DefaultPromiseTest.java @@ -16,8 +16,9 @@ */ package io.openmessaging.rocketmq.promise; +import io.openmessaging.Future; +import io.openmessaging.FutureListener; import io.openmessaging.Promise; -import io.openmessaging.PromiseListener; import io.openmessaging.exception.OMSRuntimeException; import org.junit.Before; import org.junit.Test; @@ -63,14 +64,10 @@ public class DefaultPromiseTest { @Test public void testAddListener() throws Exception { - promise.addListener(new PromiseListener() { + promise.addListener(new FutureListener() { @Override - public void operationCompleted(final Promise promise) { + public void operationComplete(Future future) { assertThat(promise.get()).isEqualTo("Done"); - } - - @Override - public void operationFailed(final Promise promise) { } }); @@ -80,15 +77,10 @@ public class DefaultPromiseTest { @Test public void testAddListener_ListenerAfterSet() throws Exception { promise.set("Done"); - promise.addListener(new PromiseListener() { - @Override - public void operationCompleted(final Promise promise) { - assertThat(promise.get()).isEqualTo("Done"); - } - + promise.addListener(new FutureListener() { @Override - public void operationFailed(final Promise promise) { - + public void operationComplete(Future future) { + assertThat(future.get()).isEqualTo("Done"); } }); } @@ -97,13 +89,9 @@ public class DefaultPromiseTest { public void testAddListener_WithException_ListenerAfterSet() throws Exception { final Throwable exception = new OMSRuntimeException("-1", "Test Error"); promise.setFailure(exception); - promise.addListener(new PromiseListener() { - @Override - public void operationCompleted(final Promise promise) { - } - + promise.addListener(new FutureListener() { @Override - public void operationFailed(final Promise promise) { + public void operationComplete(Future future) { assertThat(promise.getThrowable()).isEqualTo(exception); } }); @@ -112,13 +100,9 @@ public class DefaultPromiseTest { @Test public void testAddListener_WithException() throws Exception { final Throwable exception = new OMSRuntimeException("-1", "Test Error"); - promise.addListener(new PromiseListener() { - @Override - public void operationCompleted(final Promise promise) { - } - + promise.addListener(new FutureListener() { @Override - public void operationFailed(final Promise promise) { + public void operationComplete(Future future) { assertThat(promise.getThrowable()).isEqualTo(exception); } }); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java index 71ca11cc..1a431d98 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java @@ -92,9 +92,9 @@ public class BeanUtilsTest { @Test public void testPopulate_ExistObj() { CustomizedConfig config = new CustomizedConfig(); - config.setOmsConsumerId("NewConsumerId"); + config.setConsumerId("NewConsumerId"); - Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId"); + Assert.assertEquals(config.getConsumerId(), "NewConsumerId"); config = BeanUtils.populate(properties, config); diff --git a/pom.xml b/pom.xml index 6737ae41..f4184a5f 100644 --- a/pom.xml +++ b/pom.xml @@ -592,7 +592,7 @@ io.openmessaging openmessaging-api - 0.1.0-alpha + 0.3.0-alpha-SNAPSHOT log4j -- GitLab