From a5ea4e45bfa0b877bfc4476c2e8f23186d82a177 Mon Sep 17 00:00:00 2001 From: yukon Date: Mon, 17 Apr 2017 18:05:04 +0800 Subject: [PATCH] Add PushConsumer related implementation for OpenMessaging. --- .../example/openmessaging/SimpleProducer.java | 6 +- .../openmessaging/SimplePushConsumer.java | 58 ++++++ .../rocketmq/MessagingAccessPointImpl.java | 15 +- .../io/openmessaging/rocketmq/OMSUtil.java | 60 +++++- .../rocketmq/consumer/PullConsumerImpl.java | 62 ++++++ .../rocketmq/consumer/PushConsumerImpl.java | 188 ++++++++++++++++++ .../rocketmq/domain/BytesMessageImpl.java | 6 + .../rocketmq/domain/NonStandardKeys.java | 10 +- .../producer/AbstractOMSProducer.java | 15 +- pom.xml | 2 +- 10 files changed, 402 insertions(+), 20 deletions(-) create mode 100644 example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index 5a27f5a0..f89ae4c7 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -47,14 +47,14 @@ public class SimpleProducer { })); { - Message message = producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8"))); + Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))); SendResult sendResult = producer.send(message); //final Void aVoid = result.get(3000L); System.out.println("send async message OK, msgId: " + sendResult.messageId()); } { - final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + final Promise result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); result.addListener(new PromiseListener() { @Override public void operationCompleted(Promise promise) { System.out.println("Send async message OK, msgId: " + promise.get().messageId()); @@ -67,7 +67,7 @@ public class SimpleProducer { } { - producer.sendOneway(producer.createBytesMessageToTopic("HELLO_TOPIC", "HELLO_BODY".getBytes(Charset.forName("UTF-8")))); + producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.println("Send oneway message OK"); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java new file mode 100644 index 00000000..6fc8e39d --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.openmessaging; + +import io.openmessaging.Message; +import io.openmessaging.MessageListener; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PushConsumer; +import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class SimplePushConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); + + final PushConsumer consumer = messagingAccessPoint. + createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.println("messagingAccessPoint startup OK"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { + @Override + public void onMessage(final Message message, final ReceivedMessageContext context) { + System.out.println("Received one message: " + message); + context.ack(); + } + }); + + consumer.startup(); + System.out.println("consumer startup OK"); + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index c30b7d41..fecd69f9 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -25,7 +25,10 @@ import io.openmessaging.PushConsumer; import io.openmessaging.ResourceManager; import io.openmessaging.SequenceProducer; import io.openmessaging.ServiceEndPoint; +import io.openmessaging.exception.OMSNotSupportedException; import io.openmessaging.observer.Observer; +import io.openmessaging.rocketmq.consumer.PullConsumerImpl; +import io.openmessaging.rocketmq.consumer.PushConsumerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl; import io.openmessaging.rocketmq.producer.SequenceProducerImpl; @@ -63,32 +66,32 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { @Override public PushConsumer createPushConsumer() { - return null; + return new PushConsumerImpl(accessPointProperties); } @Override public PushConsumer createPushConsumer(KeyValue properties) { - return null; + return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); } @Override public PullConsumer createPullConsumer(String queueName) { - return null; + return new PullConsumerImpl(accessPointProperties); } @Override public PullConsumer createPullConsumer(String queueName, KeyValue properties) { - return null; + return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); } @Override public IterableConsumer createIterableConsumer(String queueName) { - return null; + throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in RocketMQ"); } @Override public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) { - return null; + throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in RocketMQ"); } @Override diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java index 061ee6b9..dd591a6a 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java @@ -21,7 +21,12 @@ import io.openmessaging.KeyValue; import io.openmessaging.MessageHeader; import io.openmessaging.OMS; import io.openmessaging.SendResult; +import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.SendResultImpl; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.Set; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; @@ -45,8 +50,13 @@ public class OMSUtil { KeyValue properties = omsMessage.properties(); //All destinations in RocketMQ use Topic - rmqMessage.setTopic(headers.containsKey(MessageHeader.TOPIC) - ? headers.getString(MessageHeader.TOPIC) : headers.getString(MessageHeader.QUEUE)); + if (headers.containsKey(MessageHeader.TOPIC)) { + rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC)); + rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + } else { + rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE)); + rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE"); + } for (String key : properties.keySet()) { MessageAccessor.putProperty(rmqMessage, key, properties.getString(key)); @@ -60,6 +70,50 @@ public class OMSUtil { return rmqMessage; } + public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) { + BytesMessage omsMsg = new BytesMessageImpl(); + omsMsg.setBody(rmqMsg.getBody()); + + KeyValue headers = omsMsg.headers(); + KeyValue properties = omsMsg.properties(); + + final Set> entries = rmqMsg.getProperties().entrySet(); + + for (final Map.Entry entry : entries) { + if (isOMSHeader(entry.getKey())) { + headers.put(entry.getKey(), entry.getValue()); + } else { + properties.put(entry.getKey(), entry.getValue()); + } + } + + omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId()); + if (rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { + omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic()); + } else { + omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic()); + } + omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys()); + omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost())); + omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp()); + omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost())); + omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp()); + return omsMsg; + } + + public static boolean isOMSHeader(String value) { + for (Field field : MessageHeader.class.getDeclaredFields()) { + try { + if (field.get(MessageHeader.class).equals(value)) { + return true; + } + } catch (IllegalAccessException e) { + return false; + } + } + return false; + } + /** * Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult. */ @@ -68,7 +122,7 @@ public class OMSUtil { return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue()); } - public static KeyValue buildKeyValue(KeyValue ... keyValues) { + public static KeyValue buildKeyValue(KeyValue... keyValues) { KeyValue keyValue = OMS.newKeyValue(); for (KeyValue properties : keyValues) { for (String key : properties.keySet()) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java new file mode 100644 index 00000000..6730b1f3 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.KeyValue; +import io.openmessaging.Message; +import io.openmessaging.PullConsumer; + +public class PullConsumerImpl implements PullConsumer { + public PullConsumerImpl(final KeyValue properties) { + + } + + @Override + public KeyValue properties() { + return null; + } + + @Override + public Message poll() { + return null; + } + + @Override + public Message poll(final KeyValue properties) { + return null; + } + + @Override + public void ack(final String messageId) { + + } + + @Override + public void ack(final String messageId, final KeyValue properties) { + + } + + @Override + public void startup() { + + } + + @Override + public void shutdown() { + + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java new file mode 100644 index 00000000..cd83212c --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.BytesMessage; +import io.openmessaging.KeyValue; +import io.openmessaging.MessageListener; +import io.openmessaging.OMS; +import io.openmessaging.PropertyKeys; +import io.openmessaging.PushConsumer; +import io.openmessaging.ReceivedMessageContext; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; + +public class PushConsumerImpl implements PushConsumer { + private final DefaultMQPushConsumer rocketmqPushConsumer; + private final KeyValue properties; + private boolean started = false; + private final Map subscribeTable = new ConcurrentHashMap<>(); + + + public PushConsumerImpl(final KeyValue properties) { + this.rocketmqPushConsumer = new DefaultMQPushConsumer(); + this.properties = properties; + + String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + if (accessPoints == null || accessPoints.isEmpty()) { + throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); + } + this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); + + String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP); + if (null == consumerGroup || consumerGroup.isEmpty()) { + throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); + } + this.rocketmqPushConsumer.setConsumerGroup(consumerGroup); + + int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES); + if (maxReDeliveryTimes != 0) { + this.rocketmqPushConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); + } + + int messageConsumeTimeout = properties.getInt(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT); + if (messageConsumeTimeout != 0) { + this.rocketmqPushConsumer.setConsumeTimeout(messageConsumeTimeout); + } + + int maxConsumeThreadNums = properties.getInt(NonStandardKeys.MAX_CONSUME_THREAD_NUMS); + if (maxConsumeThreadNums != 0) { + this.rocketmqPushConsumer.setConsumeThreadMax(maxConsumeThreadNums); + } + + int minConsumeThreadNums = properties.getInt(NonStandardKeys.MIN_CONSUME_THREAD_NUMS); + if (minConsumeThreadNums != 0) { + this.rocketmqPushConsumer.setConsumeThreadMin(minConsumeThreadNums); + } + + String consumerId = OMSUtil.buildInstanceName(); + this.rocketmqPushConsumer.setInstanceName(consumerId); + properties.put(PropertyKeys.CONSUMER_ID, consumerId); + + this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); + } + + @Override + public KeyValue properties() { + return properties; + } + + @Override + public void resume() { + this.rocketmqPushConsumer.resume(); + } + + @Override + public void suspend() { + this.rocketmqPushConsumer.suspend(); + } + + @Override + public boolean isSuspended() { + return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause(); + } + + @Override + public PushConsumer attachQueue(final String queueName, final MessageListener listener) { + this.subscribeTable.put(queueName, listener); + try { + this.rocketmqPushConsumer.subscribe(queueName, "*"); + } catch (MQClientException e) { + throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer can't attach to %s.", queueName)); + } + return this; + } + + @Override + public synchronized void startup() { + if (!started) { + try { + this.rocketmqPushConsumer.start(); + } catch (MQClientException e) { + throw new OMSRuntimeException("-1", e); + } + } + this.started = true; + } + + @Override + public synchronized void shutdown() { + if (this.started) { + this.rocketmqPushConsumer.shutdown(); + } + this.started = false; + } + + class MessageListenerImpl implements MessageListenerConcurrently { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, ConsumeConcurrentlyContext contextRMQ) { + MessageExt rmqMsg = rmqMsgList.get(0); + BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg); + + MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic()); + + if (listener == null) { + throw new OMSRuntimeException("-1", + String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic())); + } + + final KeyValue contextProperties = OMS.newKeyValue(); + final CountDownLatch sync = new CountDownLatch(1); + + ReceivedMessageContext context = new ReceivedMessageContext() { + @Override + public KeyValue properties() { + return contextProperties; + } + + @Override + public void ack() { + sync.countDown(); + contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); + } + + @Override + public void ack(final KeyValue properties) { + sync.countDown(); + contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, + properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); + } + }; + listener.onMessage(omsMsg, context); + try { + sync.await(PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ignore) { + } + + return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); + } + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java index 8140fe2e..43f80ae5 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue; import io.openmessaging.Message; import io.openmessaging.OMS; +import org.apache.commons.lang3.builder.ToStringBuilder; public class BytesMessageImpl implements BytesMessage { private KeyValue headers; @@ -99,4 +100,9 @@ public class BytesMessageImpl implements BytesMessage { properties.put(key, value); return this; } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java index cf83cfdc..566a17d4 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java @@ -16,5 +16,13 @@ */ package io.openmessaging.rocketmq.domain; -public class NonStandardKeys { +public interface NonStandardKeys { + String CONSUMER_GROUP = "rmq.consumer.group"; + String PRODUCER_GROUP = "rmq.producer.group"; + String MAX_REDELIVERY_TIMES = "rmq.max.redelivery.times"; + String MESSAGE_CONSUME_TIMEOUT = "rmq.message.consume.timeout"; + String MAX_CONSUME_THREAD_NUMS = "rmq.max.consume.thread.nums"; + String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums"; + String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status"; + String MESSAGE_DESTINATION = "rmq.message.destination"; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 9eb735d6..32d65cd2 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -28,6 +28,7 @@ import io.openmessaging.exception.OMSNotSupportedException; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSTimeOutException; import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.NonStandardKeys; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; @@ -50,20 +51,22 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{ this.rocketmqProducer = new DefaultMQProducer(); String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); - if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } - String producerId = buildInstanceName(); + this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); - int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); + String producerGroup = properties.getString(NonStandardKeys.PRODUCER_GROUP); + if (producerGroup == null || producerGroup.isEmpty()) { + producerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; + } + this.rocketmqProducer.setProducerGroup(producerGroup); - this.rocketmqProducer.setProducerGroup(producerId); + String producerId = buildInstanceName(); + int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout); this.rocketmqProducer.setInstanceName(producerId); - this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); - properties.put(PropertyKeys.PRODUCER_ID, producerId); } diff --git a/pom.xml b/pom.xml index 865e9f9a..a69b2c6e 100644 --- a/pom.xml +++ b/pom.xml @@ -607,7 +607,7 @@ io.openmessaging openmessaging-api - 0.1.0-beta + 0.1.0-alpha -- GitLab