diff --git a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java index 9fe0d8b9c4501221ce76d21f02f8de304d13c08f..4888186b1e7a22d85fa6830a7b1e3f9b5c2780a1 100644 --- a/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/log/ClientLoggerTest.java @@ -49,7 +49,10 @@ public class ClientLoggerTest { rocketmqCommon.info("common message {}", i, new RuntimeException()); rocketmqRemoting.info("remoting message {}", i, new RuntimeException()); } - + try { + Thread.sleep(10); + } catch (InterruptedException ignore) { + } String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log"); Assert.assertTrue(content.contains("testClientlog")); Assert.assertTrue(content.contains("RocketmqClient")); diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java index f993225179b50654e868e3421e0dc5151cf458f9..dbe1d10568804e684e4098d94d7f107ce4496e34 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimpleProducer.java @@ -69,6 +69,7 @@ public class SimpleProducer { try { countDownLatch.await(); + Thread.sleep(500); // Wait some time for one-way delivery. } catch (InterruptedException ignore) { } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java index 774a7bc32f7b3c3c6681698b20e80d906f9c1678..a5dfe49484a58b37f89a09be7930b84ee8dec3f7 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/ClientConfig.java @@ -20,16 +20,16 @@ import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys; public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { - private String omsDriverImpl; - private String omsAccessPoints; - private String omsNamespace; - private String omsProducerId; - private String omsConsumerId; - private int omsOperationTimeout = 5000; - private String omsRoutingName; - private String omsOperatorName; - private String omsDstQueue; - private String omsSrcTopic; + private String driverImpl; + private String accessPoints; + private String namespace; + private String producerId; + private String consumerId; + private int operationTimeout = 5000; + private String region; + private String routingSource; + private String routingDestination; + private String routingExpression; private String rmqConsumerGroup; private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; private int rmqMaxRedeliveryTimes = 16; @@ -40,84 +40,60 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { private int rmqPullMessageBatchNums = 32; private int rmqPullMessageCacheCapacity = 1000; - public String getOmsDriverImpl() { - return omsDriverImpl; + public String getDriverImpl() { + return driverImpl; } - public void setOmsDriverImpl(final String omsDriverImpl) { - this.omsDriverImpl = omsDriverImpl; + public void setDriverImpl(final String driverImpl) { + this.driverImpl = driverImpl; } - public String getOmsAccessPoints() { - return omsAccessPoints; + public String getAccessPoints() { + return accessPoints; } - public void setOmsAccessPoints(final String omsAccessPoints) { - this.omsAccessPoints = omsAccessPoints; + public void setAccessPoints(final String accessPoints) { + this.accessPoints = accessPoints; } - public String getOmsNamespace() { - return omsNamespace; + public String getNamespace() { + return namespace; } - public void setOmsNamespace(final String omsNamespace) { - this.omsNamespace = omsNamespace; + public void setNamespace(final String namespace) { + this.namespace = namespace; } - public String getOmsProducerId() { - return omsProducerId; + public String getProducerId() { + return producerId; } - public void setOmsProducerId(final String omsProducerId) { - this.omsProducerId = omsProducerId; + public void setProducerId(final String producerId) { + this.producerId = producerId; } - public String getOmsConsumerId() { - return omsConsumerId; + public String getConsumerId() { + return consumerId; } - public void setOmsConsumerId(final String omsConsumerId) { - this.omsConsumerId = omsConsumerId; + public void setConsumerId(final String consumerId) { + this.consumerId = consumerId; } - public int getOmsOperationTimeout() { - return omsOperationTimeout; + public int getOperationTimeout() { + return operationTimeout; } - public void setOmsOperationTimeout(final int omsOperationTimeout) { - this.omsOperationTimeout = omsOperationTimeout; + public void setOperationTimeout(final int operationTimeout) { + this.operationTimeout = operationTimeout; } - public String getOmsRoutingName() { - return omsRoutingName; + public String getRoutingSource() { + return routingSource; } - public void setOmsRoutingName(final String omsRoutingName) { - this.omsRoutingName = omsRoutingName; - } - - public String getOmsOperatorName() { - return omsOperatorName; - } - - public void setOmsOperatorName(final String omsOperatorName) { - this.omsOperatorName = omsOperatorName; - } - - public String getOmsDstQueue() { - return omsDstQueue; - } - - public void setOmsDstQueue(final String omsDstQueue) { - this.omsDstQueue = omsDstQueue; - } - - public String getOmsSrcTopic() { - return omsSrcTopic; - } - - public void setOmsSrcTopic(final String omsSrcTopic) { - this.omsSrcTopic = omsSrcTopic; + public void setRoutingSource(final String routingSource) { + this.routingSource = routingSource; } public String getRmqConsumerGroup() { @@ -191,4 +167,28 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) { this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity; } + + public String getRegion() { + return region; + } + + public void setRegion(String region) { + this.region = region; + } + + public String getRoutingDestination() { + return routingDestination; + } + + public void setRoutingDestination(String routingDestination) { + this.routingDestination = routingDestination; + } + + public String getRoutingExpression() { + return routingExpression; + } + + public void setRoutingExpression(String routingExpression) { + this.routingExpression = routingExpression; + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 872d8fb5a6cefc46a88c1a2bc7b7f9c35672a074..93e60a738e94a6e041dd8f6f0c98749b3c5a5159 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -91,11 +91,11 @@ class LocalMessageCache implements ServiceLifecycle { } MessageExt poll() { - return poll(clientConfig.getOmsOperationTimeout()); + return poll(clientConfig.getOperationTimeout()); } MessageExt poll(final KeyValue properties) { - int currentPollTimeout = clientConfig.getOmsOperationTimeout(); + int currentPollTimeout = clientConfig.getOperationTimeout(); if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) { currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 8bc7a7701ee01d7b97b8f7806859e49f1f529c3d..225b09e47a6a0cf7e6946bba851fbb2ee3ff18bb 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -52,7 +52,7 @@ public class PullConsumerImpl implements PullConsumer { this.properties = properties; this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String consumerGroup = clientConfig.getOmsConsumerId(); + String consumerGroup = clientConfig.getConsumerId(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } @@ -60,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer { this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 89106762a95059f43b6eeaa216fde7b9274a1e42..9bfd7c8b6228a51cfdebe3e4e97fb4be35b82ec5 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer { this.properties = properties; this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); - String consumerGroup = clientConfig.getOmsConsumerId(); + String consumerGroup = clientConfig.getConsumerId(); if (null == consumerGroup || consumerGroup.isEmpty()) { throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 2e99fd6e72219fc216efab1a129383235461570f..f7337566098492cf9c8722392bcb45ad28dea807 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -52,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer = new DefaultMQProducer(); this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); - String accessPoints = clientConfig.getOmsAccessPoints(); + String accessPoints = clientConfig.getAccessPoints(); if (accessPoints == null || accessPoints.isEmpty()) { throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); } @@ -60,7 +60,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup()); String producerId = buildInstanceName(); - this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout()); + this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout()); this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java index 054374b8f9b8a5e1ee9837b40848f80ef3a31c12..ef9236f09c131eaad9b0cdebd5f87a5e7949b6bc 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/BeanUtils.java @@ -171,7 +171,7 @@ public final class BeanUtils { } String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); try { - setProperties(clazz, obj, "setOms" + beanFieldNameWithCapitalization, properties.getString(key)); + setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key)); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { //ignored... } diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java index 7e81b40bcc2690241d7fadcb4982ff5370c956d6..da2e8a084fe89e44b7e933351e5b9ee2a48422ae 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage; import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; +import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.PullConsumer; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.NonStandardKeys; @@ -48,9 +49,9 @@ public class PullConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); - consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); consumer.attachQueue(queueName); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); @@ -58,7 +59,7 @@ public class PullConsumerImplTest { field.set(consumer, rocketmqPullConsumer); //Replace ClientConfig clientConfig = new ClientConfig(); - clientConfig.setOmsOperationTimeout(200); + clientConfig.setOperationTimeout(200); localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, clientConfig)); field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java index 5caa2b69ada5f7e3937528d432b27f171b01b888..b55816b898958f942d8522cd8104cc28830498a9 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -18,6 +18,7 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.BytesMessage; import io.openmessaging.Message; +import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.MessageListener; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; @@ -47,9 +48,9 @@ public class PushConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); consumer = messagingAccessPoint.createPushConsumer( - OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); + OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); field.setAccessible(true); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java index 7b3617925eea5f5afd2f2a7a24c9054120608887..fc6515ea8d14cb5936cd2817c45fc2689863dae7 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/producer/ProducerImplTest.java @@ -50,7 +50,7 @@ public class ProducerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS - .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); producer = messagingAccessPoint.createProducer(); Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java index 71ca11ccfdd38d02d3425b14618e85cb5cf1ab10..1a431d988fd1fe5fc24d90b5b81c5a64ed2af170 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/utils/BeanUtilsTest.java @@ -92,9 +92,9 @@ public class BeanUtilsTest { @Test public void testPopulate_ExistObj() { CustomizedConfig config = new CustomizedConfig(); - config.setOmsConsumerId("NewConsumerId"); + config.setConsumerId("NewConsumerId"); - Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId"); + Assert.assertEquals(config.getConsumerId(), "NewConsumerId"); config = BeanUtils.populate(properties, config);