提交 09aeea34 编写于 作者: O odbozhou

Improve the oms-1.0.0 consumer implementation

上级 6f3d6097
......@@ -23,9 +23,9 @@ import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.producer.ProducerImpl;
import java.util.HashSet;
import java.util.Set;
......@@ -57,65 +57,76 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
}
@Override public Consumer createConsumer() {
String consumerId = accessPointProperties.getString("CONSUMER_ID");
String[] nsStrArr = consumerId.split("_");
if (nsStrArr.length < 2) {
return new PushConsumerImpl(accessPointProperties);
}
if ("pull".equals(nsStrArr[0])) {
return new PullConsumerImpl(accessPointProperties);
}
return new PushConsumerImpl(accessPointProperties);
}
@Override
public ResourceManager resourceManager() {
return new ResourceManager() {
DefaultResourceManager resourceManager = new DefaultResourceManager();
return resourceManager;
}
@Override public MessageFactory messageFactory() {
return null;
}
@Override
public void createNamespace(String nsName) {
accessPointProperties.put("CONSUMER_ID", nsName);
}
class DefaultResourceManager implements ResourceManager {
@Override
public void deleteNamespace(String nsName) {
accessPointProperties.put("CONSUMER_ID", null);
}
@Override
public void createNamespace(String nsName) {
accessPointProperties.put("CONSUMER_ID", nsName);
}
@Override
public void switchNamespace(String targetNamespace) {
accessPointProperties.put("CONSUMER_ID", targetNamespace);
}
@Override
public void deleteNamespace(String nsName) {
accessPointProperties.put("CONSUMER_ID", null);
}
@Override
public Set<String> listNamespaces() {
return new HashSet<String>() {
{
add(accessPointProperties.getString("CONSUMER_ID"));
}
};
}
@Override
public void switchNamespace(String targetNamespace) {
accessPointProperties.put("CONSUMER_ID", targetNamespace);
}
@Override
public void createQueue(String queueName) {
@Override
public Set<String> listNamespaces() {
return new HashSet<String>() {
{
add(accessPointProperties.getString("CONSUMER_ID"));
}
};
}
}
@Override
public void createQueue(String queueName) {
@Override
public void deleteQueue(String queueName) {
}
}
@Override
public void deleteQueue(String queueName) {
@Override
public Set<String> listQueues(String nsName) {
return null;
}
}
@Override
public void filter(String queueName, String filterString) {
@Override
public Set<String> listQueues(String nsName) {
return null;
}
}
@Override
public void filter(String queueName, String filterString) {
@Override
public void routing(String sourceQueue, String targetQueue) {
}
}
};
}
@Override
public void routing(String sourceQueue, String targetQueue) {
@Override public MessageFactory messageFactory() {
return null;
}
}
};
}
......@@ -23,6 +23,19 @@ import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
......@@ -35,10 +48,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.InternalLogger;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
class LocalMessageCache implements ServiceLifecycle {
private final BlockingQueue<ConsumeRequest> consumeRequestCache;
private final Map<String, ConsumeRequest> consumedRequest;
......
......@@ -33,7 +33,17 @@ import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import org.apache.rocketmq.client.consumer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
......@@ -44,11 +54,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class PullConsumerImpl implements Consumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
......@@ -273,16 +278,16 @@ public class PullConsumerImpl implements Consumer {
try {
pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, 4 * 1024 * 1024, timeout);
} catch (MQClientException e) {
e.printStackTrace();
log.error("A error occurred when pull message.", e);
return null;
} catch (RemotingException e) {
e.printStackTrace();
log.error("A error occurred when pull message.", e);
return null;
} catch (InterruptedException e) {
e.printStackTrace();
log.error("A error occurred when pull message.", e);
return null;
} catch (MQBrokerException e) {
e.printStackTrace();
log.error("A error occurred when pull message.", e);
return null;
}
if (null == pullResult) {
......
......@@ -34,6 +34,14 @@ import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
......@@ -45,11 +53,6 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class PushConsumerImpl implements Consumer {
private final static InternalLogger log = ClientLogger.getLog();
......
......@@ -18,7 +18,7 @@ package io.openmessaging.rocketmq.domain;
import io.openmessaging.message.Header;
public class MessageHeader implements Header{
public class MessageHeader implements Header {
private String destination;
......
......@@ -28,5 +28,5 @@ public interface NonStandardKeys {
String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
String PRODUCER_ID = "PRODUCER_ID";
String CONSUMER_ID ="CONSUMER_ID";
String CONSUMER_ID = "CONSUMER_ID";
}
......@@ -16,12 +16,15 @@
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.*;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Before;
......@@ -30,11 +33,10 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@RunWith(MockitoJUnitRunner.class)
public class PullConsumerImplTest {
......@@ -49,11 +51,9 @@ public class PullConsumerImplTest {
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
final KeyValue attributes = messagingAccessPoint.attributes();
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace("TestGroup");
resourceManager.createNamespace("pull_TestGroup");
consumer = messagingAccessPoint.createConsumer();
// consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
consumer.bindQueue(queueName);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
......@@ -78,9 +78,7 @@ public class PullConsumerImplTest {
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic(queueName);
when(localMessageCache.poll()).thenReturn(consumedMsg);
doReturn(consumedMsg).when(localMessageCache).poll(any(KeyValue.class));
Message message = consumer.receive(3 * 1000);
assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
assertThat(message.getData()).isEqualTo(testBody);
......@@ -88,11 +86,7 @@ public class PullConsumerImplTest {
@Test
public void testPoll_WithTimeout() {
// There is a default timeout value, @see ClientConfig#omsOperationTimeout.
// Message message = consumer.receive(3 * 1000);
// assertThat(message).isNull();
// message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
// assertThat(message).isNull();
Message message = consumer.receive(3 * 1000);
assertThat(message).isNull();
}
}
\ No newline at end of file
......@@ -48,10 +48,8 @@ public class PushConsumerImplTest {
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
/* consumer = messagingAccessPoint.createPushConsumer(
OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));*/
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace("TestGroup");
resourceManager.createNamespace("push_TestGroup");
consumer = messagingAccessPoint.createConsumer();
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册