提交 c8e84adf 编写于 作者: Y yukon 提交者: dongeforever

[ROCKETMQ-186] Implement the OpenMessaging specification 0.1.0-alpha version

......@@ -47,6 +47,7 @@
<useAllReactorProjects>true</useAllReactorProjects>
<includes>
<include>org.apache.rocketmq:rocketmq-client</include>
<include>org.apache.rocketmq:rocketmq-openmessaging</include>
</includes>
<binaries>
<outputDirectory>./</outputDirectory>
......
......@@ -68,6 +68,7 @@
<include>org.apache.rocketmq:rocketmq-filtersrv</include>
<include>org.apache.rocketmq:rocketmq-example</include>
<include>org.apache.rocketmq:rocketmq-filter</include>
<include>org.apache.rocketmq:rocketmq-openmessaging</include>
</includes>
<binaries>
<outputDirectory>lib/</outputDirectory>
......
......@@ -48,5 +48,14 @@
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-openmessaging</artifactId>
<version>4.1.0-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.SendResult;
import java.nio.charset.Charset;
public class SimpleProducer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final Producer producer = messagingAccessPoint.createProducer();
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
producer.startup();
System.out.printf("Producer startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
producer.shutdown();
messagingAccessPoint.shutdown();
}
}));
{
Message message = producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")));
SendResult sendResult = producer.send(message);
//final Void aVoid = result.get(3000L);
System.out.printf("Send async message OK, msgId: %s%n", sendResult.messageId());
}
{
final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
result.addListener(new PromiseListener<SendResult>() {
@Override
public void operationCompleted(Promise<SendResult> promise) {
System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
}
@Override
public void operationFailed(Promise<SendResult> promise) {
System.out.printf("Send async message Failed, error: %s%n", promise.getThrowable().getMessage());
}
});
}
{
producer.sendOneway(producer.createBytesMessageToTopic("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8"))));
System.out.printf("Send oneway message OK%n");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PullConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}));
consumer.startup();
System.out.printf("Consumer startup OK%n");
while (true) {
Message message = consumer.poll();
if (message != null) {
String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class SimplePushConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
}
}));
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
System.out.printf("Received one message: %s%n", message.headers().getString(MessageHeader.MESSAGE_ID));
context.ack();
}
});
consumer.startup();
System.out.printf("Consumer startup OK%n");
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmq-all</artifactId>
<groupId>org.apache.rocketmq</groupId>
<version>4.1.0-incubating-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-openmessaging</artifactId>
<name>rocketmq-openmessaging ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
</dependencies>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq;
import io.openmessaging.IterableConsumer;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.Producer;
import io.openmessaging.PullConsumer;
import io.openmessaging.PushConsumer;
import io.openmessaging.ResourceManager;
import io.openmessaging.SequenceProducer;
import io.openmessaging.ServiceEndPoint;
import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.observer.Observer;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.producer.ProducerImpl;
import io.openmessaging.rocketmq.producer.SequenceProducerImpl;
import io.openmessaging.rocketmq.utils.OMSUtil;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
private final KeyValue accessPointProperties;
public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
this.accessPointProperties = accessPointProperties;
}
@Override
public KeyValue properties() {
return accessPointProperties;
}
@Override
public Producer createProducer() {
return new ProducerImpl(this.accessPointProperties);
}
@Override
public Producer createProducer(KeyValue properties) {
return new ProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public SequenceProducer createSequenceProducer() {
return new SequenceProducerImpl(this.accessPointProperties);
}
@Override
public SequenceProducer createSequenceProducer(KeyValue properties) {
return new SequenceProducerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
}
@Override
public PushConsumer createPushConsumer(KeyValue properties) {
return new PushConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public PullConsumer createPullConsumer(String queueName) {
return new PullConsumerImpl(queueName, accessPointProperties);
}
@Override
public PullConsumer createPullConsumer(String queueName, KeyValue properties) {
return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties));
}
@Override
public IterableConsumer createIterableConsumer(String queueName) {
throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
}
@Override
public IterableConsumer createIterableConsumer(String queueName, KeyValue properties) {
throw new OMSNotSupportedException("-1", "IterableConsumer is not supported in current version");
}
@Override
public ResourceManager getResourceManager() {
throw new OMSNotSupportedException("-1", "ResourceManager is not supported in current version.");
}
@Override
public ServiceEndPoint createServiceEndPoint() {
throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
}
@Override
public ServiceEndPoint createServiceEndPoint(KeyValue properties) {
throw new OMSNotSupportedException("-1", "ServiceEndPoint is not supported in current version.");
}
@Override
public void addObserver(Observer observer) {
//Ignore
}
@Override
public void deleteObserver(Observer observer) {
//Ignore
}
@Override
public void startup() {
//Ignore
}
@Override
public void shutdown() {
//Ignore
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.config;
import io.openmessaging.PropertyKeys;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
public class ClientConfig implements PropertyKeys, NonStandardKeys {
private String omsDriverImpl;
private String omsAccessPoints;
private String omsNamespace;
private String omsProducerId;
private String omsConsumerId;
private int omsOperationTimeout = 5000;
private String omsRoutingName;
private String omsOperatorName;
private String omsDstQueue;
private String omsSrcTopic;
private String rmqConsumerGroup;
private String rmqProducerGroup = "__OMS_PRODUCER_DEFAULT_GROUP";
private int rmqMaxRedeliveryTimes = 16;
private int rmqMessageConsumeTimeout = 15; //In minutes
private int rmqMaxConsumeThreadNums = 64;
private int rmqMinConsumeThreadNums = 20;
private String rmqMessageDestination;
private int rmqPullMessageBatchNums = 32;
private int rmqPullMessageCacheCapacity = 1000;
public String getOmsDriverImpl() {
return omsDriverImpl;
}
public void setOmsDriverImpl(final String omsDriverImpl) {
this.omsDriverImpl = omsDriverImpl;
}
public String getOmsAccessPoints() {
return omsAccessPoints;
}
public void setOmsAccessPoints(final String omsAccessPoints) {
this.omsAccessPoints = omsAccessPoints;
}
public String getOmsNamespace() {
return omsNamespace;
}
public void setOmsNamespace(final String omsNamespace) {
this.omsNamespace = omsNamespace;
}
public String getOmsProducerId() {
return omsProducerId;
}
public void setOmsProducerId(final String omsProducerId) {
this.omsProducerId = omsProducerId;
}
public String getOmsConsumerId() {
return omsConsumerId;
}
public void setOmsConsumerId(final String omsConsumerId) {
this.omsConsumerId = omsConsumerId;
}
public int getOmsOperationTimeout() {
return omsOperationTimeout;
}
public void setOmsOperationTimeout(final int omsOperationTimeout) {
this.omsOperationTimeout = omsOperationTimeout;
}
public String getOmsRoutingName() {
return omsRoutingName;
}
public void setOmsRoutingName(final String omsRoutingName) {
this.omsRoutingName = omsRoutingName;
}
public String getOmsOperatorName() {
return omsOperatorName;
}
public void setOmsOperatorName(final String omsOperatorName) {
this.omsOperatorName = omsOperatorName;
}
public String getOmsDstQueue() {
return omsDstQueue;
}
public void setOmsDstQueue(final String omsDstQueue) {
this.omsDstQueue = omsDstQueue;
}
public String getOmsSrcTopic() {
return omsSrcTopic;
}
public void setOmsSrcTopic(final String omsSrcTopic) {
this.omsSrcTopic = omsSrcTopic;
}
public String getRmqConsumerGroup() {
return rmqConsumerGroup;
}
public void setRmqConsumerGroup(final String rmqConsumerGroup) {
this.rmqConsumerGroup = rmqConsumerGroup;
}
public String getRmqProducerGroup() {
return rmqProducerGroup;
}
public void setRmqProducerGroup(final String rmqProducerGroup) {
this.rmqProducerGroup = rmqProducerGroup;
}
public int getRmqMaxRedeliveryTimes() {
return rmqMaxRedeliveryTimes;
}
public void setRmqMaxRedeliveryTimes(final int rmqMaxRedeliveryTimes) {
this.rmqMaxRedeliveryTimes = rmqMaxRedeliveryTimes;
}
public int getRmqMessageConsumeTimeout() {
return rmqMessageConsumeTimeout;
}
public void setRmqMessageConsumeTimeout(final int rmqMessageConsumeTimeout) {
this.rmqMessageConsumeTimeout = rmqMessageConsumeTimeout;
}
public int getRmqMaxConsumeThreadNums() {
return rmqMaxConsumeThreadNums;
}
public void setRmqMaxConsumeThreadNums(final int rmqMaxConsumeThreadNums) {
this.rmqMaxConsumeThreadNums = rmqMaxConsumeThreadNums;
}
public int getRmqMinConsumeThreadNums() {
return rmqMinConsumeThreadNums;
}
public void setRmqMinConsumeThreadNums(final int rmqMinConsumeThreadNums) {
this.rmqMinConsumeThreadNums = rmqMinConsumeThreadNums;
}
public String getRmqMessageDestination() {
return rmqMessageDestination;
}
public void setRmqMessageDestination(final String rmqMessageDestination) {
this.rmqMessageDestination = rmqMessageDestination;
}
public int getRmqPullMessageBatchNums() {
return rmqPullMessageBatchNums;
}
public void setRmqPullMessageBatchNums(final int rmqPullMessageBatchNums) {
this.rmqPullMessageBatchNums = rmqPullMessageBatchNums;
}
public int getRmqPullMessageCacheCapacity() {
return rmqPullMessageCacheCapacity;
}
public void setRmqPullMessageCacheCapacity(final int rmqPullMessageCacheCapacity) {
this.rmqPullMessageCacheCapacity = rmqPullMessageCacheCapacity;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.PropertyKeys;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.slf4j.Logger;
class LocalMessageCache implements ServiceLifecycle {
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
private final Map<String, ConsumeRequest> consumedRequest;
private final ConcurrentHashMap<MessageQueue, Long> pullOffsetTable;
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final ClientConfig clientConfig;
private final ScheduledExecutorService cleanExpireMsgExecutors;
private final static Logger log = ClientLogger.getLog();
LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final ClientConfig clientConfig) {
consumeRequestCache = new LinkedBlockingQueue<>(clientConfig.getRmqPullMessageCacheCapacity());
this.consumedRequest = new ConcurrentHashMap<>();
this.pullOffsetTable = new ConcurrentHashMap<>();
this.rocketmqPullConsumer = rocketmqPullConsumer;
this.clientConfig = clientConfig;
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"OMS_CleanExpireMsgScheduledThread_"));
}
int nextPullBatchNums() {
return Math.min(clientConfig.getRmqPullMessageBatchNums(), consumeRequestCache.remainingCapacity());
}
long nextPullOffset(MessageQueue remoteQueue) {
if (!pullOffsetTable.containsKey(remoteQueue)) {
try {
pullOffsetTable.putIfAbsent(remoteQueue,
rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
} catch (MQClientException e) {
log.error("A error occurred in fetch consume offset process.", e);
}
}
return pullOffsetTable.get(remoteQueue);
}
void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) {
pullOffsetTable.put(remoteQueue, nextPullOffset);
}
void submitConsumeRequest(ConsumeRequest consumeRequest) {
try {
consumeRequestCache.put(consumeRequest);
} catch (InterruptedException ignore) {
}
}
MessageExt poll() {
return poll(clientConfig.getOmsOperationTimeout());
}
MessageExt poll(final KeyValue properties) {
int currentPollTimeout = clientConfig.getOmsOperationTimeout();
if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) {
currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT);
}
return poll(currentPollTimeout);
}
private MessageExt poll(long timeout) {
try {
ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS);
if (consumeRequest != null) {
MessageExt messageExt = consumeRequest.getMessageExt();
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
consumedRequest.put(messageExt.getMsgId(), consumeRequest);
return messageExt;
}
} catch (InterruptedException ignore) {
}
return null;
}
void ack(final String messageId) {
ConsumeRequest consumeRequest = consumedRequest.remove(messageId);
if (consumeRequest != null) {
long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt()));
try {
rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset process.", e);
}
}
}
void ack(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) {
consumedRequest.remove(messageExt.getMsgId());
long offset = processQueue.removeMessage(Collections.singletonList(messageExt));
try {
rocketmqPullConsumer.updateConsumeOffset(messageQueue, offset);
} catch (MQClientException e) {
log.error("A error occurred in update consume offset process.", e);
}
}
@Override
public void startup() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES);
}
@Override
public void shutdown() {
ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
}
private void cleanExpireMsg() {
for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
.getRebalanceImpl().getProcessQueueTable().entrySet()) {
ProcessQueue pq = next.getValue();
MessageQueue mq = next.getKey();
ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
if (lockTreeMap == null) {
log.error("Gets tree map lock in process queue error, may be has compatibility issue");
return;
}
TreeMap<Long, MessageExt> msgTreeMap = pq.getMsgTreeMap();
int loop = msgTreeMap.size();
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
lockTreeMap.readLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty()) {
msg = msgTreeMap.firstEntry().getValue();
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
> clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
//Expired, ack and remove it.
} else {
break;
}
} else {
break;
}
} finally {
lockTreeMap.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("Gets expired message exception", e);
}
try {
rocketmqPullConsumer.sendMessageBack(msg, 3);
log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
ack(mq, pq, msg);
} catch (Exception e) {
log.error("Send back expired msg exception", e);
}
}
}
}
private ReadWriteLock getLockInProcessQueue(ProcessQueue pq) {
try {
return (ReadWriteLock) FieldUtils.readDeclaredField(pq, "lockTreeMap", true);
} catch (IllegalAccessException e) {
return null;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
private boolean started = false;
private String targetQueueName;
private final MQPullConsumerScheduleService pullConsumerScheduleService;
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
final static Logger log = ClientLogger.getLog();
public PullConsumerImpl(final String queueName, final KeyValue properties) {
this.properties = properties;
this.targetQueueName = queueName;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String consumerGroup = clientConfig.getRmqConsumerGroup();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
}
pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup);
this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
String accessPoints = clientConfig.getOmsAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
this.rocketmqPullConsumer.setConsumerGroup(consumerGroup);
int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();
this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(PropertyKeys.CONSUMER_ID, consumerId);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
}
@Override
public KeyValue properties() {
return properties;
}
@Override
public Message poll() {
MessageExt rmqMsg = localMessageCache.poll();
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
@Override
public Message poll(final KeyValue properties) {
MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
}
@Override
public void ack(final String messageId) {
localMessageCache.ack(messageId);
}
@Override
public void ack(final String messageId, final KeyValue properties) {
localMessageCache.ack(messageId);
}
@Override
public synchronized void startup() {
if (!started) {
try {
registerPullTaskCallback();
this.pullConsumerScheduleService.start();
this.localMessageCache.startup();
} catch (MQClientException e) {
throw new OMSRuntimeException("-1", e);
}
}
this.started = true;
}
private void registerPullTaskCallback() {
this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
@Override
public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
long offset = localMessageCache.nextPullOffset(mq);
PullResult pullResult = consumer.pull(mq, "*",
offset, localMessageCache.nextPullBatchNums());
ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
.getProcessQueueTable().get(mq);
switch (pullResult.getPullStatus()) {
case FOUND:
if (pq != null) {
pq.putMessage(pullResult.getMsgFoundList());
for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));
}
}
break;
default:
break;
}
localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
} catch (Exception e) {
log.error("A error occurred in pull message process.", e);
}
}
});
}
@Override
public synchronized void shutdown() {
if (this.started) {
this.localMessageCache.shutdown();
this.pullConsumerScheduleService.shutdown();
this.rocketmqPullConsumer.shutdown();
}
this.started = false;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.MessageListener;
import io.openmessaging.OMS;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class PushConsumerImpl implements PushConsumer {
private final DefaultMQPushConsumer rocketmqPushConsumer;
private final KeyValue properties;
private boolean started = false;
private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
private final ClientConfig clientConfig;
public PushConsumerImpl(final KeyValue properties) {
this.rocketmqPushConsumer = new DefaultMQPushConsumer();
this.properties = properties;
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = clientConfig.getOmsAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
String consumerGroup = clientConfig.getRmqConsumerGroup();
if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it.");
}
this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
this.rocketmqPushConsumer.setConsumeTimeout(clientConfig.getRmqMessageConsumeTimeout());
this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(PropertyKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
}
@Override
public KeyValue properties() {
return properties;
}
@Override
public void resume() {
this.rocketmqPushConsumer.resume();
}
@Override
public void suspend() {
this.rocketmqPushConsumer.suspend();
}
@Override
public boolean isSuspended() {
return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
}
@Override
public PushConsumer attachQueue(final String queueName, final MessageListener listener) {
this.subscribeTable.put(queueName, listener);
try {
this.rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer can't attach to %s.", queueName));
}
return this;
}
@Override
public synchronized void startup() {
if (!started) {
try {
this.rocketmqPushConsumer.start();
} catch (MQClientException e) {
throw new OMSRuntimeException("-1", e);
}
}
this.started = true;
}
@Override
public synchronized void shutdown() {
if (this.started) {
this.rocketmqPushConsumer.shutdown();
}
this.started = false;
}
class MessageListenerImpl implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
ConsumeConcurrentlyContext contextRMQ) {
MessageExt rmqMsg = rmqMsgList.get(0);
BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg);
MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic());
if (listener == null) {
throw new OMSRuntimeException("-1",
String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
}
final KeyValue contextProperties = OMS.newKeyValue();
final CountDownLatch sync = new CountDownLatch(1);
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
ReceivedMessageContext context = new ReceivedMessageContext() {
@Override
public KeyValue properties() {
return contextProperties;
}
@Override
public void ack() {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
@Override
public void ack(final KeyValue properties) {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
};
long begin = System.currentTimeMillis();
listener.onMessage(omsMsg, context);
long costs = System.currentTimeMillis() - begin;
long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
try {
sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
}
return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.OMS;
import org.apache.commons.lang3.builder.ToStringBuilder;
public class BytesMessageImpl implements BytesMessage {
private KeyValue headers;
private KeyValue properties;
private byte[] body;
public BytesMessageImpl() {
this.headers = OMS.newKeyValue();
this.properties = OMS.newKeyValue();
}
@Override
public byte[] getBody() {
return body;
}
@Override
public BytesMessage setBody(final byte[] body) {
this.body = body;
return this;
}
@Override
public KeyValue headers() {
return headers;
}
@Override
public KeyValue properties() {
return properties;
}
@Override
public Message putHeaders(final String key, final int value) {
headers.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final long value) {
headers.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final double value) {
headers.put(key, value);
return this;
}
@Override
public Message putHeaders(final String key, final String value) {
headers.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final int value) {
properties.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final long value) {
properties.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final double value) {
properties.put(key, value);
return this;
}
@Override
public Message putProperties(final String key, final String value) {
properties.put(key, value);
return this;
}
@Override
public String toString() {
return ToStringBuilder.reflectionToString(this);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
public class ConsumeRequest {
private final MessageExt messageExt;
private final MessageQueue messageQueue;
private final ProcessQueue processQueue;
private long startConsumeTimeMillis;
public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue,
final ProcessQueue processQueue) {
this.messageExt = messageExt;
this.messageQueue = messageQueue;
this.processQueue = processQueue;
}
public MessageExt getMessageExt() {
return messageExt;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public long getStartConsumeTimeMillis() {
return startConsumeTimeMillis;
}
public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) {
this.startConsumeTimeMillis = startConsumeTimeMillis;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
public interface NonStandardKeys {
String CONSUMER_GROUP = "rmq.consumer.group";
String PRODUCER_GROUP = "rmq.producer.group";
String MAX_REDELIVERY_TIMES = "rmq.max.redelivery.times";
String MESSAGE_CONSUME_TIMEOUT = "rmq.message.consume.timeout";
String MAX_CONSUME_THREAD_NUMS = "rmq.max.consume.thread.nums";
String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums";
String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status";
String MESSAGE_DESTINATION = "rmq.message.destination";
String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.KeyValue;
import io.openmessaging.SendResult;
public class SendResultImpl implements SendResult {
private String messageId;
private KeyValue properties;
public SendResultImpl(final String messageId, final KeyValue properties) {
this.messageId = messageId;
this.properties = properties;
}
@Override
public String messageId() {
return messageId;
}
@Override
public KeyValue properties() {
return properties;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageFactory;
import io.openmessaging.MessageHeader;
import io.openmessaging.PropertyKeys;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.exception.OMSNotSupportedException;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.exception.OMSTimeOutException;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.utils.BeanUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.slf4j.Logger;
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
final static Logger log = ClientLogger.getLog();
final KeyValue properties;
final DefaultMQProducer rocketmqProducer;
private boolean started = false;
final ClientConfig clientConfig;
AbstractOMSProducer(final KeyValue properties) {
this.properties = properties;
this.rocketmqProducer = new DefaultMQProducer();
this.clientConfig = BeanUtils.populate(properties, ClientConfig.class);
String accessPoints = clientConfig.getOmsAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty.");
}
this.rocketmqProducer.setNamesrvAddr(accessPoints.replace(',', ';'));
this.rocketmqProducer.setProducerGroup(clientConfig.getRmqProducerGroup());
String producerId = buildInstanceName();
this.rocketmqProducer.setSendMsgTimeout(clientConfig.getOmsOperationTimeout());
this.rocketmqProducer.setInstanceName(producerId);
this.rocketmqProducer.setMaxMessageSize(1024 * 1024 * 4);
properties.put(PropertyKeys.PRODUCER_ID, producerId);
}
@Override
public synchronized void startup() {
if (!started) {
try {
this.rocketmqProducer.start();
} catch (MQClientException e) {
throw new OMSRuntimeException("-1", e);
}
}
this.started = true;
}
@Override
public synchronized void shutdown() {
if (this.started) {
this.rocketmqProducer.shutdown();
}
this.started = false;
}
OMSRuntimeException checkProducerException(String topic, String msgId, Throwable e) {
if (e instanceof MQClientException) {
if (e.getCause() != null) {
if (e.getCause() instanceof RemotingTimeoutException) {
return new OMSTimeOutException("-1", String.format("Send message to broker timeout, %dms, Topic=%s, msgId=%s",
this.rocketmqProducer.getSendMsgTimeout(), topic, msgId), e);
} else if (e.getCause() instanceof MQBrokerException || e.getCause() instanceof RemotingConnectException) {
MQBrokerException brokerException = (MQBrokerException) e.getCause();
return new OMSRuntimeException("-1", String.format("Received a broker exception, Topic=%s, msgId=%s, %s",
topic, msgId, brokerException.getErrorMessage()), e);
}
}
// Exception thrown by local.
else {
MQClientException clientException = (MQClientException) e;
if (-1 == clientException.getResponseCode()) {
return new OMSRuntimeException("-1", String.format("Topic does not exist, Topic=%s, msgId=%s",
topic, msgId), e);
} else if (ResponseCode.MESSAGE_ILLEGAL == clientException.getResponseCode()) {
return new OMSMessageFormatException("-1", String.format("A illegal message for RocketMQ, Topic=%s, msgId=%s",
topic, msgId), e);
}
}
}
return new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.", e);
}
protected void checkMessageType(Message message) {
if (!(message instanceof BytesMessage)) {
throw new OMSNotSupportedException("-1", "Only BytesMessage is supported.");
}
}
@Override
public BytesMessage createBytesMessageToTopic(final String topic, final byte[] body) {
BytesMessage bytesMessage = new BytesMessageImpl();
bytesMessage.setBody(body);
bytesMessage.headers().put(MessageHeader.TOPIC, topic);
return bytesMessage;
}
@Override
public BytesMessage createBytesMessageToQueue(final String queue, final byte[] body) {
BytesMessage bytesMessage = new BytesMessageImpl();
bytesMessage.setBody(body);
bytesMessage.headers().put(MessageHeader.QUEUE, queue);
return bytesMessage;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.Producer;
import io.openmessaging.Promise;
import io.openmessaging.PropertyKeys;
import io.openmessaging.SendResult;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendStatus;
import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert;
public class ProducerImpl extends AbstractOMSProducer implements Producer {
public ProducerImpl(final KeyValue properties) {
super(properties);
}
@Override
public KeyValue properties() {
return properties;
}
@Override
public SendResult send(final Message message) {
return send(message, this.rocketmqProducer.getSendMsgTimeout());
}
@Override
public SendResult send(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return send(message, timeout);
}
private SendResult send(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
try {
org.apache.rocketmq.client.producer.SendResult rmqResult = this.rocketmqProducer.send(rmqMessage, timeout);
if (!rmqResult.getSendStatus().equals(SendStatus.SEND_OK)) {
log.error(String.format("Send message to RocketMQ failed, %s", message));
throw new OMSRuntimeException("-1", "Send message to RocketMQ broker failed.");
}
message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult);
} catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s", message), e);
throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
}
}
@Override
public Promise<SendResult> sendAsync(final Message message) {
return sendAsync(message, this.rocketmqProducer.getSendMsgTimeout());
}
@Override
public Promise<SendResult> sendAsync(final Message message, final KeyValue properties) {
long timeout = properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)
? properties.getInt(PropertyKeys.OPERATION_TIMEOUT) : this.rocketmqProducer.getSendMsgTimeout();
return sendAsync(message, timeout);
}
private Promise<SendResult> sendAsync(final Message message, long timeout) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
final Promise<SendResult> promise = new DefaultPromise<>();
try {
this.rocketmqProducer.send(rmqMessage, new SendCallback() {
@Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
message.headers().put(MessageHeader.MESSAGE_ID, rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult));
}
@Override
public void onException(final Throwable e) {
promise.setFailure(e);
}
}, timeout);
} catch (Exception e) {
promise.setFailure(e);
}
return promise;
}
@Override
public void sendOneway(final Message message) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = msgConvert((BytesMessage) message);
try {
this.rocketmqProducer.sendOneway(rmqMessage);
} catch (Exception ignore) { //Ignore the oneway exception.
}
}
@Override
public void sendOneway(final Message message, final KeyValue properties) {
sendOneway(message);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.SequenceProducer;
import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {
private BlockingQueue<Message> msgCacheQueue;
public SequenceProducerImpl(final KeyValue properties) {
super(properties);
this.msgCacheQueue = new LinkedBlockingQueue<>();
}
@Override
public KeyValue properties() {
return properties;
}
@Override
public void send(final Message message) {
checkMessageType(message);
org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
try {
Validators.checkMessage(rmqMessage, this.rocketmqProducer);
} catch (MQClientException e) {
throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
}
msgCacheQueue.add(message);
}
@Override
public void send(final Message message, final KeyValue properties) {
send(message);
}
@Override
public synchronized void commit() {
List<Message> messages = new ArrayList<>();
msgCacheQueue.drainTo(messages);
List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();
for (Message message : messages) {
rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
}
if (rmqMessages.size() == 0) {
return;
}
try {
SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
String[] msgIdArray = sendResult.getMsgId().split(",");
for (int i = 0; i < messages.size(); i++) {
Message message = messages.get(i);
message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
}
} catch (Exception e) {
throw checkProducerException("", "", e);
}
}
@Override
public synchronized void rollback() {
msgCacheQueue.clear();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.promise;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.exception.OMSRuntimeException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultPromise<V> implements Promise<V> {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPromise.class);
private final Object lock = new Object();
private volatile FutureState state = FutureState.DOING;
private V result = null;
private long timeout;
private long createTime;
private Throwable exception = null;
private List<PromiseListener<V>> promiseListenerList;
public DefaultPromise() {
createTime = System.currentTimeMillis();
promiseListenerList = new ArrayList<>();
timeout = 5000;
}
@Override
public boolean cancel(final boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return state.isCancelledState();
}
@Override
public boolean isDone() {
return state.isDoneState();
}
@Override
public V get() {
return result;
}
@Override
public V get(final long timeout) {
synchronized (lock) {
if (!isDoing()) {
return getValueOrThrowable();
}
if (timeout <= 0) {
try {
lock.wait();
} catch (Exception e) {
cancel(e);
}
return getValueOrThrowable();
} else {
long waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime > 0) {
for (;; ) {
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
LOG.error("promise get value interrupted,excepiton:{}", e.getMessage());
}
if (!isDoing()) {
break;
} else {
waitTime = timeout - (System.currentTimeMillis() - createTime);
if (waitTime <= 0) {
break;
}
}
}
}
if (isDoing()) {
timeoutSoCancel();
}
}
return getValueOrThrowable();
}
}
@Override
public boolean set(final V value) {
if (value == null)
return false;
this.result = value;
return done();
}
@Override
public boolean setFailure(final Throwable cause) {
if (cause == null)
return false;
this.exception = cause;
return done();
}
@Override
public void addListener(final PromiseListener<V> listener) {
if (listener == null) {
throw new NullPointerException("FutureListener is null");
}
boolean notifyNow = false;
synchronized (lock) {
if (!isDoing()) {
notifyNow = true;
} else {
if (promiseListenerList == null) {
promiseListenerList = new ArrayList<>();
}
promiseListenerList.add(listener);
}
}
if (notifyNow) {
notifyListener(listener);
}
}
@Override
public Throwable getThrowable() {
return exception;
}
private void notifyListeners() {
if (promiseListenerList != null) {
for (PromiseListener<V> listener : promiseListenerList) {
notifyListener(listener);
}
}
}
private boolean isSuccess() {
return isDone() && (exception == null);
}
private void timeoutSoCancel() {
synchronized (lock) {
if (!isDoing()) {
return;
}
state = FutureState.CANCELLED;
exception = new RuntimeException("Get request result is timeout or interrupted");
lock.notifyAll();
}
notifyListeners();
}
private V getValueOrThrowable() {
if (exception != null) {
Throwable e = exception.getCause() != null ? exception.getCause() : exception;
throw new OMSRuntimeException("-1", e);
}
notifyListeners();
return result;
}
private boolean isDoing() {
return state.isDoingState();
}
private boolean done() {
synchronized (lock) {
if (!isDoing()) {
return false;
}
state = FutureState.DONE;
lock.notifyAll();
}
notifyListeners();
return true;
}
private void notifyListener(final PromiseListener<V> listener) {
try {
if (exception != null)
listener.operationFailed(this);
else
listener.operationCompleted(this);
} catch (Throwable t) {
LOG.error("notifyListener {} Error:{}", listener.getClass().getSimpleName(), t);
}
}
private boolean cancel(Exception e) {
synchronized (lock) {
if (!isDoing()) {
return false;
}
state = FutureState.CANCELLED;
exception = e;
lock.notifyAll();
}
notifyListeners();
return true;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.promise;
public enum FutureState {
/**
* the task is doing
**/
DOING(0),
/**
* the task is done
**/
DONE(1),
/**
* ths task is cancelled
**/
CANCELLED(2);
public final int value;
private FutureState(int value) {
this.value = value;
}
public boolean isCancelledState() {
return this == CANCELLED;
}
public boolean isDoneState() {
return this == DONE;
}
public boolean isDoingState() {
return this == DOING;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.utils;
import io.openmessaging.KeyValue;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.log.ClientLogger;
import org.slf4j.Logger;
public final class BeanUtils {
final static Logger log = ClientLogger.getLog();
/**
* Maps primitive {@code Class}es to their corresponding wrapper {@code Class}.
*/
private static Map<Class<?>, Class<?>> primitiveWrapperMap = new HashMap<Class<?>, Class<?>>();
static {
primitiveWrapperMap.put(Boolean.TYPE, Boolean.class);
primitiveWrapperMap.put(Byte.TYPE, Byte.class);
primitiveWrapperMap.put(Character.TYPE, Character.class);
primitiveWrapperMap.put(Short.TYPE, Short.class);
primitiveWrapperMap.put(Integer.TYPE, Integer.class);
primitiveWrapperMap.put(Long.TYPE, Long.class);
primitiveWrapperMap.put(Double.TYPE, Double.class);
primitiveWrapperMap.put(Float.TYPE, Float.class);
primitiveWrapperMap.put(Void.TYPE, Void.TYPE);
}
private static Map<Class<?>, Class<?>> wrapperMap = new HashMap<Class<?>, Class<?>>();
static {
for (final Class<?> primitiveClass : primitiveWrapperMap.keySet()) {
final Class<?> wrapperClass = primitiveWrapperMap.get(primitiveClass);
if (!primitiveClass.equals(wrapperClass)) {
wrapperMap.put(wrapperClass, primitiveClass);
}
}
wrapperMap.put(String.class, String.class);
}
/**
* <p>Populate the JavaBeans properties of the specified bean, based on
* the specified name/value pairs. This method uses Java reflection APIs
* to identify corresponding "property setter" method names, and deals
* with setter arguments of type <Code>String</Code>, <Code>boolean</Code>,
* <Code>int</Code>, <Code>long</Code>, <Code>float</Code>, and
* <Code>double</Code>.</p>
*
* <p>The particular setter method to be called for each property is
* determined using the usual JavaBeans introspection mechanisms. Thus,
* you may identify custom setter methods using a BeanInfo class that is
* associated with the class of the bean itself. If no such BeanInfo
* class is available, the standard method name conversion ("set" plus
* the capitalized name of the property in question) is used.</p>
*
* <p><strong>NOTE</strong>: It is contrary to the JavaBeans Specification
* to have more than one setter method (with different argument
* signatures) for the same property.</p>
*
* @param clazz JavaBean class whose properties are being populated
* @param properties Map keyed by property name, with the corresponding (String or String[]) value(s) to be set
* @param <T> Class type
* @return Class instance
*/
public static <T> T populate(final Properties properties, final Class<T> clazz) {
T obj = null;
try {
obj = clazz.newInstance();
return populate(properties, obj);
} catch (Throwable e) {
log.warn("Error occurs !", e);
}
return obj;
}
public static <T> T populate(final KeyValue properties, final Class<T> clazz) {
T obj = null;
try {
obj = clazz.newInstance();
return populate(properties, obj);
} catch (Throwable e) {
log.warn("Error occurs !", e);
}
return obj;
}
public static Class<?> getMethodClass(Class<?> clazz, String methodName) {
Method[] methods = clazz.getMethods();
for (Method method : methods) {
if (method.getName().equalsIgnoreCase(methodName)) {
return method.getParameterTypes()[0];
}
}
return null;
}
public static void setProperties(Class<?> clazz, Object obj, String methodName,
Object value) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Class<?> parameterClass = getMethodClass(clazz, methodName);
Method setterMethod = clazz.getMethod(methodName, parameterClass);
if (parameterClass == Boolean.TYPE) {
setterMethod.invoke(obj, Boolean.valueOf(value.toString()));
} else if (parameterClass == Integer.TYPE) {
setterMethod.invoke(obj, Integer.valueOf(value.toString()));
} else if (parameterClass == Double.TYPE) {
setterMethod.invoke(obj, Double.valueOf(value.toString()));
} else if (parameterClass == Float.TYPE) {
setterMethod.invoke(obj, Float.valueOf(value.toString()));
} else if (parameterClass == Long.TYPE) {
setterMethod.invoke(obj, Long.valueOf(value.toString()));
} else
setterMethod.invoke(obj, value);
}
public static <T> T populate(final Properties properties, final T obj) {
Class<?> clazz = obj.getClass();
try {
Set<Map.Entry<Object, Object>> entries = properties.entrySet();
for (Map.Entry<Object, Object> entry : entries) {
String entryKey = entry.getKey().toString();
String[] keyGroup = entryKey.split("\\.");
for (int i = 0; i < keyGroup.length; i++) {
keyGroup[i] = keyGroup[i].toLowerCase();
keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
}
String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
try {
setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, entry.getValue());
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
//ignored...
}
}
} catch (RuntimeException e) {
log.warn("Error occurs !", e);
}
return obj;
}
public static <T> T populate(final KeyValue properties, final T obj) {
Class<?> clazz = obj.getClass();
try {
final Set<String> keySet = properties.keySet();
for (String key : keySet) {
String[] keyGroup = key.split("\\.");
for (int i = 0; i < keyGroup.length; i++) {
keyGroup[i] = keyGroup[i].toLowerCase();
keyGroup[i] = StringUtils.capitalize(keyGroup[i]);
}
String beanFieldNameWithCapitalization = StringUtils.join(keyGroup);
try {
setProperties(clazz, obj, "set" + beanFieldNameWithCapitalization, properties.getString(key));
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ignored) {
//ignored...
}
}
} catch (RuntimeException e) {
log.warn("Error occurs !", e);
}
return obj;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.utils;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue;
import io.openmessaging.MessageHeader;
import io.openmessaging.OMS;
import io.openmessaging.SendResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
public class OMSUtil {
/**
* Builds a OMS client instance name.
*
* @return a unique instance name
*/
public static String buildInstanceName() {
return Integer.toString(UtilAll.getPid()) + "%OpenMessaging" + "%" + System.nanoTime();
}
public static org.apache.rocketmq.common.message.Message msgConvert(BytesMessage omsMessage) {
org.apache.rocketmq.common.message.Message rmqMessage = new org.apache.rocketmq.common.message.Message();
rmqMessage.setBody(omsMessage.getBody());
KeyValue headers = omsMessage.headers();
KeyValue properties = omsMessage.properties();
//All destinations in RocketMQ use Topic
if (headers.containsKey(MessageHeader.TOPIC)) {
rmqMessage.setTopic(headers.getString(MessageHeader.TOPIC));
rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
} else {
rmqMessage.setTopic(headers.getString(MessageHeader.QUEUE));
rmqMessage.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "QUEUE");
}
for (String key : properties.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, properties.getString(key));
}
//Headers has a high priority
for (String key : headers.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, headers.getString(key));
}
return rmqMessage;
}
public static BytesMessage msgConvert(org.apache.rocketmq.common.message.MessageExt rmqMsg) {
BytesMessage omsMsg = new BytesMessageImpl();
omsMsg.setBody(rmqMsg.getBody());
KeyValue headers = omsMsg.headers();
KeyValue properties = omsMsg.properties();
final Set<Map.Entry<String, String>> entries = rmqMsg.getProperties().entrySet();
for (final Map.Entry<String, String> entry : entries) {
if (isOMSHeader(entry.getKey())) {
headers.put(entry.getKey(), entry.getValue());
} else {
properties.put(entry.getKey(), entry.getValue());
}
}
omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId());
if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) ||
rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) {
omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic());
} else {
omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic());
}
omsMsg.putHeaders(MessageHeader.SEARCH_KEY, rmqMsg.getKeys());
omsMsg.putHeaders(MessageHeader.BORN_HOST, String.valueOf(rmqMsg.getBornHost()));
omsMsg.putHeaders(MessageHeader.BORN_TIMESTAMP, rmqMsg.getBornTimestamp());
omsMsg.putHeaders(MessageHeader.STORE_HOST, String.valueOf(rmqMsg.getStoreHost()));
omsMsg.putHeaders(MessageHeader.STORE_TIMESTAMP, rmqMsg.getStoreTimestamp());
return omsMsg;
}
public static boolean isOMSHeader(String value) {
for (Field field : MessageHeader.class.getDeclaredFields()) {
try {
if (field.get(MessageHeader.class).equals(value)) {
return true;
}
} catch (IllegalAccessException e) {
return false;
}
}
return false;
}
/**
* Convert a RocketMQ SEND_OK SendResult instance to a OMS SendResult.
*/
public static SendResult sendResultConvert(org.apache.rocketmq.client.producer.SendResult rmqResult) {
assert rmqResult.getSendStatus().equals(SendStatus.SEND_OK);
return new SendResultImpl(rmqResult.getMsgId(), OMS.newKeyValue());
}
public static KeyValue buildKeyValue(KeyValue... keyValues) {
KeyValue keyValue = OMS.newKeyValue();
for (KeyValue properties : keyValues) {
for (String key : properties.keySet()) {
keyValue.put(key, properties.getString(key));
}
}
return keyValue;
}
/**
* Returns an iterator that cycles indefinitely over the elements of {@code Iterable}.
*/
public static <T> Iterator<T> cycle(final Iterable<T> iterable) {
return new Iterator<T>() {
Iterator<T> iterator = new Iterator<T>() {
@Override
public synchronized boolean hasNext() {
return false;
}
@Override
public synchronized T next() {
throw new NoSuchElementException();
}
@Override
public synchronized void remove() {
//Ignore
}
};
@Override
public synchronized boolean hasNext() {
return iterator.hasNext() || iterable.iterator().hasNext();
}
@Override
public synchronized T next() {
if (!iterator.hasNext()) {
iterator = iterable.iterator();
if (!iterator.hasNext()) {
throw new NoSuchElementException();
}
}
return iterator.next();
}
@Override
public synchronized void remove() {
iterator.remove();
}
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class LocalMessageCacheTest {
private LocalMessageCache localMessageCache;
@Mock
private DefaultMQPullConsumer rocketmqPullConsume;
@Mock
private ConsumeRequest consumeRequest;
@Before
public void init() {
ClientConfig clientConfig = new ClientConfig();
clientConfig.setRmqPullMessageBatchNums(512);
clientConfig.setRmqPullMessageCacheCapacity(1024);
localMessageCache = new LocalMessageCache(rocketmqPullConsume, clientConfig);
}
@Test
public void testNextPullBatchNums() throws Exception {
assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(512);
for (int i = 0; i < 513; i++) {
localMessageCache.submitConsumeRequest(consumeRequest);
}
assertThat(localMessageCache.nextPullBatchNums()).isEqualTo(511);
}
@Test
public void testNextPullOffset() throws Exception {
MessageQueue messageQueue = new MessageQueue();
when(rocketmqPullConsume.fetchConsumeOffset(any(MessageQueue.class), anyBoolean()))
.thenReturn(123L);
assertThat(localMessageCache.nextPullOffset(new MessageQueue())).isEqualTo(123L);
}
@Test
public void testUpdatePullOffset() throws Exception {
MessageQueue messageQueue = new MessageQueue();
localMessageCache.updatePullOffset(messageQueue, 124L);
assertThat(localMessageCache.nextPullOffset(messageQueue)).isEqualTo(124L);
}
@Test
public void testSubmitConsumeRequest() throws Exception {
byte [] body = new byte[]{'1', '2', '3'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(body);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
when(consumeRequest.getMessageExt()).thenReturn(consumedMsg);
localMessageCache.submitConsumeRequest(consumeRequest);
assertThat(localMessageCache.poll()).isEqualTo(consumedMsg);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PropertyKeys;
import io.openmessaging.PullConsumer;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PullConsumerImplTest {
private PullConsumer consumer;
private String queueName = "HELLO_QUEUE";
@Mock
private DefaultMQPullConsumer rocketmqPullConsumer;
private LocalMessageCache localMessageCache =
spy(new LocalMessageCache(rocketmqPullConsumer, new ClientConfig()));
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPullConsumer(queueName,
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true);
field.set(consumer, rocketmqPullConsumer); //Replace
field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
field.setAccessible(true);
field.set(consumer, localMessageCache);
messagingAccessPoint.startup();
consumer.startup();
}
@Test
public void testPoll() {
final byte[] testBody = new byte[] {'a', 'b'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic(queueName);
when(localMessageCache.poll()).thenReturn(consumedMsg);
Message message = consumer.poll();
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
}
@Test
public void testPoll_WithTimeout() {
//There is a default timeout value, @see ClientConfig#omsOperationTimeout.
Message message = consumer.poll();
assertThat(message).isNull();
message = consumer.poll(OMS.newKeyValue().put(PropertyKeys.OPERATION_TIMEOUT, 100));
assertThat(message).isNull();
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.Message;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessageListener;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.OMS;
import io.openmessaging.PushConsumer;
import io.openmessaging.ReceivedMessageContext;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.util.Collections;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PushConsumerImplTest {
private PushConsumer consumer;
@Mock
private DefaultMQPushConsumer rocketmqPushConsumer;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer(
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "TestGroup"));
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true);
DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer);
field.set(consumer, rocketmqPushConsumer); //Replace
when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
messagingAccessPoint.startup();
consumer.startup();
}
@Test
public void testConsumeMessage() {
final byte[] testBody = new byte[] {'a', 'b'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
consumer.attachQueue("HELLO_QUEUE", new MessageListener() {
@Override
public void onMessage(final Message message, final ReceivedMessageContext context) {
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody()).isEqualTo(testBody);
context.ack();
}
});
((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.Producer;
import io.openmessaging.exception.OMSRuntimeException;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ProducerImplTest {
private Producer producer;
@Mock
private DefaultMQProducer rocketmqProducer;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true);
field.set(producer, rocketmqProducer);
messagingAccessPoint.startup();
producer.startup();
}
@Test
public void testSend_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
io.openmessaging.SendResult omsResult =
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
assertThat(omsResult.messageId()).isEqualTo("TestMsgID");
}
@Test
public void testSend_Not_OK() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setSendStatus(SendStatus.FLUSH_DISK_TIMEOUT);
when(rocketmqProducer.send(any(Message.class), anyLong())).thenReturn(sendResult);
try {
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
}
}
@Test
public void testSend_WithException() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
when(rocketmqProducer.send(any(Message.class), anyLong())).thenThrow(MQClientException.class);
try {
producer.send(producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'}));
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (Exception e) {
assertThat(e).hasMessageContaining("Send message to RocketMQ broker failed.");
}
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.producer;
import io.openmessaging.BytesMessage;
import io.openmessaging.MessageHeader;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.MessagingAccessPointFactory;
import io.openmessaging.SequenceProducer;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SequenceProducerImplTest {
private SequenceProducer producer;
@Mock
private DefaultMQProducer rocketmqProducer;
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
.getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
producer = messagingAccessPoint.createSequenceProducer();
Field field = AbstractOMSProducer.class.getDeclaredField("rocketmqProducer");
field.setAccessible(true);
field.set(producer, rocketmqProducer);
messagingAccessPoint.startup();
producer.startup();
}
@Test
public void testSend_WithCommit() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
SendResult sendResult = new SendResult();
sendResult.setMsgId("TestMsgID");
sendResult.setSendStatus(SendStatus.SEND_OK);
when(rocketmqProducer.send(ArgumentMatchers.<Message>anyList())).thenReturn(sendResult);
when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
producer.send(message);
producer.commit();
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo("TestMsgID");
}
@Test
public void testRollback() {
when(rocketmqProducer.getMaxMessageSize()).thenReturn(1024);
final BytesMessage message = producer.createBytesMessageToTopic("HELLO_TOPIC", new byte[] {'a'});
producer.send(message);
producer.rollback();
producer.commit(); //Commit nothing.
assertThat(message.headers().getString(MessageHeader.MESSAGE_ID)).isEqualTo(null);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.promise;
import io.openmessaging.Promise;
import io.openmessaging.PromiseListener;
import io.openmessaging.exception.OMSRuntimeException;
import org.junit.Before;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
public class DefaultPromiseTest {
private Promise<String> promise;
@Before
public void init() {
promise = new DefaultPromise<>();
}
@Test
public void testIsCancelled() throws Exception {
assertThat(promise.isCancelled()).isEqualTo(false);
}
@Test
public void testIsDone() throws Exception {
assertThat(promise.isDone()).isEqualTo(false);
promise.set("Done");
assertThat(promise.isDone()).isEqualTo(true);
}
@Test
public void testGet() throws Exception {
promise.set("Done");
assertThat(promise.get()).isEqualTo("Done");
}
@Test
public void testGet_WithTimeout() throws Exception {
try {
promise.get(100);
failBecauseExceptionWasNotThrown(OMSRuntimeException.class);
} catch (OMSRuntimeException e) {
assertThat(e).hasMessageContaining("Get request result is timeout or interrupted");
}
}
@Test
public void testAddListener() throws Exception {
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
assertThat(promise.get()).isEqualTo("Done");
}
@Override
public void operationFailed(final Promise<String> promise) {
}
});
promise.set("Done");
}
@Test
public void testAddListener_ListenerAfterSet() throws Exception {
promise.set("Done");
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
assertThat(promise.get()).isEqualTo("Done");
}
@Override
public void operationFailed(final Promise<String> promise) {
}
});
}
@Test
public void testAddListener_WithException_ListenerAfterSet() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.setFailure(exception);
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
}
@Override
public void operationFailed(final Promise<String> promise) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
}
@Test
public void testAddListener_WithException() throws Exception {
final Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.addListener(new PromiseListener<String>() {
@Override
public void operationCompleted(final Promise<String> promise) {
}
@Override
public void operationFailed(final Promise<String> promise) {
assertThat(promise.getThrowable()).isEqualTo(exception);
}
});
promise.setFailure(exception);
}
@Test
public void getThrowable() throws Exception {
assertThat(promise.getThrowable()).isNull();
Throwable exception = new OMSRuntimeException("-1", "Test Error");
promise.setFailure(exception);
assertThat(promise.getThrowable()).isEqualTo(exception);
}
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.utils;
import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BeanUtilsTest {
private KeyValue properties = OMS.newKeyValue();
public static class CustomizedConfig extends ClientConfig {
final static String STRING_TEST = "string.test";
String stringTest = "foobar";
final static String DOUBLE_TEST = "double.test";
double doubleTest = 123.0;
final static String LONG_TEST = "long.test";
long longTest = 123L;
String getStringTest() {
return stringTest;
}
public void setStringTest(String stringTest) {
this.stringTest = stringTest;
}
double getDoubleTest() {
return doubleTest;
}
public void setDoubleTest(final double doubleTest) {
this.doubleTest = doubleTest;
}
long getLongTest() {
return longTest;
}
public void setLongTest(final long longTest) {
this.longTest = longTest;
}
CustomizedConfig() {
}
}
@Before
public void init() {
properties.put(NonStandardKeys.MAX_REDELIVERY_TIMES, 120);
properties.put(CustomizedConfig.STRING_TEST, "kaka");
properties.put(NonStandardKeys.CONSUMER_GROUP, "Default_Consumer_Group");
properties.put(NonStandardKeys.MESSAGE_CONSUME_TIMEOUT, 101);
properties.put(CustomizedConfig.LONG_TEST, 1234567890L);
properties.put(CustomizedConfig.DOUBLE_TEST, 10.234);
}
@Test
public void testPopulate() {
CustomizedConfig config = BeanUtils.populate(properties, CustomizedConfig.class);
//RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120);
Assert.assertEquals(config.getStringTest(), "kaka");
Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group");
Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101);
Assert.assertEquals(config.getLongTest(), 1234567890L);
Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001);
}
@Test
public void testPopulate_ExistObj() {
CustomizedConfig config = new CustomizedConfig();
config.setOmsConsumerId("NewConsumerId");
Assert.assertEquals(config.getOmsConsumerId(), "NewConsumerId");
config = BeanUtils.populate(properties, config);
//RemotingConfig config = BeanUtils.populate(properties, RemotingConfig.class);
Assert.assertEquals(config.getRmqMaxRedeliveryTimes(), 120);
Assert.assertEquals(config.getStringTest(), "kaka");
Assert.assertEquals(config.getRmqConsumerGroup(), "Default_Consumer_Group");
Assert.assertEquals(config.getRmqMessageConsumeTimeout(), 101);
Assert.assertEquals(config.getLongTest(), 1234567890L);
Assert.assertEquals(config.getDoubleTest(), 10.234, 0.000001);
}
}
\ No newline at end of file
......@@ -181,6 +181,7 @@
<module>filter</module>
<module>test</module>
<module>distribution</module>
<module>openmessaging</module>
</modules>
<build>
......@@ -617,6 +618,11 @@
<artifactId>guava</artifactId>
<version>19.0</version>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
<version>0.1.0-alpha</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册
反馈
建议
客服 返回
顶部