提交 30abd95e 编写于 作者: S shutian.lzh

Fix consume examples

上级 7b0b9d15
...@@ -19,16 +19,16 @@ package org.apache.rocketmq.example.openmessaging; ...@@ -19,16 +19,16 @@ package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message; import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer; import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePullConsumer { public class SimplePullConsumer {
public static void main(String[] args) { public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final PullConsumer consumer = messagingAccessPoint.createPullConsumer( final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup(); messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n"); System.out.printf("MessagingAccessPoint startup OK%n");
......
...@@ -19,17 +19,17 @@ package org.apache.rocketmq.example.openmessaging; ...@@ -19,17 +19,17 @@ package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message; import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer; import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePushConsumer { public class SimplePushConsumer {
public static void main(String[] args) { public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = OMS final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final PushConsumer consumer = messagingAccessPoint. final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup(); messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n"); System.out.printf("MessagingAccessPoint startup OK%n");
......
...@@ -52,7 +52,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -52,7 +52,7 @@ public class PullConsumerImpl implements PullConsumer {
this.properties = properties; this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String consumerGroup = clientConfig.getRmqConsumerGroup(); String consumerGroup = clientConfig.getOmsConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) { if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
} }
......
...@@ -58,7 +58,7 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -58,7 +58,7 @@ public class PushConsumerImpl implements PushConsumer {
} }
this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
String consumerGroup = clientConfig.getRmqConsumerGroup(); String consumerGroup = clientConfig.getOmsConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) { if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册