From 4fb72cd151d7a3607b5aef2b14c3b2de36e15a2a Mon Sep 17 00:00:00 2001 From: "shutian.lzh" Date: Tue, 17 Apr 2018 10:50:18 +0800 Subject: [PATCH] Fix tests --- .../rocketmq/client/log/ClientLoggerTest.java | 5 +- .../example/openmessaging/SimpleProducer.java | 1 + .../rocketmq/config/ClientConfig.java | 124 +++++++++--------- .../rocketmq/consumer/LocalMessageCache.java | 4 +- .../rocketmq/consumer/PullConsumerImpl.java | 4 +- .../rocketmq/consumer/PushConsumerImpl.java | 4 +- .../producer/AbstractOMSProducer.java | 4 +- .../rocketmq/utils/BeanUtils.java | 2 +- .../consumer/PullConsumerImplTest.java | 7 +- .../consumer/PushConsumerImplTest.java | 5 +- .../rocketmq/producer/ProducerImplTest.java | 2 +- .../rocketmq/utils/BeanUtilsTest.java | 4 +- 12 files changed, 86 insertions(+), 80 deletions(-) diff --git a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java index 9fe0d8b9..4888186b 100644 --- a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java @@ -49,7 +49,10 @@ public class ClientLoggerTest { rocketmqCommon.info("common message {}", i, new RuntimeException()); rocketmqRemoting.info("remoting message {}", i, new RuntimeException()); } - + try { + Thread.sleep(10); + } catch (InterruptedException ignore) { + } String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log"); Assert.assertTrue(content.contains("testClientlog")); Assert.assertTrue(content.contains("RocketmqClient")); diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index f9932251..dbe1d105 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -69,6 +69,7 @@ public class SimpleProducer { try { countDownLatch.await(); + Thread.sleep(500); // Wait some time for one-way delivery. } catch (InterruptedException ignore) { } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java index 774a7bc3..a5dfe494 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java @@ -20,16 +20,16 @@ import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys; public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { - private String omsDriverImpl; - private String omsAccessPoints; - private String omsNamespace; - private String omsProducerId; - private String omsConsumerId; - private int omsOperationTimeout = 5000; - private String omsRoutingName; - private String omsOperatorName; - private String omsDstQueue; - private String omsSrcTopic; + private String driverImpl; + private String accessPoints; + private String namespace; + private String producerId; + private String consumerId; + private int operationTimeout = 5000; + private String region; + private String routingSource; + private String routingDestination; + private String routingExpression; private String rmqConsumerGroup; private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; private int rmqMaxRedeliveryTimes = 16; @@ -40,84 +40,60 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { private int rmqPullMessageBatchNums = 32; private int rmqPullMessageCacheCapacity = 1000; - public String getOmsDriverImpl() { - return omsDriverImpl; + public String getDriverImpl() { + return driverImpl; } - public void setOmsDriverImpl(final String omsDriverImpl) { - this.omsDriverImpl = omsDriverImpl; + public void setDriverImpl(final String driverImpl) { + this.driverImpl = driverImpl; } - public String getOmsAccessPoints() { - return omsAccessPoints; + public String getAccessPoints() { + return accessPoints; } - public void setOmsAccessPoints(final String omsAccessPoints) { - this.omsAccessPoints = omsAccessPoints; + public void setAccessPoints(final String accessPoints) { + this.accessPoints = accessPoints; } - public String getOmsNamespace() { - return omsNamespace; + public String getNamespace() { + return namespace; } - public void setOmsNamespace(final String omsNamespace) { - this.omsNamespace = omsNamespace; + public void setNamespace(final String namespace) { + this.namespace = namespace; } - public String getOmsProducerId() { - return omsProducerId; + public String getProducerId() { + return producerId; } - public void setOmsProducerId(final String omsProducerId) { - this.omsProducerId = omsProducerId; + public void setProducerId(final String producerId) { + this.producerId = producerId; } - public String getOmsConsumerId() { - return omsConsumerId; + public String getConsumerId() { + return consumerId; } - public void setOmsConsumerId(final String omsConsumerId) { - this.omsConsumerId = omsConsumerId; + public void setConsumerId(final String consumerId) { + this.consumerId = consumerId; } - public int getOmsOperationTimeout() { - return omsOperationTimeout; + public int getOperationTimeout() { + return operationTimeout; } - public void setOmsOperationTimeout(final int omsOperationTimeout) { - this.omsOperationTimeout = omsOperationTimeout; + public void setOperationTimeout(final int operationTimeout) { + this.operationTimeout = operationTimeout; } - public String getOmsRoutingName() { - return omsRoutingName; + public String getRoutingSource() { + return routingSource; } - public void setOmsRoutingName(final String omsRoutingName) { - this.omsRoutingName = omsRoutingName; - } - - public String getOmsOperatorName() { - return omsOperatorName; - } - - public void setOmsOperatorName(final String omsOperatorName) { - this.omsOperatorName = omsOperatorName; - } - - public String getOmsDstQueue() { - return omsDstQueue; - } - - public void setOmsDstQueue(final String omsDstQueue) { - this.omsDstQueue = omsDstQueue; - } - - public String getOmsSrcTopic() { - return omsSrcTopic; - } - - public void setOmsSrcTopic(final String omsSrcTopic) { - this.omsSrcTopic = omsSrcTopic; + public void setRoutingSource(final String routingSource) { + this.routingSource = routingSource; } public String getRmqConsumerGroup() { @@ -191,4 +167,28 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) { this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity; } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getRoutingDestination() { + return routingDestination; + } + + public void setRoutingDestination(String routingDestination) { + this.routingDestination = routingDestination; + } + + public String getRoutingExpression() { + return routingExpression; + } + + public void setRoutingExpression(String routingExpression) { + this.routingExpression = routingExpression; + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 872d8fb5..93e60a73 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -91,11 +91,11 @@ class LocalMessageCache implements ServiceLifecycle { } MessageExt poll() { - return poll(clientConfig.getOmsOperationTimeout()); + return poll(clientConfig.getOperationTimeout()); } MessageExt poll(final KeyValue properties) { - int currentPollTimeout = clientConfig.getOmsOperationTimeout(); + int currentPollTimeout = clientConfig.getOperationTimeout(); if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) { currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 8bc7a770..225b09e4 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -52,7 +52,7 @@ public class PullConsumerImpl implements PullConsumer { this.properties = properties; this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String consumerGroup = clientConfig.getOmsConsumerId(); + String consumerGroup = clientConfig.getConsumerId(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } @@ -60,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer { this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 89106762..9bfd7c8b 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer { this.properties = properties; this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); - String consumerGroup = clientConfig.getOmsConsumerId(); + String consumerGroup = clientConfig.getConsumerId(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 2e99fd6e..f7337566 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -52,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer = new DefaultMQProducer(); this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } @@ -60,7 +60,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup()); String producerId = buildInstanceName(); - this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout()); + this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout()); this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java index 054374b8..ef9236f0 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -171,7 +171,7 @@ public final class BeanUtils { } String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); try { - setProperties(clazz, obj, "setOms" + beanFieldNameWithCapitalization, properties.getString(key)); + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key)); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { //ignored... } diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java index 7e81b40b..da2e8a08 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage; import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; +import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.PullConsumer; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.NonStandardKeys; @@ -48,9 +49,9 @@ public class PullConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); - consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); consumer.attachQueue(queueName); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); @@ -58,7 +59,7 @@ public class PullConsumerImplTest { field.set(consumer, rocketmqPullConsumer); //Replace ClientConfig clientConfig = new ClientConfig(); - clientConfig.setOmsOperationTimeout(200); + clientConfig.setOperationTimeout(200); localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, clientConfig)); field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java index 5caa2b69..b55816b8 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -18,6 +18,7 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.BytesMessage; import io.openmessaging.Message; +import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.MessageListener; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; @@ -47,9 +48,9 @@ public class PushConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); consumer = messagingAccessPoint.createPushConsumer( - OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); field.setAccessible(true); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java index 7b361792..fc6515ea 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java @@ -50,7 +50,7 @@ public class ProducerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); producer = messagingAccessPoint.createProducer(); Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java index 71ca11cc..1a431d98 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java @@ -92,9 +92,9 @@ public class BeanUtilsTest { @Test public void testPopulate_ExistObj() { CustomizedConfig config = new CustomizedConfig(); - config.setOmsConsumerId("NewConsumerId"); + config.setConsumerId("NewConsumerId"); - Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId"); + Assert.assertEquals(config.getConsumerId(), "NewConsumerId"); config = BeanUtils.populate(properties, config); -- GitLab