diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 8dd7b2369755f7074be6adb2e25018a6a1322bb4..86cb696e05f9688f471b45ab88128d0dd691baa3 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -48,9 +48,11 @@ public class SimplePullConsumer { while (true) { Message message = consumer.poll(); - String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); - System.out.println("Received one message: " + msgId); - consumer.ack(msgId); + if (message != null) { + String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + System.out.println("Received one message: " + msgId); + consumer.ack(msgId); + } } } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..fbca21a283c5051f5f40adc9a937b49b025df028 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/ClientConfig.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq; + +import io.openmessaging.PropertyKeys; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class ClientConfig implements PropertyKeys, NonStandardKeys { + private String omsDriverImpl; + private String omsAccessPoints; + private String omsNamespace; + private String omsProducerId; + private String omsConsumerId; + private int omsOperationTimeout = 5000; + private String omsRoutingName; + private String omsOperatorName; + private String omsDstQueue; + private String omsSrcTopic; + private String rmqConsumerGroup; + private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; + private int rmqMaxRedeliveryTimes = 16; + private int rmqMessageConsumeTimeout = 15; //In minutes + private int rmqMaxConsumeThreadNums = 64; + private int rmqMinConsumeThreadNums = 20; + private String rmqMessageDestination; + private int rmqPullMessageBatchNums = 32; + private int rmqPullMessageCacheCapacity = 1000; + + public String getOmsDriverImpl() { + return omsDriverImpl; + } + + public void setOmsDriverImpl(final String omsDriverImpl) { + this.omsDriverImpl = omsDriverImpl; + } + + public String getOmsAccessPoints() { + return omsAccessPoints; + } + + public void setOmsAccessPoints(final String omsAccessPoints) { + this.omsAccessPoints = omsAccessPoints; + } + + public String getOmsNamespace() { + return omsNamespace; + } + + public void setOmsNamespace(final String omsNamespace) { + this.omsNamespace = omsNamespace; + } + + public String getOmsProducerId() { + return omsProducerId; + } + + public void setOmsProducerId(final String omsProducerId) { + this.omsProducerId = omsProducerId; + } + + public String getOmsConsumerId() { + return omsConsumerId; + } + + public void setOmsConsumerId(final String omsConsumerId) { + this.omsConsumerId = omsConsumerId; + } + + public int getOmsOperationTimeout() { + return omsOperationTimeout; + } + + public void setOmsOperationTimeout(final int omsOperationTimeout) { + this.omsOperationTimeout = omsOperationTimeout; + } + + public String getOmsRoutingName() { + return omsRoutingName; + } + + public void setOmsRoutingName(final String omsRoutingName) { + this.omsRoutingName = omsRoutingName; + } + + public String getOmsOperatorName() { + return omsOperatorName; + } + + public void setOmsOperatorName(final String omsOperatorName) { + this.omsOperatorName = omsOperatorName; + } + + public String getOmsDstQueue() { + return omsDstQueue; + } + + public void setOmsDstQueue(final String omsDstQueue) { + this.omsDstQueue = omsDstQueue; + } + + public String getOmsSrcTopic() { + return omsSrcTopic; + } + + public void setOmsSrcTopic(final String omsSrcTopic) { + this.omsSrcTopic = omsSrcTopic; + } + + public String getRmqConsumerGroup() { + return rmqConsumerGroup; + } + + public void setRmqConsumerGroup(final String rmqConsumerGroup) { + this.rmqConsumerGroup = rmqConsumerGroup; + } + + public String getRmqProducerGroup() { + return rmqProducerGroup; + } + + public void setRmqProducerGroup(final String rmqProducerGroup) { + this.rmqProducerGroup = rmqProducerGroup; + } + + public int getRmqMaxRedeliveryTimes() { + return rmqMaxRedeliveryTimes; + } + + public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) { + this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes; + } + + public int getRmqMessageConsumeTimeout() { + return rmqMessageConsumeTimeout; + } + + public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) { + this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout; + } + + public int getRmqMaxConsumeThreadNums() { + return rmqMaxConsumeThreadNums; + } + + public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) { + this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums; + } + + public int getRmqMinConsumeThreadNums() { + return rmqMinConsumeThreadNums; + } + + public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) { + this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums; + } + + public String getRmqMessageDestination() { + return rmqMessageDestination; + } + + public void setRmqMessageDestination(final String rmqMessageDestination) { + this.rmqMessageDestination = rmqMessageDestination; + } + + public int getRmqPullMessageBatchNums() { + return rmqPullMessageBatchNums; + } + + public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) { + this.rmqPullMessageBatchNums = rmqPullMessageBatchNums; + } + + public int getRmqPullMessageCacheCapacity() { + return rmqPullMessageCacheCapacity; + } + + public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) { + this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index af1695b151fb288d8058b788d3912632516e8aee..a897da593b8bc30e954ee7128f851ae4858d20c0 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -31,6 +31,7 @@ import io.openmessaging.rocketmq.consumer.PullConsumerImpl; import io.openmessaging.rocketmq.consumer.PushConsumerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl; import io.openmessaging.rocketmq.producer.SequenceProducerImpl; +import io.openmessaging.rocketmq.utils.OMSUtil; public class MessagingAccessPointImpl implements MessagingAccessPoint { private final KeyValue accessPointProperties; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 968229a978fdf1ede57e6314acfa495a6cc5e47b..0ffd36c0dd6e5731ae369665691d03a57cd90e4c 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.PropertyKeys; +import io.openmessaging.rocketmq.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; -import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.util.Collections; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -38,32 +38,19 @@ class LocalMessageCache { private final Map consumedRequest; private final ConcurrentHashMap pullOffsetTable; private final DefaultMQPullConsumer rocketmqPullConsumer; - private int pullBatchNums = 32; - private int pollTimeout = -1; + private final ClientConfig clientConfig; private final static Logger log = ClientLogger.getLog(); - LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties) { - int cacheCapacity = 1000; - if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) { - cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY); - } - consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity); - - if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) { - pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS); - } - - if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { - pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); - } - + LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) { + consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity()); this.consumedRequest = new ConcurrentHashMap<>(); this.pullOffsetTable = new ConcurrentHashMap<>(); this.rocketmqPullConsumer = rocketmqPullConsumer; + this.clientConfig = clientConfig; } int nextPullBatchNums() { - return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity()); + return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity()); } long nextPullOffset(MessageQueue remoteQueue) { @@ -90,31 +77,25 @@ class LocalMessageCache { } MessageExt poll() { - try { - ConsumeRequest consumeRequest = consumeRequestCache.take(); - consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); - consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); - return consumeRequest.getMessageExt(); - } catch (InterruptedException ignore) { - } - return null; + return poll(clientConfig.getOmsOperationTimeout()); } MessageExt poll(final KeyValue properties) { - int currentPollTimeout = pollTimeout; + int currentPollTimeout = clientConfig.getOmsOperationTimeout(); if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); } + return poll(currentPollTimeout); + } - if (currentPollTimeout == -1) { - return poll(); - } - + private MessageExt poll(long timeout) { try { - ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout, TimeUnit.MILLISECONDS); - consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); - consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); - return consumeRequest.getMessageExt(); + ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); + if (consumeRequest != null) { + consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); + consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); + return consumeRequest.getMessageExt(); + } } catch (InterruptedException ignore) { } return null; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index bd33d7811a375f172fc780e65d6a5bbda11b7939..56a49a49dde5c7906756d380660bf7013d9642fe 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -21,9 +21,10 @@ import io.openmessaging.Message; import io.openmessaging.PropertyKeys; import io.openmessaging.PullConsumer; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; -import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.utils.BeanUtils; +import io.openmessaging.rocketmq.utils.OMSUtil; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; @@ -44,6 +45,7 @@ public class PullConsumerImpl implements PullConsumer { private String targetQueueName; private final MQPullConsumerScheduleService pullConsumerScheduleService; private final LocalMessageCache localMessageCache; + private final ClientConfig clientConfig; final static Logger log = ClientLogger.getLog(); @@ -51,7 +53,9 @@ public class PullConsumerImpl implements PullConsumer { this.properties = properties; this.targetQueueName = queueName; - String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP); + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); + + String consumerGroup = clientConfig.getRmqConsumerGroup(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } @@ -59,7 +63,7 @@ public class PullConsumerImpl implements PullConsumer { this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); - String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + String accessPoints = clientConfig.getOmsAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } @@ -67,16 +71,14 @@ public class PullConsumerImpl implements PullConsumer { this.rocketmqPullConsumer.setConsumerGroup(consumerGroup); - int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES); - if (maxReDeliveryTimes != 0) { - this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); - } + int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes(); + this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); properties.put(PropertyKeys.CONSUMER_ID, consumerId); - this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, properties); + this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); } @Override @@ -86,12 +88,14 @@ public class PullConsumerImpl implements PullConsumer { @Override public Message poll() { - return OMSUtil.msgConvert(localMessageCache.poll()); + MessageExt rmqMsg = localMessageCache.poll(); + return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @Override public Message poll(final KeyValue properties) { - return OMSUtil.msgConvert(localMessageCache.poll(properties)); + MessageExt rmqMsg = localMessageCache.poll(properties); + return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); } @Override diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 9c3b6a9a69dcaf8cae0d39e15531d70ef540d879..65c8ee006f9f3bdcfb07f541f2ade33679faeae9 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -24,7 +24,9 @@ import io.openmessaging.PropertyKeys; import io.openmessaging.PushConsumer; import io.openmessaging.ReceivedMessageContext; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.ClientConfig; +import io.openmessaging.rocketmq.utils.BeanUtils; +import io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.util.List; import java.util.Map; @@ -43,43 +45,29 @@ public class PushConsumerImpl implements PushConsumer { private final KeyValue properties; private boolean started = false; private final Map subscribeTable = new ConcurrentHashMap<>(); + private final ClientConfig clientConfig; public PushConsumerImpl(final KeyValue properties) { this.rocketmqPushConsumer = new DefaultMQPushConsumer(); this.properties = properties; + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + String accessPoints = clientConfig.getOmsAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); - String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP); + String consumerGroup = clientConfig.getRmqConsumerGroup(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } this.rocketmqPushConsumer.setConsumerGroup(consumerGroup); - - int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES); - if (maxReDeliveryTimes != 0) { - this.rocketmqPushConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); - } - - int messageConsumeTimeout = properties.getInt(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT); - if (messageConsumeTimeout != 0) { - this.rocketmqPushConsumer.setConsumeTimeout(messageConsumeTimeout); - } - - int maxConsumeThreadNums = properties.getInt(NonStandardKeys.MAX_CONSUME_THREAD_NUMS); - if (maxConsumeThreadNums != 0) { - this.rocketmqPushConsumer.setConsumeThreadMax(maxConsumeThreadNums); - } - - int minConsumeThreadNums = properties.getInt(NonStandardKeys.MIN_CONSUME_THREAD_NUMS); - if (minConsumeThreadNums != 0) { - this.rocketmqPushConsumer.setConsumeThreadMin(minConsumeThreadNums); - } + this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes()); + this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout()); + this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums()); + this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums()); String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPushConsumer.setInstanceName(consumerId); @@ -181,10 +169,9 @@ public class PushConsumerImpl implements PushConsumer { long begin = System.currentTimeMillis(); listener.onMessage(omsMsg, context); long costs = System.currentTimeMillis() - begin; - + long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000; try { - sync.await(Math.max(0, PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout() - costs) - , TimeUnit.MILLISECONDS); + sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 32d65cd2ab9ac46087ef8315e721d983b6ff7a59..7de7888389bfc752de54622f3afc9dacc030dc29 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -27,8 +27,9 @@ import io.openmessaging.exception.OMSMessageFormatException; import io.openmessaging.exception.OMSNotSupportedException; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSTimeOutException; +import io.openmessaging.rocketmq.ClientConfig; import io.openmessaging.rocketmq.domain.BytesMessageImpl; -import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.utils.BeanUtils; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; @@ -38,33 +39,29 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.slf4j.Logger; -import static io.openmessaging.rocketmq.OMSUtil.buildInstanceName; +import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName; abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{ final static Logger log = ClientLogger.getLog(); final KeyValue properties; final DefaultMQProducer rocketmqProducer; private boolean started = false; + final ClientConfig clientConfig; AbstractOMSProducer(final KeyValue properties) { this.properties = properties; this.rocketmqProducer = new DefaultMQProducer(); + this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + String accessPoints = clientConfig.getOmsAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); - - String producerGroup = properties.getString(NonStandardKeys.PRODUCER_GROUP); - if (producerGroup == null || producerGroup.isEmpty()) { - producerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; - } - this.rocketmqProducer.setProducerGroup(producerGroup); + this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup()); String producerId = buildInstanceName(); - int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); - this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout); + this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout()); this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); properties.put(PropertyKeys.PRODUCER_ID, producerId); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index f5d2f2558ad7907d7de2a764bbfa7005dba050c8..8b2ddd26c9d5b0af46a4801ef2754fc00c149234 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -25,12 +25,12 @@ import io.openmessaging.Promise; import io.openmessaging.PropertyKeys; import io.openmessaging.SendResult; import io.openmessaging.exception.OMSRuntimeException; -import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.promise.DefaultPromise; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendStatus; -import static io.openmessaging.rocketmq.OMSUtil.msgConvert; +import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert; public class ProducerImpl extends AbstractOMSProducer implements Producer { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java index 89ece2bd65ee0f4eee29dd3c809eef8135d0e196..58b1a122a2711f858384b1eadb4ec83f7f146a5d 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/SequenceProducerImpl.java @@ -21,7 +21,7 @@ import io.openmessaging.KeyValue; import io.openmessaging.Message; import io.openmessaging.MessageHeader; import io.openmessaging.SequenceProducer; -import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.utils.OMSUtil; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..d8eed843b685a3d1ab551aaa9bbb558fd11e24c9 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.utils; + +import io.openmessaging.KeyValue; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.log.ClientLogger; +import org.slf4j.Logger; + +public final class BeanUtils { + final static Logger log = ClientLogger.getLog(); + + /** + * Maps primitive {@code Class}es to their corresponding wrapper {@code Class}. + */ + private static Map, Class> primitiveWrapperMap = new HashMap, Class>(); + + static { + primitiveWrapperMap.put(Boolean.TYPE, Boolean.class); + primitiveWrapperMap.put(Byte.TYPE, Byte.class); + primitiveWrapperMap.put(Character.TYPE, Character.class); + primitiveWrapperMap.put(Short.TYPE, Short.class); + primitiveWrapperMap.put(Integer.TYPE, Integer.class); + primitiveWrapperMap.put(Long.TYPE, Long.class); + primitiveWrapperMap.put(Double.TYPE, Double.class); + primitiveWrapperMap.put(Float.TYPE, Float.class); + primitiveWrapperMap.put(Void.TYPE, Void.TYPE); + } + + private static Map, Class> wrapperMap = new HashMap, Class>(); + + static { + for (final Class primitiveClass : primitiveWrapperMap.keySet()) { + final Class wrapperClass = primitiveWrapperMap.get(primitiveClass); + if (!primitiveClass.equals(wrapperClass)) { + wrapperMap.put(wrapperClass, primitiveClass); + } + } + wrapperMap.put(String.class, String.class); + } + + /** + *

Populate the JavaBeans properties of the specified bean, based on + * the specified name/value pairs. This method uses Java reflection APIs + * to identify corresponding "property setter" method names, and deals + * with setter arguments of type String, boolean, + * int, long, float, and + * double.

+ * + *

The particular setter method to be called for each property is + * determined using the usual JavaBeans introspection mechanisms. Thus, + * you may identify custom setter methods using a BeanInfo class that is + * associated with the class of the bean itself. If no such BeanInfo + * class is available, the standard method name conversion ("set" plus + * the capitalized name of the property in question) is used.

+ * + *

NOTE: It is contrary to the JavaBeans Specification + * to have more than one setter method (with different argument + * signatures) for the same property.

+ * + * @param clazz JavaBean class whose properties are being populated + * @param properties Map keyed by property name, with the corresponding (String or String[]) value(s) to be set + * @param Class type + * @return Class instance + */ + public static T populate(final Properties properties, final Class clazz) { + T obj = null; + try { + obj = clazz.newInstance(); + return populate(properties, obj); + } catch (Throwable e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static T populate(final KeyValue properties, final Class clazz) { + T obj = null; + try { + obj = clazz.newInstance(); + return populate(properties, obj); + } catch (Throwable e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static Class getMethodClass(Class clazz, String methodName) { + Method[] methods = clazz.getMethods(); + for (Method method : methods) { + if (method.getName().equalsIgnoreCase(methodName)) { + return method.getParameterTypes()[0]; + } + } + return null; + } + + public static void setProperties(Class clazz, Object obj, String methodName, + Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + Class parameterClass = getMethodClass(clazz, methodName); + Method setterMethod = clazz.getMethod(methodName, parameterClass); + if (parameterClass == Boolean.TYPE) { + setterMethod.invoke(obj, Boolean.valueOf(value.toString())); + } else if (parameterClass == Integer.TYPE) { + setterMethod.invoke(obj, Integer.valueOf(value.toString())); + } else if (parameterClass == Double.TYPE) { + setterMethod.invoke(obj, Double.valueOf(value.toString())); + } else if (parameterClass == Float.TYPE) { + setterMethod.invoke(obj, Float.valueOf(value.toString())); + } else if (parameterClass == Long.TYPE) { + setterMethod.invoke(obj, Long.valueOf(value.toString())); + } else + setterMethod.invoke(obj, value); + } + + public static T populate(final Properties properties, final T obj) { + Class clazz = obj.getClass(); + try { + + Set> entries = properties.entrySet(); + for (Map.Entry entry : entries) { + String entryKey = entry.getKey().toString(); + String[] keyGroup = entryKey.split("\\."); + for (int i = 0; i < keyGroup.length; i++) { + keyGroup[i] = keyGroup[i].toLowerCase(); + keyGroup[i] = StringUtils.capitalize(keyGroup[i]); + } + String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); + try { + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue()); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + //ignored... + } + } + } catch (RuntimeException e) { + log.warn("Error occurs !", e); + } + return obj; + } + + public static T populate(final KeyValue properties, final T obj) { + Class clazz = obj.getClass(); + try { + + final Set keySet = properties.keySet(); + for (String key : keySet) { + String[] keyGroup = key.split("\\."); + for (int i = 0; i < keyGroup.length; i++) { + keyGroup[i] = keyGroup[i].toLowerCase(); + keyGroup[i] = StringUtils.capitalize(keyGroup[i]); + } + String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); + try { + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key)); + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { + //ignored... + } + } + } catch (RuntimeException e) { + log.warn("Error occurs !", e); + } + return obj; + } +} + diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java similarity index 99% rename from openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java rename to openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java index 87037ee221acd2b3546c4f2daeecf3eaf097ea81..60c840813cf08021270f5334ff77a9a00081fb24 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.openmessaging.rocketmq; +package io.openmessaging.rocketmq.utils; import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue;