提交 2e3c1b00 编写于 作者: Y yukon

Polish the oms config load mechanism.

上级 625ba077
...@@ -48,9 +48,11 @@ public class SimplePullConsumer { ...@@ -48,9 +48,11 @@ public class SimplePullConsumer {
while (true) { while (true) {
Message message = consumer.poll(); Message message = consumer.poll();
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); if (message != null) {
System.out.println("Received one message: " + msgId); String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
consumer.ack(msgId); System.out.println("Received one message: " + msgId);
consumer.ack(msgId);
}
} }
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq;
import io.openmessaging.PropertyKeys;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class ClientConfig implements PropertyKeys, NonStandardKeys {
private String omsDriverImpl;
private String omsAccessPoints;
private String omsNamespace;
private String omsProducerId;
private String omsConsumerId;
private int omsOperationTimeout = 5000;
private String omsRoutingName;
private String omsOperatorName;
private String omsDstQueue;
private String omsSrcTopic;
private String rmqConsumerGroup;
private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
private int rmqMaxRedeliveryTimes = 16;
private int rmqMessageConsumeTimeout = 15; //In minutes
private int rmqMaxConsumeThreadNums = 64;
private int rmqMinConsumeThreadNums = 20;
private String rmqMessageDestination;
private int rmqPullMessageBatchNums = 32;
private int rmqPullMessageCacheCapacity = 1000;
public String getOmsDriverImpl() {
return omsDriverImpl;
}
public void setOmsDriverImpl(final String omsDriverImpl) {
this.omsDriverImpl = omsDriverImpl;
}
public String getOmsAccessPoints() {
return omsAccessPoints;
}
public void setOmsAccessPoints(final String omsAccessPoints) {
this.omsAccessPoints = omsAccessPoints;
}
public String getOmsNamespace() {
return omsNamespace;
}
public void setOmsNamespace(final String omsNamespace) {
this.omsNamespace = omsNamespace;
}
public String getOmsProducerId() {
return omsProducerId;
}
public void setOmsProducerId(final String omsProducerId) {
this.omsProducerId = omsProducerId;
}
public String getOmsConsumerId() {
return omsConsumerId;
}
public void setOmsConsumerId(final String omsConsumerId) {
this.omsConsumerId = omsConsumerId;
}
public int getOmsOperationTimeout() {
return omsOperationTimeout;
}
public void setOmsOperationTimeout(final int omsOperationTimeout) {
this.omsOperationTimeout = omsOperationTimeout;
}
public String getOmsRoutingName() {
return omsRoutingName;
}
public void setOmsRoutingName(final String omsRoutingName) {
this.omsRoutingName = omsRoutingName;
}
public String getOmsOperatorName() {
return omsOperatorName;
}
public void setOmsOperatorName(final String omsOperatorName) {
this.omsOperatorName = omsOperatorName;
}
public String getOmsDstQueue() {
return omsDstQueue;
}
public void setOmsDstQueue(final String omsDstQueue) {
this.omsDstQueue = omsDstQueue;
}
public String getOmsSrcTopic() {
return omsSrcTopic;
}
public void setOmsSrcTopic(final String omsSrcTopic) {
this.omsSrcTopic = omsSrcTopic;
}
public String getRmqConsumerGroup() {
return rmqConsumerGroup;
}
public void setRmqConsumerGroup(final String rmqConsumerGroup) {
this.rmqConsumerGroup = rmqConsumerGroup;
}
public String getRmqProducerGroup() {
return rmqProducerGroup;
}
public void setRmqProducerGroup(final String rmqProducerGroup) {
this.rmqProducerGroup = rmqProducerGroup;
}
public int getRmqMaxRedeliveryTimes() {
return rmqMaxRedeliveryTimes;
}
public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) {
this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
}
public int getRmqMessageConsumeTimeout() {
return rmqMessageConsumeTimeout;
}
public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) {
this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
}
public int getRmqMaxConsumeThreadNums() {
return rmqMaxConsumeThreadNums;
}
public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) {
this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
}
public int getRmqMinConsumeThreadNums() {
return rmqMinConsumeThreadNums;
}
public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) {
this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
}
public String getRmqMessageDestination() {
return rmqMessageDestination;
}
public void setRmqMessageDestination(final String rmqMessageDestination) {
this.rmqMessageDestination = rmqMessageDestination;
}
public int getRmqPullMessageBatchNums() {
return rmqPullMessageBatchNums;
}
public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) {
this.rmqPullMessageBatchNums = rmqPullMessageBatchNums;
}
public int getRmqPullMessageCacheCapacity() {
return rmqPullMessageCacheCapacity;
}
public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
}
}
...@@ -31,6 +31,7 @@ import io.openmessaging.rocketmq.consumer.PullConsumerImpl; ...@@ -31,6 +31,7 @@ import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl; import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.producer.ProducerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl;
import io.openmessaging.rocketmq.producer.SequenceProducerImpl; import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
import io.openmessaging.rocketmq.utils.OMSUtil;
public class MessagingAccessPointImpl implements MessagingAccessPoint { public class MessagingAccessPointImpl implements MessagingAccessPoint {
private final KeyValue accessPointProperties; private final KeyValue accessPointProperties;
......
...@@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer; ...@@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.PropertyKeys; import io.openmessaging.PropertyKeys;
import io.openmessaging.rocketmq.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest; import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
...@@ -38,32 +38,19 @@ class LocalMessageCache { ...@@ -38,32 +38,19 @@ class LocalMessageCache {
private final Map<String, ConsumeRequest> consumedRequest; private final Map<String, ConsumeRequest> consumedRequest;
private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable; private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
private final DefaultMQPullConsumer rocketmqPullConsumer; private final DefaultMQPullConsumer rocketmqPullConsumer;
private int pullBatchNums = 32; private final ClientConfig clientConfig;
private int pollTimeout = -1;
private final static Logger log = ClientLogger.getLog(); private final static Logger log = ClientLogger.getLog();
LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties) { LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) {
int cacheCapacity = 1000; consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity());
if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) {
cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY);
}
consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity);
if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) {
pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS);
}
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
}
this.consumedRequest = new ConcurrentHashMap<>(); this.consumedRequest = new ConcurrentHashMap<>();
this.pullOffsetTable = new ConcurrentHashMap<>(); this.pullOffsetTable = new ConcurrentHashMap<>();
this.rocketmqPullConsumer = rocketmqPullConsumer; this.rocketmqPullConsumer = rocketmqPullConsumer;
this.clientConfig = clientConfig;
} }
int nextPullBatchNums() { int nextPullBatchNums() {
return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity()); return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity());
} }
long nextPullOffset(MessageQueue remoteQueue) { long nextPullOffset(MessageQueue remoteQueue) {
...@@ -90,31 +77,25 @@ class LocalMessageCache { ...@@ -90,31 +77,25 @@ class LocalMessageCache {
} }
MessageExt poll() { MessageExt poll() {
try { return poll(clientConfig.getOmsOperationTimeout());
ConsumeRequest consumeRequest = consumeRequestCache.take();
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
return consumeRequest.getMessageExt();
} catch (InterruptedException ignore) {
}
return null;
} }
MessageExt poll(final KeyValue properties) { MessageExt poll(final KeyValue properties) {
int currentPollTimeout = pollTimeout; int currentPollTimeout = clientConfig.getOmsOperationTimeout();
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
} }
return poll(currentPollTimeout);
}
if (currentPollTimeout == -1) { private MessageExt poll(long timeout) {
return poll();
}
try { try {
ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout, TimeUnit.MILLISECONDS); ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); if (consumeRequest != null) {
consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
return consumeRequest.getMessageExt(); consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest);
return consumeRequest.getMessageExt();
}
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
} }
return null; return null;
......
...@@ -21,9 +21,10 @@ import io.openmessaging.Message; ...@@ -21,9 +21,10 @@ import io.openmessaging.Message;
import io.openmessaging.PropertyKeys; import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer; import io.openmessaging.PullConsumer;
import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.OMSUtil; import io.openmessaging.rocketmq.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest; import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
...@@ -44,6 +45,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -44,6 +45,7 @@ public class PullConsumerImpl implements PullConsumer {
private String targetQueueName; private String targetQueueName;
private final MQPullConsumerScheduleService pullConsumerScheduleService; private final MQPullConsumerScheduleService pullConsumerScheduleService;
private final LocalMessageCache localMessageCache; private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
final static Logger log = ClientLogger.getLog(); final static Logger log = ClientLogger.getLog();
...@@ -51,7 +53,9 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -51,7 +53,9 @@ public class PullConsumerImpl implements PullConsumer {
this.properties = properties; this.properties = properties;
this.targetQueueName = queueName; this.targetQueueName = queueName;
String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP); this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String consumerGroup = clientConfig.getRmqConsumerGroup();
if (null == consumerGroup || consumerGroup.isEmpty()) { if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
} }
...@@ -59,7 +63,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -59,7 +63,7 @@ public class PullConsumerImpl implements PullConsumer {
this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); String accessPoints = clientConfig.getOmsAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
} }
...@@ -67,16 +71,14 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -67,16 +71,14 @@ public class PullConsumerImpl implements PullConsumer {
this.rocketmqPullConsumer.setConsumerGroup(consumerGroup); this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);
int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES); int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();
if (maxReDeliveryTimes != 0) { this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
}
String consumerId = OMSUtil.buildInstanceName(); String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId); this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(PropertyKeys.CONSUMER_ID, consumerId); properties.put(PropertyKeys.CONSUMER_ID, consumerId);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, properties); this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
} }
@Override @Override
...@@ -86,12 +88,14 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -86,12 +88,14 @@ public class PullConsumerImpl implements PullConsumer {
@Override @Override
public Message poll() { public Message poll() {
return OMSUtil.msgConvert(localMessageCache.poll()); MessageExt rmqMsg = localMessageCache.poll();
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
} }
@Override @Override
public Message poll(final KeyValue properties) { public Message poll(final KeyValue properties) {
return OMSUtil.msgConvert(localMessageCache.poll(properties)); MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
} }
@Override @Override
......
...@@ -24,7 +24,9 @@ import io.openmessaging.PropertyKeys; ...@@ -24,7 +24,9 @@ import io.openmessaging.PropertyKeys;
import io.openmessaging.PushConsumer; import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext; import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.OMSUtil; import io.openmessaging.rocketmq.ClientConfig;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -43,43 +45,29 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -43,43 +45,29 @@ public class PushConsumerImpl implements PushConsumer {
private final KeyValue properties; private final KeyValue properties;
private boolean started = false; private boolean started = false;
private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>(); private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
private final ClientConfig clientConfig;
public PushConsumerImpl(final KeyValue properties) { public PushConsumerImpl(final KeyValue properties) {
this.rocketmqPushConsumer = new DefaultMQPushConsumer(); this.rocketmqPushConsumer = new DefaultMQPushConsumer();
this.properties = properties; this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); String accessPoints = clientConfig.getOmsAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
} }
this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP); String consumerGroup = clientConfig.getRmqConsumerGroup();
if (null == consumerGroup || consumerGroup.isEmpty()) { if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
} }
this.rocketmqPushConsumer.setConsumerGroup(consumerGroup); this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES); this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout());
if (maxReDeliveryTimes != 0) { this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
this.rocketmqPushConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
}
int messageConsumeTimeout = properties.getInt(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT);
if (messageConsumeTimeout != 0) {
this.rocketmqPushConsumer.setConsumeTimeout(messageConsumeTimeout);
}
int maxConsumeThreadNums = properties.getInt(NonStandardKeys.MAX_CONSUME_THREAD_NUMS);
if (maxConsumeThreadNums != 0) {
this.rocketmqPushConsumer.setConsumeThreadMax(maxConsumeThreadNums);
}
int minConsumeThreadNums = properties.getInt(NonStandardKeys.MIN_CONSUME_THREAD_NUMS);
if (minConsumeThreadNums != 0) {
this.rocketmqPushConsumer.setConsumeThreadMin(minConsumeThreadNums);
}
String consumerId = OMSUtil.buildInstanceName(); String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId); this.rocketmqPushConsumer.setInstanceName(consumerId);
...@@ -181,10 +169,9 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -181,10 +169,9 @@ public class PushConsumerImpl implements PushConsumer {
long begin = System.currentTimeMillis(); long begin = System.currentTimeMillis();
listener.onMessage(omsMsg, context); listener.onMessage(omsMsg, context);
long costs = System.currentTimeMillis() - begin; long costs = System.currentTimeMillis() - begin;
long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
try { try {
sync.await(Math.max(0, PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout() - costs) sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
} }
......
...@@ -27,8 +27,9 @@ import io.openmessaging.exception.OMSMessageFormatException; ...@@ -27,8 +27,9 @@ import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.exception.OMSNotSupportedException; import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.exception.OMSTimeOutException; import io.openmessaging.exception.OMSTimeOutException;
import io.openmessaging.rocketmq.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
...@@ -38,33 +39,29 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; ...@@ -38,33 +39,29 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.slf4j.Logger; import org.slf4j.Logger;
import static io.openmessaging.rocketmq.OMSUtil.buildInstanceName; import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory{
final static Logger log = ClientLogger.getLog(); final static Logger log = ClientLogger.getLog();
final KeyValue properties; final KeyValue properties;
final DefaultMQProducer rocketmqProducer; final DefaultMQProducer rocketmqProducer;
private boolean started = false; private boolean started = false;
final ClientConfig clientConfig;
AbstractOMSProducer(final KeyValue properties) { AbstractOMSProducer(final KeyValue properties) {
this.properties = properties; this.properties = properties;
this.rocketmqProducer = new DefaultMQProducer(); this.rocketmqProducer = new DefaultMQProducer();
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); String accessPoints = clientConfig.getOmsAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
} }
this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
String producerGroup = properties.getString(NonStandardKeys.PRODUCER_GROUP);
if (producerGroup == null || producerGroup.isEmpty()) {
producerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
}
this.rocketmqProducer.setProducerGroup(producerGroup);
String producerId = buildInstanceName(); String producerId = buildInstanceName();
int operationTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
this.rocketmqProducer.setSendMsgTimeout(operationTimeout == 0 ? 5000 : operationTimeout);
this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
properties.put(PropertyKeys.PRODUCER_ID, producerId); properties.put(PropertyKeys.PRODUCER_ID, producerId);
......
...@@ -25,12 +25,12 @@ import io.openmessaging.Promise; ...@@ -25,12 +25,12 @@ import io.openmessaging.Promise;
import io.openmessaging.PropertyKeys; import io.openmessaging.PropertyKeys;
import io.openmessaging.SendResult; import io.openmessaging.SendResult;
import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.OMSUtil; import io.openmessaging.rocketmq.utils.OMSUtil;
import io.openmessaging.rocketmq.promise.DefaultPromise; import io.openmessaging.rocketmq.promise.DefaultPromise;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import static io.openmessaging.rocketmq.OMSUtil.msgConvert; import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert;
public class ProducerImpl extends AbstractOMSProducer implements Producer { public class ProducerImpl extends AbstractOMSProducer implements Producer {
......
...@@ -21,7 +21,7 @@ import io.openmessaging.KeyValue; ...@@ -21,7 +21,7 @@ import io.openmessaging.KeyValue;
import io.openmessaging.Message; import io.openmessaging.Message;
import io.openmessaging.MessageHeader; import io.openmessaging.MessageHeader;
import io.openmessaging.SequenceProducer; import io.openmessaging.SequenceProducer;
import io.openmessaging.rocketmq.OMSUtil; import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.utils;
import io.openmessaging.KeyValue;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.log.ClientLogger;
import org.slf4j.Logger;
public final class BeanUtils {
final static Logger log = ClientLogger.getLog();
/**
* Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
*/
private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>();
static {
primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
primitiveWrapperMap.put(Byte.TYPE, Byte.class);
primitiveWrapperMap.put(Character.TYPE, Character.class);
primitiveWrapperMap.put(Short.TYPE, Short.class);
primitiveWrapperMap.put(Integer.TYPE, Integer.class);
primitiveWrapperMap.put(Long.TYPE, Long.class);
primitiveWrapperMap.put(Double.TYPE, Double.class);
primitiveWrapperMap.put(Float.TYPE, Float.class);
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>();
static {
for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass);
if (!primitiveClass.equals(wrapperClass)) {
wrapperMap.put(wrapperClass, primitiveClass);
}
}
wrapperMap.put(String.class, String.class);
}
/**
* <p>Populate the JavaBeans properties of the specified bean, based on
* the specified name/value pairs. This method uses Java reflection APIs
* to identify corresponding "property setter" method names, and deals
* with setter arguments of type <Code>String</Code>, <Code>boolean</Code>,
* <Code>int</Code>, <Code>long</Code>, <Code>float</Code>, and
* <Code>double</Code>.</p>
*
* <p>The particular setter method to be called for each property is
* determined using the usual JavaBeans introspection mechanisms. Thus,
* you may identify custom setter methods using a BeanInfo class that is
* associated with the class of the bean itself. If no such BeanInfo
* class is available, the standard method name conversion ("set" plus
* the capitalized name of the property in question) is used.</p>
*
* <p><strong>NOTE</strong>: It is contrary to the JavaBeans Specification
* to have more than one setter method (with different argument
* signatures) for the same property.</p>
*
* @param clazz JavaBean class whose properties are being populated
* @param properties Map keyed by property name, with the corresponding (String or String[]) value(s) to be set
* @param <T> Class type
* @return Class instance
*/
public static <T> T populate(final Properties properties, final Class<T> clazz) {
T obj = null;
try {
obj = clazz.newInstance();
return populate(properties, obj);
} catch (Throwable e) {
log.warn("Error occurs !", e);
}
return obj;
}
public static <T> T populate(final KeyValue properties, final Class<T> clazz) {
T obj = null;
try {
obj = clazz.newInstance();
return populate(properties, obj);
} catch (Throwable e) {
log.warn("Error occurs !", e);
}
return obj;
}
public static Class<?> getMethodClass(Class<?> clazz, String methodName) {
Method[] methods = clazz.getMethods();
for (Method method : methods) {
if (method.getName().equalsIgnoreCase(methodName)) {
return method.getParameterTypes()[0];
}
}
return null;
}
public static void setProperties(Class<?> clazz, Object obj, String methodName,
Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Class<?> parameterClass = getMethodClass(clazz, methodName);
Method setterMethod = clazz.getMethod(methodName, parameterClass);
if (parameterClass == Boolean.TYPE) {
setterMethod.invoke(obj, Boolean.valueOf(value.toString()));
} else if (parameterClass == Integer.TYPE) {
setterMethod.invoke(obj, Integer.valueOf(value.toString()));
} else if (parameterClass == Double.TYPE) {
setterMethod.invoke(obj, Double.valueOf(value.toString()));
} else if (parameterClass == Float.TYPE) {
setterMethod.invoke(obj, Float.valueOf(value.toString()));
} else if (parameterClass == Long.TYPE) {
setterMethod.invoke(obj, Long.valueOf(value.toString()));
} else
setterMethod.invoke(obj, value);
}
public static <T> T populate(final Properties properties, final T obj) {
Class<?> clazz = obj.getClass();
try {
Set<Map.Entry<Object, Object>> entries = properties.entrySet();
for (Map.Entry<Object, Object> entry : entries) {
String entryKey = entry.getKey().toString();
String[] keyGroup = entryKey.split("\\.");
for (int i = 0; i < keyGroup.length; i++) {
keyGroup[i] = keyGroup[i].toLowerCase();
keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
}
String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
try {
setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue());
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
//ignored...
}
}
} catch (RuntimeException e) {
log.warn("Error occurs !", e);
}
return obj;
}
public static <T> T populate(final KeyValue properties, final T obj) {
Class<?> clazz = obj.getClass();
try {
final Set<String> keySet = properties.keySet();
for (String key : keySet) {
String[] keyGroup = key.split("\\.");
for (int i = 0; i < keyGroup.length; i++) {
keyGroup[i] = keyGroup[i].toLowerCase();
keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
}
String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
try {
setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key));
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
//ignored...
}
}
} catch (RuntimeException e) {
log.warn("Error occurs !", e);
}
return obj;
}
}
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.openmessaging.rocketmq; package io.openmessaging.rocketmq.utils;
import io.openmessaging.BytesMessage; import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册