diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 4ddf50fc2bd5d6b7d5db5666ac7944c663e643b8..86aba410aee4013fb86ec446da5c1cea37ef1eb9 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -21,38 +21,56 @@ import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.producer.Producer; +import io.openmessaging.producer.SendResult; public class SimplePullConsumer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); + messagingAccessPoint.startup(); + + final Producer producer = messagingAccessPoint.createProducer(); + final PullConsumer consumer = messagingAccessPoint.createPullConsumer( OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER")); messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - consumer.shutdown(); - messagingAccessPoint.shutdown(); - } - })); + final String queueName = "TopicTest"; - consumer.attachQueue("OMS_HELLO_TOPIC"); + producer.startup(); + Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes()); + SendResult sendResult = producer.send(msg); + System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId()); + producer.shutdown(); + + consumer.attachQueue(queueName); consumer.startup(); System.out.printf("Consumer startup OK%n"); - while (true) { + // Keep running until we find the one that has just been sent + boolean stop = false; + while (!stop) { Message message = consumer.receive(); if (message != null) { String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); System.out.printf("Received one message: %s%n", msgId); consumer.ack(msgId); + + if (!stop) { + stop = msgId.equalsIgnoreCase(sendResult.messageId()); + } + + } else { + System.out.printf("Return without any message%n"); } } + + consumer.shutdown(); + messagingAccessPoint.shutdown(); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java index 5751d22c7f1d1525eddee97543836232dad0bdcb..7b504dd2a3e07595abbeb1043e9b6c28b7d6345e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java @@ -29,7 +29,7 @@ public class Producer { producer.start(); - for (int i = 0; i < 10000000; i++) + for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest", diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java index 151628f9a80a59487383756dd4b711ca7ba76c30..8cfdd9bbd0c692e1aeb072968f5f211344026a41 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PullScheduleService.java @@ -32,7 +32,7 @@ public class PullScheduleService { final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); scheduleService.setMessageModel(MessageModel.CLUSTERING); - scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { + scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() { @Override public void doPullTask(MessageQueue mq, PullTaskContext context) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 225b09e47a6a0cf7e6946bba851fbb2ee3ff18bb..2e22509a2a1c5e16807389f4e11fd7cb673c78ab 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -34,9 +34,9 @@ import org.apache.rocketmq.client.consumer.PullTaskContext; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.log.ClientLogger; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.logging.InternalLogger; public class PullConsumerImpl implements PullConsumer { private final DefaultMQPullConsumer rocketmqPullConsumer;