提交 4fb72cd1 编写于 作者: S shutian.lzh

Fix tests

上级 30abd95e
...@@ -49,7 +49,10 @@ public class ClientLoggerTest { ...@@ -49,7 +49,10 @@ public class ClientLoggerTest {
rocketmqCommon.info("common message {}", i, new RuntimeException()); rocketmqCommon.info("common message {}", i, new RuntimeException());
rocketmqRemoting.info("remoting message {}", i, new RuntimeException()); rocketmqRemoting.info("remoting message {}", i, new RuntimeException());
} }
try {
Thread.sleep(10);
} catch (InterruptedException ignore) {
}
String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log"); String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log");
Assert.assertTrue(content.contains("testClientlog")); Assert.assertTrue(content.contains("testClientlog"));
Assert.assertTrue(content.contains("RocketmqClient")); Assert.assertTrue(content.contains("RocketmqClient"));
......
...@@ -69,6 +69,7 @@ public class SimpleProducer { ...@@ -69,6 +69,7 @@ public class SimpleProducer {
try { try {
countDownLatch.await(); countDownLatch.await();
Thread.sleep(500); // Wait some time for one-way delivery.
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
} }
......
...@@ -20,16 +20,16 @@ import io.openmessaging.OMSBuiltinKeys; ...@@ -20,16 +20,16 @@ import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
private String omsDriverImpl; private String driverImpl;
private String omsAccessPoints; private String accessPoints;
private String omsNamespace; private String namespace;
private String omsProducerId; private String producerId;
private String omsConsumerId; private String consumerId;
private int omsOperationTimeout = 5000; private int operationTimeout = 5000;
private String omsRoutingName; private String region;
private String omsOperatorName; private String routingSource;
private String omsDstQueue; private String routingDestination;
private String omsSrcTopic; private String routingExpression;
private String rmqConsumerGroup; private String rmqConsumerGroup;
private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP"; private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
private int rmqMaxRedeliveryTimes = 16; private int rmqMaxRedeliveryTimes = 16;
...@@ -40,84 +40,60 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { ...@@ -40,84 +40,60 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
private int rmqPullMessageBatchNums = 32; private int rmqPullMessageBatchNums = 32;
private int rmqPullMessageCacheCapacity = 1000; private int rmqPullMessageCacheCapacity = 1000;
public String getOmsDriverImpl() { public String getDriverImpl() {
return omsDriverImpl; return driverImpl;
} }
public void setOmsDriverImpl(final String omsDriverImpl) { public void setDriverImpl(final String driverImpl) {
this.omsDriverImpl = omsDriverImpl; this.driverImpl = driverImpl;
} }
public String getOmsAccessPoints() { public String getAccessPoints() {
return omsAccessPoints; return accessPoints;
} }
public void setOmsAccessPoints(final String omsAccessPoints) { public void setAccessPoints(final String accessPoints) {
this.omsAccessPoints = omsAccessPoints; this.accessPoints = accessPoints;
} }
public String getOmsNamespace() { public String getNamespace() {
return omsNamespace; return namespace;
} }
public void setOmsNamespace(final String omsNamespace) { public void setNamespace(final String namespace) {
this.omsNamespace = omsNamespace; this.namespace = namespace;
} }
public String getOmsProducerId() { public String getProducerId() {
return omsProducerId; return producerId;
} }
public void setOmsProducerId(final String omsProducerId) { public void setProducerId(final String producerId) {
this.omsProducerId = omsProducerId; this.producerId = producerId;
} }
public String getOmsConsumerId() { public String getConsumerId() {
return omsConsumerId; return consumerId;
} }
public void setOmsConsumerId(final String omsConsumerId) { public void setConsumerId(final String consumerId) {
this.omsConsumerId = omsConsumerId; this.consumerId = consumerId;
} }
public int getOmsOperationTimeout() { public int getOperationTimeout() {
return omsOperationTimeout; return operationTimeout;
} }
public void setOmsOperationTimeout(final int omsOperationTimeout) { public void setOperationTimeout(final int operationTimeout) {
this.omsOperationTimeout = omsOperationTimeout; this.operationTimeout = operationTimeout;
} }
public String getOmsRoutingName() { public String getRoutingSource() {
return omsRoutingName; return routingSource;
} }
public void setOmsRoutingName(final String omsRoutingName) { public void setRoutingSource(final String routingSource) {
this.omsRoutingName = omsRoutingName; this.routingSource = routingSource;
}
public String getOmsOperatorName() {
return omsOperatorName;
}
public void setOmsOperatorName(final String omsOperatorName) {
this.omsOperatorName = omsOperatorName;
}
public String getOmsDstQueue() {
return omsDstQueue;
}
public void setOmsDstQueue(final String omsDstQueue) {
this.omsDstQueue = omsDstQueue;
}
public String getOmsSrcTopic() {
return omsSrcTopic;
}
public void setOmsSrcTopic(final String omsSrcTopic) {
this.omsSrcTopic = omsSrcTopic;
} }
public String getRmqConsumerGroup() { public String getRmqConsumerGroup() {
...@@ -191,4 +167,28 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys { ...@@ -191,4 +167,28 @@ public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) { public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity; this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
} }
public String getRegion() {
return region;
}
public void setRegion(String region) {
this.region = region;
}
public String getRoutingDestination() {
return routingDestination;
}
public void setRoutingDestination(String routingDestination) {
this.routingDestination = routingDestination;
}
public String getRoutingExpression() {
return routingExpression;
}
public void setRoutingExpression(String routingExpression) {
this.routingExpression = routingExpression;
}
} }
...@@ -91,11 +91,11 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -91,11 +91,11 @@ class LocalMessageCache implements ServiceLifecycle {
} }
MessageExt poll() { MessageExt poll() {
return poll(clientConfig.getOmsOperationTimeout()); return poll(clientConfig.getOperationTimeout());
} }
MessageExt poll(final KeyValue properties) { MessageExt poll(final KeyValue properties) {
int currentPollTimeout = clientConfig.getOmsOperationTimeout(); int currentPollTimeout = clientConfig.getOperationTimeout();
if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) { if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) {
currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT); currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT);
} }
......
...@@ -52,7 +52,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -52,7 +52,7 @@ public class PullConsumerImpl implements PullConsumer {
this.properties = properties; this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String consumerGroup = clientConfig.getOmsConsumerId(); String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) { if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
} }
...@@ -60,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -60,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer {
this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
String accessPoints = clientConfig.getOmsAccessPoints(); String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
} }
......
...@@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer {
this.properties = properties; this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = clientConfig.getOmsAccessPoints(); String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
} }
this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
String consumerGroup = clientConfig.getOmsConsumerId(); String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) { if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
} }
......
...@@ -52,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { ...@@ -52,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer = new DefaultMQProducer(); this.rocketmqProducer = new DefaultMQProducer();
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class); this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = clientConfig.getOmsAccessPoints(); String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
} }
...@@ -60,7 +60,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { ...@@ -60,7 +60,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup()); this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
String producerId = buildInstanceName(); String producerId = buildInstanceName();
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout()); this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId); this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4); this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId); properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
......
...@@ -171,7 +171,7 @@ public final class BeanUtils { ...@@ -171,7 +171,7 @@ public final class BeanUtils {
} }
String beanFieldNameWithCapitalization = StringUtils.join(keyGroup); String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
try { try {
setProperties(clazz, obj, "setOms" + beanFieldNameWithCapitalization, properties.getString(key)); setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key));
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) { } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
//ignored... //ignored...
} }
......
...@@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage; ...@@ -20,6 +20,7 @@ import io.openmessaging.BytesMessage;
import io.openmessaging.Message; import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer; import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys;
...@@ -48,9 +49,9 @@ public class PullConsumerImplTest { ...@@ -48,9 +49,9 @@ public class PullConsumerImplTest {
@Before @Before
public void init() throws NoSuchFieldException, IllegalAccessException { public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
consumer.attachQueue(queueName); consumer.attachQueue(queueName);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
...@@ -58,7 +59,7 @@ public class PullConsumerImplTest { ...@@ -58,7 +59,7 @@ public class PullConsumerImplTest {
field.set(consumer, rocketmqPullConsumer); //Replace field.set(consumer, rocketmqPullConsumer); //Replace
ClientConfig clientConfig = new ClientConfig(); ClientConfig clientConfig = new ClientConfig();
clientConfig.setOmsOperationTimeout(200); clientConfig.setOperationTimeout(200);
localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, clientConfig)); localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, clientConfig));
field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
......
...@@ -18,6 +18,7 @@ package io.openmessaging.rocketmq.consumer; ...@@ -18,6 +18,7 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage; import io.openmessaging.BytesMessage;
import io.openmessaging.Message; import io.openmessaging.Message;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS; import io.openmessaging.OMS;
...@@ -47,9 +48,9 @@ public class PushConsumerImplTest { ...@@ -47,9 +48,9 @@ public class PushConsumerImplTest {
@Before @Before
public void init() throws NoSuchFieldException, IllegalAccessException { public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer( consumer = messagingAccessPoint.createPushConsumer(
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup")); OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true); field.setAccessible(true);
......
...@@ -50,7 +50,7 @@ public class ProducerImplTest { ...@@ -50,7 +50,7 @@ public class ProducerImplTest {
@Before @Before
public void init() throws NoSuchFieldException, IllegalAccessException { public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace"); .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createProducer(); producer = messagingAccessPoint.createProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer"); Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
......
...@@ -92,9 +92,9 @@ public class BeanUtilsTest { ...@@ -92,9 +92,9 @@ public class BeanUtilsTest {
@Test @Test
public void testPopulate_ExistObj() { public void testPopulate_ExistObj() {
CustomizedConfig config = new CustomizedConfig(); CustomizedConfig config = new CustomizedConfig();
config.setOmsConsumerId("NewConsumerId"); config.setConsumerId("NewConsumerId");
Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId"); Assert.assertEquals(config.getConsumerId(), "NewConsumerId");
config = BeanUtils.populate(properties, config); config = BeanUtils.populate(properties, config);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册