提交 75358894 编写于 作者: S shutian.lzh

Run tests

上级 4fb72cd1
...@@ -21,38 +21,56 @@ import io.openmessaging.MessagingAccessPoint; ...@@ -21,38 +21,56 @@ import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer; import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
public class SimplePullConsumer { public class SimplePullConsumer {
public static void main(String[] args) { public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
messagingAccessPoint.startup();
final Producer producer = messagingAccessPoint.createProducer();
final PullConsumer consumer = messagingAccessPoint.createPullConsumer( final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER")); OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup(); messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n"); System.out.printf("MessagingAccessPoint startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { final String queueName = "TopicTest";
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}));
consumer.attachQueue("OMS_HELLO_TOPIC"); producer.startup();
Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
SendResult sendResult = producer.send(msg);
System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
producer.shutdown();
consumer.attachQueue(queueName);
consumer.startup(); consumer.startup();
System.out.printf("Consumer startup OK%n"); System.out.printf("Consumer startup OK%n");
while (true) { // Keep running until we find the one that has just been sent
boolean stop = false;
while (!stop) {
Message message = consumer.receive(); Message message = consumer.receive();
if (message != null) { if (message != null) {
String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId); System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId); consumer.ack(msgId);
if (!stop) {
stop = msgId.equalsIgnoreCase(sendResult.messageId());
}
} else {
System.out.printf("Return without any message%n");
} }
} }
consumer.shutdown();
messagingAccessPoint.shutdown();
} }
} }
...@@ -29,7 +29,7 @@ public class Producer { ...@@ -29,7 +29,7 @@ public class Producer {
producer.start(); producer.start();
for (int i = 0; i < 10000000; i++) for (int i = 0; i < 128; i++)
try { try {
{ {
Message msg = new Message("TopicTest", Message msg = new Message("TopicTest",
......
...@@ -32,7 +32,7 @@ public class PullScheduleService { ...@@ -32,7 +32,7 @@ public class PullScheduleService {
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1"); final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() { scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
@Override @Override
public void doPullTask(MessageQueue mq, PullTaskContext context) { public void doPullTask(MessageQueue mq, PullTaskContext context) {
......
...@@ -34,9 +34,9 @@ import org.apache.rocketmq.client.consumer.PullTaskContext; ...@@ -34,9 +34,9 @@ import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
public class PullConsumerImpl implements PullConsumer { public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer; private final DefaultMQPullConsumer rocketmqPullConsumer;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册