提交 2169e3c0 编写于 作者: S shutian.lzh

Make code compatible to OMS 0.3.0

上级 48476ae5
......@@ -70,6 +70,7 @@ public class BrokerStartup {
}
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
......
......@@ -49,7 +49,10 @@ public class ClientLoggerTest {
rocketmqCommon.info("common message {}", i, new RuntimeException());
rocketmqRemoting.info("remoting message {}", i, new RuntimeException());
}
try {
Thread.sleep(10);
} catch (InterruptedException ignore) {
}
String content = MixAll.file2String(LOG_DIR + "/rocketmq_client.log");
Assert.assertTrue(content.contains("testClientlog"));
Assert.assertTrue(content.contains("RocketmqClient"));
......
......@@ -16,19 +16,20 @@
*/
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.SendResult;
import io.openmessaging.OMS;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
public class SimpleProducer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final Producer producer = messagingAccessPoint.createProducer();
......@@ -38,39 +39,40 @@ public class SimpleProducer {
producer.startup();
System.out.printf("Producer startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.shutdown();
messagingAccessPoint.shutdown();
}
}));
{
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
Message message = producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L);
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
}
final CountDownLatch countDownLatch = new CountDownLatch(1);
{
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new PromiseListener<SendResult>() {
@Override
public void operationCompleted(Promise<SendResult> promise) {
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
}
final Future<SendResult> result = producer.sendAsync(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new FutureListener<SendResult>() {
@Override
public void operationFailed(Promise<SendResult> promise) {
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
public void operationComplete(Future<SendResult> future) {
if (future.getThrowable() != null) {
System.out.printf("Send async message Failed, error: %s%n", future.getThrowable().getMessage());
} else {
System.out.printf("Send async message OK, msgId: %s%n", future.get().messageId());
}
countDownLatch.countDown();
}
});
}
{
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
try {
countDownLatch.await();
Thread.sleep(500); // Wait some time for one-way delivery.
} catch (InterruptedException ignore) {
}
producer.shutdown();
}
}
......@@ -17,42 +17,60 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PullConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
public class SimplePullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
final Producer producer = messagingAccessPoint.createProducer();
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}));
final String queueName = "TopicTest";
producer.startup();
Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
SendResult sendResult = producer.send(msg);
System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
producer.shutdown();
consumer.attachQueue(queueName);
consumer.startup();
System.out.printf("Consumer startup OK%n");
while (true) {
Message message = consumer.poll();
// Keep running until we find the one that has just been sent
boolean stop = false;
while (!stop) {
Message message = consumer.receive();
if (message != null) {
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
if (!stop) {
stop = msgId.equalsIgnoreCase(sendResult.messageId());
}
} else {
System.out.printf("Return without any message%n");
}
}
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}
......@@ -17,22 +17,19 @@
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
public class SimplePushConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
......@@ -47,8 +44,8 @@ public class SimplePushConsumer {
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
public void onReceived(Message message, Context context) {
System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
context.ack();
}
});
......
......@@ -29,7 +29,7 @@ public class Producer {
producer.start();
for (int i = 0; i < 10000000; i++)
for (int i = 0; i < 128; i++)
try {
{
Message msg = new Message("TopicTest",
......
......@@ -32,7 +32,7 @@ public class PullScheduleService {
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
scheduleService.setMessageModel(MessageModel.CLUSTERING);
scheduleService.registerPullTaskCallback("TopicTest1", new PullTaskCallback() {
scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context) {
......
......@@ -16,24 +16,21 @@
*/
package io.openmessaging.rocketmq;
import io.openmessaging.IterableConsumer;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.Producer;
import io.openmessaging.PullConsumer;
import io.openmessaging.PushConsumer;
import io.openmessaging.ResourceManager;
import io.openmessaging.SequenceProducer;
import io.openmessaging.ServiceEndPoint;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.consumer.StreamingConsumer;
import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.observer.Observer;
import io.openmessaging.producer.Producer;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.producer.ProducerImpl;
import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
import io.openmessaging.rocketmq.utils.OMSUtil;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
private final KeyValue accessPointProperties;
public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
......@@ -41,10 +38,15 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
public KeyValue properties() {
public KeyValue attributes() {
return accessPointProperties;
}
@Override
public String implVersion() {
return "0.3.0";
}
@Override
public Producer createProducer() {
return new ProducerImpl(this.accessPointProperties);
......@@ -55,16 +57,6 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public SequenceProducer createSequenceProducer() {
return new SequenceProducerImpl(this.accessPointProperties);
}
@Override
public SequenceProducer createSequenceProducer(KeyValue properties) {
return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
......@@ -76,50 +68,30 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override
public PullConsumer createPullConsumer(String queueName) {
return new PullConsumerImpl(queueName, accessPointProperties);
public PullConsumer createPullConsumer() {
return new PullConsumerImpl(accessPointProperties);
}
@Override
public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties));
public PullConsumer createPullConsumer(KeyValue attributes) {
return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, attributes));
}
@Override
public IterableConsumer createIterableConsumer(String queueName) {
throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
public StreamingConsumer createStreamingConsumer() {
return null;
}
@Override
public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) {
throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
public StreamingConsumer createStreamingConsumer(KeyValue attributes) {
return null;
}
@Override
public ResourceManager getResourceManager() {
public ResourceManager resourceManager() {
throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
}
@Override
public ServiceEndPoint createServiceEndPoint() {
throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
}
@Override
public ServiceEndPoint createServiceEndPoint(KeyValue properties) {
throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
}
@Override
public void addObserver(Observer observer) {
//Ignore
}
@Override
public void deleteObserver(Observer observer) {
//Ignore
}
@Override
public void startup() {
//Ignore
......
......@@ -16,20 +16,20 @@
*/
package io.openmessaging.rocketmq.config;
import io.openmessaging.PropertyKeys;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class ClientConfig implements PropertyKeys, NonStandardKeys {
private String omsDriverImpl;
private String omsAccessPoints;
private String omsNamespace;
private String omsProducerId;
private String omsConsumerId;
private int omsOperationTimeout = 5000;
private String omsRoutingName;
private String omsOperatorName;
private String omsDstQueue;
private String omsSrcTopic;
public class ClientConfig implements OMSBuiltinKeys, NonStandardKeys {
private String driverImpl;
private String accessPoints;
private String namespace;
private String producerId;
private String consumerId;
private int operationTimeout = 5000;
private String region;
private String routingSource;
private String routingDestination;
private String routingExpression;
private String rmqConsumerGroup;
private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
private int rmqMaxRedeliveryTimes = 16;
......@@ -40,84 +40,60 @@ public class ClientConfig implements PropertyKeys, NonStandardKeys {
private int rmqPullMessageBatchNums = 32;
private int rmqPullMessageCacheCapacity = 1000;
public String getOmsDriverImpl() {
return omsDriverImpl;
public String getDriverImpl() {
return driverImpl;
}
public void setOmsDriverImpl(final String omsDriverImpl) {
this.omsDriverImpl = omsDriverImpl;
public void setDriverImpl(final String driverImpl) {
this.driverImpl = driverImpl;
}
public String getOmsAccessPoints() {
return omsAccessPoints;
public String getAccessPoints() {
return accessPoints;
}
public void setOmsAccessPoints(final String omsAccessPoints) {
this.omsAccessPoints = omsAccessPoints;
public void setAccessPoints(final String accessPoints) {
this.accessPoints = accessPoints;
}
public String getOmsNamespace() {
return omsNamespace;
public String getNamespace() {
return namespace;
}
public void setOmsNamespace(final String omsNamespace) {
this.omsNamespace = omsNamespace;
public void setNamespace(final String namespace) {
this.namespace = namespace;
}
public String getOmsProducerId() {
return omsProducerId;
public String getProducerId() {
return producerId;
}
public void setOmsProducerId(final String omsProducerId) {
this.omsProducerId = omsProducerId;
public void setProducerId(final String producerId) {
this.producerId = producerId;
}
public String getOmsConsumerId() {
return omsConsumerId;
public String getConsumerId() {
return consumerId;
}
public void setOmsConsumerId(final String omsConsumerId) {
this.omsConsumerId = omsConsumerId;
public void setConsumerId(final String consumerId) {
this.consumerId = consumerId;
}
public int getOmsOperationTimeout() {
return omsOperationTimeout;
public int getOperationTimeout() {
return operationTimeout;
}
public void setOmsOperationTimeout(final int omsOperationTimeout) {
this.omsOperationTimeout = omsOperationTimeout;
public void setOperationTimeout(final int operationTimeout) {
this.operationTimeout = operationTimeout;
}
public String getOmsRoutingName() {
return omsRoutingName;
public String getRoutingSource() {
return routingSource;
}
public void setOmsRoutingName(final String omsRoutingName) {
this.omsRoutingName = omsRoutingName;
}
public String getOmsOperatorName() {
return omsOperatorName;
}
public void setOmsOperatorName(final String omsOperatorName) {
this.omsOperatorName = omsOperatorName;
}
public String getOmsDstQueue() {
return omsDstQueue;
}
public void setOmsDstQueue(final String omsDstQueue) {
this.omsDstQueue = omsDstQueue;
}
public String getOmsSrcTopic() {
return omsSrcTopic;
}
public void setOmsSrcTopic(final String omsSrcTopic) {
this.omsSrcTopic = omsSrcTopic;
public void setRoutingSource(final String routingSource) {
this.routingSource = routingSource;
}
public String getRmqConsumerGroup() {
......@@ -191,4 +167,28 @@ public class ClientConfig implements PropertyKeys, NonStandardKeys {
public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
}
public String getRegion() {
return region;
}
public void setRegion(String region) {
this.region = region;
}
public String getRoutingDestination() {
return routingDestination;
}
public void setRoutingDestination(String routingDestination) {
this.routingDestination = routingDestination;
}
public String getRoutingExpression() {
return routingExpression;
}
public void setRoutingExpression(String routingExpression) {
this.routingExpression = routingExpression;
}
}
......@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.PropertyKeys;
import io.openmessaging.Message;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
......@@ -37,11 +37,11 @@ import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
class LocalMessageCache implements ServiceLifecycle {
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
......@@ -91,13 +91,13 @@ class LocalMessageCache implements ServiceLifecycle {
}
MessageExt poll() {
return poll(clientConfig.getOmsOperationTimeout());
return poll(clientConfig.getOperationTimeout());
}
MessageExt poll(final KeyValue properties) {
int currentPollTimeout = clientConfig.getOmsOperationTimeout();
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
int currentPollTimeout = clientConfig.getOperationTimeout();
if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) {
currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT);
}
return poll(currentPollTimeout);
}
......
......@@ -18,8 +18,8 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
......@@ -34,28 +34,25 @@ import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
private boolean started = false;
private String targetQueueName;
private final MQPullConsumerScheduleService pullConsumerScheduleService;
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
final static InternalLogger log = ClientLogger.getLog();
public PullConsumerImpl(final String queueName, final KeyValue properties) {
public PullConsumerImpl(final KeyValue properties) {
this.properties = properties;
this.targetQueueName = queueName;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String consumerGroup = clientConfig.getRmqConsumerGroup();
String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
}
......@@ -63,7 +60,7 @@ public class PullConsumerImpl implements PullConsumer {
this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
String accessPoints = clientConfig.getOmsAccessPoints();
String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
......@@ -76,24 +73,42 @@ public class PullConsumerImpl implements PullConsumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(PropertyKeys.CONSUMER_ID, consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
}
@Override
public KeyValue properties() {
public KeyValue attributes() {
return properties;
}
@Override
public Message poll() {
public PullConsumer attachQueue(String queueName) {
registerPullTaskCallback(queueName);
return this;
}
@Override
public PullConsumer attachQueue(String queueName, KeyValue attributes) {
registerPullTaskCallback(queueName);
return this;
}
@Override
public PullConsumer detachQueue(String queueName) {
this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
return this;
}
@Override
public Message receive() {
MessageExt rmqMsg = localMessageCache.poll();
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
@Override
public Message poll(final KeyValue properties) {
public Message receive(final KeyValue properties) {
MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
......@@ -112,7 +127,6 @@ public class PullConsumerImpl implements PullConsumer {
public synchronized void startup() {
if (!started) {
try {
registerPullTaskCallback();
this.pullConsumerScheduleService.start();
this.localMessageCache.startup();
} catch (MQClientException e) {
......@@ -122,7 +136,7 @@ public class PullConsumerImpl implements PullConsumer {
this.started = true;
}
private void registerPullTaskCallback() {
private void registerPullTaskCallback(final String targetQueueName) {
this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
@Override
public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
......
......@@ -18,12 +18,12 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.MessageListener;
import io.openmessaging.OMS;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ConsumerInterceptor;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
......@@ -52,13 +52,13 @@ public class PushConsumerImpl implements PushConsumer {
this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = clientConfig.getOmsAccessPoints();
String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
String consumerGroup = clientConfig.getRmqConsumerGroup();
String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
}
......@@ -70,13 +70,13 @@ public class PushConsumerImpl implements PushConsumer {
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(PropertyKeys.CONSUMER_ID, consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
}
@Override
public KeyValue properties() {
public KeyValue attributes() {
return properties;
}
......@@ -90,6 +90,11 @@ public class PushConsumerImpl implements PushConsumer {
this.rocketmqPushConsumer.suspend();
}
@Override
public void suspend(long timeout) {
}
@Override
public boolean isSuspended() {
return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
......@@ -106,6 +111,32 @@ public class PushConsumerImpl implements PushConsumer {
return this;
}
@Override
public PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes) {
return this.attachQueue(queueName, listener);
}
@Override
public PushConsumer detachQueue(String queueName) {
this.subscribeTable.remove(queueName);
try {
this.rocketmqPushConsumer.unsubscribe(queueName);
} catch (Exception e) {
throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
}
return null;
}
@Override
public void addInterceptor(ConsumerInterceptor interceptor) {
}
@Override
public void removeInterceptor(ConsumerInterceptor interceptor) {
}
@Override
public synchronized void startup() {
if (!started) {
......@@ -146,9 +177,9 @@ public class PushConsumerImpl implements PushConsumer {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
ReceivedMessageContext context = new ReceivedMessageContext() {
MessageListener.Context context = new MessageListener.Context() {
@Override
public KeyValue properties() {
public KeyValue attributes() {
return contextProperties;
}
......@@ -158,16 +189,9 @@ public class PushConsumerImpl implements PushConsumer {
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
@Override
public void ack(final KeyValue properties) {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
};
long begin = System.currentTimeMillis();
listener.onMessage(omsMsg, context);
listener.onReceived(omsMsg, context);
long costs = System.currentTimeMillis() - begin;
long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
try {
......
......@@ -23,13 +23,13 @@ import io.openmessaging.OMS;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class BytesMessageImpl implements BytesMessage {
private KeyValue headers;
private KeyValue properties;
private KeyValue sysHeaders;
private KeyValue userHeaders;
private byte[] body;
public BytesMessageImpl() {
this.headers = OMS.newKeyValue();
this.properties = OMS.newKeyValue();
this.sysHeaders = OMS.newKeyValue();
this.userHeaders = OMS.newKeyValue();
}
@Override
......@@ -44,60 +44,60 @@ public class BytesMessageImpl implements BytesMessage {
}
@Override
public KeyValue headers() {
return headers;
public KeyValue sysHeaders() {
return sysHeaders;
}
@Override
public KeyValue properties() {
return properties;
public KeyValue userHeaders() {
return userHeaders;
}
@Override
public Message putHeaders(final String key, final int value) {
headers.put(key, value);
public Message putSysHeaders(String key, int value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final long value) {
headers.put(key, value);
public Message putSysHeaders(String key, long value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final double value) {
headers.put(key, value);
public Message putSysHeaders(String key, double value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final String value) {
headers.put(key, value);
public Message putSysHeaders(String key, String value) {
sysHeaders.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final int value) {
properties.put(key, value);
public Message putUserHeaders(String key, int value) {
userHeaders.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final long value) {
properties.put(key, value);
public Message putUserHeaders(String key, long value) {
userHeaders.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final double value) {
properties.put(key, value);
public Message putUserHeaders(String key, double value) {
userHeaders.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final String value) {
properties.put(key, value);
public Message putUserHeaders(String key, String value) {
userHeaders.put(key, value);
return this;
}
......
package io.openmessaging.rocketmq.domain;
public interface RocketMQConstants {
String START_DELIVER_TIME = "__STARTDELIVERTIME";
}
......@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.domain;
import io.openmessaging.KeyValue;
import io.openmessaging.SendResult;
import io.openmessaging.producer.SendResult;
public class SendResultImpl implements SendResult {
private String messageId;
......@@ -33,7 +33,6 @@ public class SendResultImpl implements SendResult {
return messageId;
}
@Override
public KeyValue properties() {
return properties;
}
......
......@@ -20,8 +20,7 @@ import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageFactory;
import io.openmessaging.MessageHeader;
import io.openmessaging.PropertyKeys;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.exception.OMSNotSupportedException;
......@@ -53,7 +52,7 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer = new DefaultMQProducer();
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = clientConfig.getOmsAccessPoints();
String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
......@@ -61,10 +60,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
String producerId = buildInstanceName();
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
properties.put(PropertyKeys.PRODUCER_ID, producerId);
properties.put(OMSBuiltinKeys.PRODUCER_ID, producerId);
}
@Override
......@@ -121,18 +120,10 @@ abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
}
@Override
public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
BytesMessage bytesMessage = new BytesMessageImpl();
bytesMessage.setBody(body);
bytesMessage.headers().put(MessageHeader.TOPIC, topic);
return bytesMessage;
}
@Override
public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
BytesMessage bytesMessage = new BytesMessageImpl();
bytesMessage.setBody(body);
bytesMessage.headers().put(MessageHeader.QUEUE, queue);
return bytesMessage;
public BytesMessage createBytesMessage(String queue, byte[] body) {
BytesMessage message = new BytesMessageImpl();
message.setBody(body);
message.sysHeaders().put(Message.BuiltinKeys.DESTINATION, queue);
return message;
}
}
......@@ -19,12 +19,13 @@ package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.Producer;
import io.openmessaging.Promise;
import io.openmessaging.PropertyKeys;
import io.openmessaging.SendResult;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.producer.BatchMessageSender;
import io.openmessaging.producer.LocalTransactionExecutor;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.producer.SendCallback;
......@@ -39,7 +40,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
@Override
public KeyValue properties() {
public KeyValue attributes() {
return properties;
}
......@@ -50,11 +51,16 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public SendResult send(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return send(message, timeout);
}
@Override
public SendResult send(Message message, LocalTransactionExecutor branchExecutor, KeyValue attributes) {
return null;
}
private SendResult send(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
......@@ -64,11 +70,11 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
log.error(String.format("Send message to RocketMQ failed, %s", message));
throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
}
message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult);
} catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s", message), e);
throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
throw checkProducerException(rmqMessage.getTopic(), message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID), e);
}
}
......@@ -79,8 +85,8 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
long timeout = properties.containsKey(Message.BuiltinKeys.TIMEOUT)
? properties.getInt(Message.BuiltinKeys.TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return sendAsync(message, timeout);
}
......@@ -92,7 +98,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
this.rocketmqProducer.send(rmqMessage, new SendCallback() {
@Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
message.sysHeaders().put(Message.BuiltinKeys.MESSAGE_ID, rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult));
}
......@@ -121,4 +127,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
public void sendOneway(final Message message, final KeyValue properties) {
sendOneway(message);
}
@Override
public BatchMessageSender createBatchMessageSender() {
return null;
}
@Override
public void addInterceptor(ProducerInterceptor interceptor) {
}
@Override
public void removeInterceptor(ProducerInterceptor interceptor) {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.SequenceProducer;
import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {
private BlockingQueue<Message> msgCacheQueue;
public SequenceProducerImpl(final KeyValue properties) {
super(properties);
this.msgCacheQueue = new LinkedBlockingQueue<>();
}
@Override
public KeyValue properties() {
return properties;
}
@Override
public void send(final Message message) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
try {
Validators.checkMessage(rmqMessage, this.rocketmqProducer);
} catch (MQClientException e) {
throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
}
msgCacheQueue.add(message);
}
@Override
public void send(final Message message, final KeyValue properties) {
send(message);
}
@Override
public synchronized void commit() {
List<Message> messages = new ArrayList<>();
msgCacheQueue.drainTo(messages);
List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();
for (Message message : messages) {
rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
}
if (rmqMessages.size() == 0) {
return;
}
try {
SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
String[] msgIdArray = sendResult.getMsgId().split(",");
for (int i = 0; i < messages.size(); i++) {
Message message = messages.get(i);
message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
}
} catch (Exception e) {
throw checkProducerException("", "", e);
}
}
@Override
public synchronized void rollback() {
msgCacheQueue.clear();
}
}
......@@ -17,7 +17,7 @@
package io.openmessaging.rocketmq.promise;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.FutureListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -33,7 +33,7 @@ public class DefaultPromise<V> implements Promise<V> {
private long timeout;
private long createTime;
private Throwable exception = null;
private List<PromiseListener<V>> promiseListenerList;
private List<FutureListener<V>> promiseListenerList;
public DefaultPromise() {
createTime = System.currentTimeMillis();
......@@ -121,7 +121,7 @@ public class DefaultPromise<V> implements Promise<V> {
}
@Override
public void addListener(final PromiseListener<V> listener) {
public void addListener(final FutureListener<V> listener) {
if (listener == null) {
throw new NullPointerException("FutureListener is null");
}
......@@ -150,7 +150,7 @@ public class DefaultPromise<V> implements Promise<V> {
private void notifyListeners() {
if (promiseListenerList != null) {
for (PromiseListener<V> listener : promiseListenerList) {
for (FutureListener<V> listener : promiseListenerList) {
notifyListener(listener);
}
}
......@@ -199,12 +199,9 @@ public class DefaultPromise<V> implements Promise<V> {
return true;
}
private void notifyListener(final PromiseListener<V> listener) {
private void notifyListener(final FutureListener<V> listener) {
try {
if (exception != null)
listener.operationFailed(this);
else
listener.operationCompleted(this);
listener.operationComplete(this);
} catch (Throwable t) {
LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
}
......
......@@ -164,7 +164,7 @@ public final class BeanUtils {
final Set<String> keySet = properties.keySet();
for (String key : keySet) {
String[] keyGroup = key.split("\\.");
String[] keyGroup = key.split("[\\._]");
for (int i = 0; i < keyGroup.length; i++) {
keyGroup[i] = keyGroup[i].toLowerCase();
keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
......
......@@ -18,11 +18,11 @@ package io.openmessaging.rocketmq.utils;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.MessageHeader;
import io.openmessaging.Message.BuiltinKeys;
import io.openmessaging.OMS;
import io.openmessaging.SendResult;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.domain.RocketMQConstants;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
import java.util.Iterator;
......@@ -48,25 +48,26 @@ public class OMSUtil {
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
rmqMessage.setBody(omsMessage.getBody());
KeyValue headers = omsMessage.headers();
KeyValue properties = omsMessage.properties();
KeyValue sysHeaders = omsMessage.sysHeaders();
KeyValue userHeaders = omsMessage.userHeaders();
//All destinations in RocketMQ use Topic
if (headers.containsKey(MessageHeader.TOPIC)) {
rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
} else {
rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
rmqMessage.setTopic(sysHeaders.getString(BuiltinKeys.DESTINATION));
if (sysHeaders.containsKey(BuiltinKeys.START_TIME)) {
long deliverTime = sysHeaders.getLong(BuiltinKeys.START_TIME, 0);
if (deliverTime > 0) {
rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
}
}
for (String key : properties.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
for (String key : userHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
}
//Headers has a high priority
for (String key : headers.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
//System headers has a high priority
for (String key : sysHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, sysHeaders.getString(key));
}
return rmqMessage;
......@@ -76,8 +77,8 @@ public class OMSUtil {
BytesMessage omsMsg = new BytesMessageImpl();
omsMsg.setBody(rmqMsg.getBody());
KeyValue headers = omsMsg.headers();
KeyValue properties = omsMsg.properties();
KeyValue headers = omsMsg.sysHeaders();
KeyValue properties = omsMsg.userHeaders();
final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
......@@ -89,25 +90,22 @@ public class OMSUtil {
}
}
omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
} else {
omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
}
omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
omsMsg.putSysHeaders(BuiltinKeys.MESSAGE_ID, rmqMsg.getMsgId());
omsMsg.putSysHeaders(BuiltinKeys.DESTINATION, rmqMsg.getTopic());
omsMsg.putSysHeaders(BuiltinKeys.SEARCH_KEYS, rmqMsg.getKeys());
omsMsg.putSysHeaders(BuiltinKeys.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
omsMsg.putSysHeaders(BuiltinKeys.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
omsMsg.putSysHeaders(BuiltinKeys.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
omsMsg.putSysHeaders(BuiltinKeys.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
return omsMsg;
}
public static boolean isOMSHeader(String value) {
for (Field field : MessageHeader.class.getDeclaredFields()) {
for (Field field : BuiltinKeys.class.getDeclaredFields()) {
try {
if (field.get(MessageHeader.class).equals(value)) {
if (field.get(BuiltinKeys.class).equals(value)) {
return true;
}
} catch (IllegalAccessException e) {
......
......@@ -18,12 +18,10 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
......@@ -50,18 +48,18 @@ public class PullConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPullConsumer(queueName,
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
consumer.attachQueue(queueName);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true);
field.set(consumer, rocketmqPullConsumer); //Replace
ClientConfig clientConfig = new ClientConfig();
clientConfig.setOmsOperationTimeout(200);
clientConfig.setOperationTimeout(200);
localMessageCache = spy(new LocalMessageCache(rocketmqPullConsumer, clientConfig));
field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
......@@ -83,18 +81,18 @@ public class PullConsumerImplTest {
when(localMessageCache.poll()).thenReturn(consumedMsg);
Message message = consumer.poll();
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
Message message = consumer.receive();
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
}
@Test
public void testPoll_WithTimeout() {
//There is a default timeout value, @see ClientConfig#omsOperationTimeout.
Message message = consumer.poll();
Message message = consumer.receive();
assertThat(message).isNull();
message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
assertThat(message).isNull();
}
}
\ No newline at end of file
......@@ -18,13 +18,11 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.util.Collections;
......@@ -49,10 +47,10 @@ public class PushConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer(
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true);
......@@ -75,8 +73,8 @@ public class PushConsumerImplTest {
consumedMsg.setTopic("HELLO_QUEUE");
consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
public void onReceived(Message message, Context context) {
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
context.ack();
}
......
......@@ -17,9 +17,9 @@
package io.openmessaging.rocketmq.producer;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.OMS;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.producer.Producer;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -49,8 +49,8 @@ public class ProducerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
......@@ -67,8 +67,8 @@ public class ProducerImplTest {
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
io.openmessaging.SendResult omsResult =
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
io.openmessaging.producer.SendResult omsResult =
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
}
......@@ -80,7 +80,7 @@ public class ProducerImplTest {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
try {
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
......@@ -91,7 +91,7 @@ public class ProducerImplTest {
public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
try {
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
producer.send(producer.createBytesMessage("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.SequenceProducer;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SequenceProducerImplTest {
private SequenceProducer producer;
@Mock
private DefaultMQProducer rocketmqProducer;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createSequenceProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true);
field.set(producer, rocketmqProducer);
messagingAccessPoint.startup();
producer.startup();
}
@Test
public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
producer.send(message);
producer.commit();
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
}
@Test
public void testRollback() {
when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
producer.send(message);
producer.rollback();
producer.commit(); //Commit nothing.
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
}
}
\ No newline at end of file
......@@ -16,8 +16,9 @@
*/
package io.openmessaging.rocketmq.promise;
import io.openmessaging.Future;
import io.openmessaging.FutureListener;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.junit.Before;
import org.junit.Test;
......@@ -63,14 +64,10 @@ public class DefaultPromiseTest {
@Test
public void testAddListener() throws Exception {
promise.addListener(new PromiseListener<String>() {
promise.addListener(new FutureListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
public void operationComplete(Future<String> future) {
assertThat(promise.get()).isEqualTo("Done");
}
@Override
public void operationFailed(final Promise<String> promise) {
}
});
......@@ -80,15 +77,10 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_ListenerAfterSet() throws Exception {
promise.set("Done");
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
assertThat(promise.get()).isEqualTo("Done");
}
promise.addListener(new FutureListener<String>() {
@Override
public void operationFailed(final Promise<String> promise) {
public void operationComplete(Future<String> future) {
assertThat(future.get()).isEqualTo("Done");
}
});
}
......@@ -97,13 +89,9 @@ public class DefaultPromiseTest {
public void testAddListener_WithException_ListenerAfterSet() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.setFailure(exception);
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
}
promise.addListener(new FutureListener<String>() {
@Override
public void operationFailed(final Promise<String> promise) {
public void operationComplete(Future<String> future) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
......@@ -112,13 +100,9 @@ public class DefaultPromiseTest {
@Test
public void testAddListener_WithException() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
}
promise.addListener(new FutureListener<String>() {
@Override
public void operationFailed(final Promise<String> promise) {
public void operationComplete(Future<String> future) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
......
......@@ -92,9 +92,9 @@ public class BeanUtilsTest {
@Test
public void testPopulate_ExistObj() {
CustomizedConfig config = new CustomizedConfig();
config.setOmsConsumerId("NewConsumerId");
config.setConsumerId("NewConsumerId");
Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId");
Assert.assertEquals(config.getConsumerId(), "NewConsumerId");
config = BeanUtils.populate(properties, config);
......
......@@ -592,7 +592,7 @@
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
<version>0.1.0-alpha</version>
<version>0.3.0-alpha-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册