From 753588947b782d1abfbde5998783895e6e7e3ddc Mon Sep 17 00:00:00 2001 From: "shutian.lzh" Date: Thu, 19 Apr 2018 16:44:32 +0800 Subject: [PATCH] Run tests --- .../openmessaging/SimplePullConsumer.java | 36 ++++++++++++++----- .../rocketmq/example/simple/Producer.java | 2 +- .../example/simple/PullScheduleService.java | 2 +- .../rocketmq/consumer/PullConsumerImpl.java | 2 +- 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 4ddf50fc..86aba410 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -21,38 +21,56 @@ import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.SendResult; public class SimplePullConsumer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); + messagingAccessPoint.startup(); + + final Producer producer = messagingAccessPoint.createProducer(); + final PullConsumer consumer = messagingAccessPoint.createPullConsumer( OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER")); messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - consumer.shutdown(); - messagingAccessPoint.shutdown(); - } - })); + final String queueName = "TopicTest"; - consumer.attachQueue("OMS_HELLO_TOPIC"); + producer.startup(); + Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes()); + SendResult sendResult = producer.send(msg); + System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId()); + producer.shutdown(); + + consumer.attachQueue(queueName); consumer.startup(); System.out.printf("Consumer startup OK%n"); - while (true) { + // Keep running until we find the one that has just been sent + boolean stop = false; + while (!stop) { Message message = consumer.receive(); if (message != null) { String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); System.out.printf("Received one message: %s%n", msgId); consumer.ack(msgId); + + if (!stop) { + stop = msgId.equalsIgnoreCase(sendResult.messageId()); + } + + } else { + System.out.printf("Return without any message%n"); } } + + consumer.shutdown(); + messagingAccessPoint.shutdown(); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java index 5751d22c..7b504dd2 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java @@ -29,7 +29,7 @@ public class Producer { producer.start(); - for (int i = 0; i < 10000000; i++) + for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest", diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java index 151628f9..8cfdd9bb 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java @@ -32,7 +32,7 @@ public class PullScheduleService { final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); scheduleService.setMessageModel(MessageModel.CLUSTERING); - scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { + scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() { @Override public void doPullTask(MessageQueue mq, PullTaskContext context) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 225b09e4..2e22509a 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -34,9 +34,9 @@ import org.apache.rocketmq.client.consumer.PullTaskContext; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; public class PullConsumerImpl implements PullConsumer { private final DefaultMQPullConsumer rocketmqPullConsumer; -- GitLab