提交 6f3d6097 编写于 作者: Z zhoubo0404

Improve the oms 1.0.0 consumer implementation

上级 611e1b9f
...@@ -19,13 +19,16 @@ package io.openmessaging.rocketmq; ...@@ -19,13 +19,16 @@ package io.openmessaging.rocketmq;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.Consumer;
import io.openmessaging.exception.OMSUnsupportException;
import io.openmessaging.manager.ResourceManager; import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.MessageFactory; import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer; import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener; import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.producer.ProducerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl;
import java.util.HashSet;
import java.util.Set;
public class MessagingAccessPointImpl implements MessagingAccessPoint { public class MessagingAccessPointImpl implements MessagingAccessPoint {
private final KeyValue accessPointProperties; private final KeyValue accessPointProperties;
...@@ -54,12 +57,62 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { ...@@ -54,12 +57,62 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
} }
@Override public Consumer createConsumer() { @Override public Consumer createConsumer() {
return null; return new PushConsumerImpl(accessPointProperties);
} }
@Override @Override
public ResourceManager resourceManager() { public ResourceManager resourceManager() {
throw new OMSUnsupportException(-1, "ResourceManager is not supported in current version."); return new ResourceManager() {
@Override
public void createNamespace(String nsName) {
accessPointProperties.put("CONSUMER_ID", nsName);
}
@Override
public void deleteNamespace(String nsName) {
accessPointProperties.put("CONSUMER_ID", null);
}
@Override
public void switchNamespace(String targetNamespace) {
accessPointProperties.put("CONSUMER_ID", targetNamespace);
}
@Override
public Set<String> listNamespaces() {
return new HashSet<String>() {
{
add(accessPointProperties.getString("CONSUMER_ID"));
}
};
}
@Override
public void createQueue(String queueName) {
}
@Override
public void deleteQueue(String queueName) {
}
@Override
public Set<String> listQueues(String nsName) {
return null;
}
@Override
public void filter(String queueName, String filterString) {
}
@Override
public void routing(String sourceQueue, String targetQueue) {
}
};
} }
@Override public MessageFactory messageFactory() { @Override public MessageFactory messageFactory() {
......
...@@ -94,9 +94,6 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -94,9 +94,6 @@ class LocalMessageCache implements ServiceLifecycle {
MessageExt poll(final KeyValue properties) { MessageExt poll(final KeyValue properties) {
int currentPollTimeout = clientConfig.getOperationTimeout(); int currentPollTimeout = clientConfig.getOperationTimeout();
/* if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) {
currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT);
}*/
if (properties.containsKey("TIMEOUT")) { if (properties.containsKey("TIMEOUT")) {
currentPollTimeout = properties.getInt("TIMEOUT"); currentPollTimeout = properties.getInt("TIMEOUT");
} }
......
...@@ -88,7 +88,6 @@ public class PullConsumerImpl implements Consumer { ...@@ -88,7 +88,6 @@ public class PullConsumerImpl implements Consumer {
String consumerId = OMSUtil.buildInstanceName(); String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId); this.rocketmqPullConsumer.setInstanceName(consumerId);
// properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
properties.put("TIMEOUT", consumerId); properties.put("TIMEOUT", consumerId);
this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS); this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
......
...@@ -88,7 +88,6 @@ public class PushConsumerImpl implements Consumer { ...@@ -88,7 +88,6 @@ public class PushConsumerImpl implements Consumer {
String consumerId = OMSUtil.buildInstanceName(); String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId); this.rocketmqPushConsumer.setInstanceName(consumerId);
// properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId);
properties.put("CONSUMER_ID", consumerId); properties.put("CONSUMER_ID", consumerId);
this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS); this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
...@@ -120,12 +119,18 @@ public class PushConsumerImpl implements Consumer { ...@@ -120,12 +119,18 @@ public class PushConsumerImpl implements Consumer {
@Override @Override
public void bindQueue(String queueName) { public void bindQueue(String queueName) {
throw new UnsupportedOperationException(); try {
rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
}
} }
@Override @Override
public void bindQueue(List<String> queueNames) { public void bindQueue(List<String> queueNames) {
throw new UnsupportedOperationException(); for (String queueName : queueNames) {
bindQueue(queueName);
}
} }
@Override @Override
......
...@@ -18,6 +18,8 @@ package io.openmessaging.rocketmq.consumer; ...@@ -18,6 +18,8 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.*; import io.openmessaging.*;
import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.Consumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
...@@ -32,6 +34,7 @@ import java.lang.reflect.Field; ...@@ -32,6 +34,7 @@ import java.lang.reflect.Field;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class PullConsumerImplTest { public class PullConsumerImplTest {
...@@ -46,9 +49,12 @@ public class PullConsumerImplTest { ...@@ -46,9 +49,12 @@ public class PullConsumerImplTest {
public void init() throws NoSuchFieldException, IllegalAccessException { public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
final KeyValue attributes = messagingAccessPoint.attributes();
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace("TestGroup");
consumer = messagingAccessPoint.createConsumer();
// consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); // consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));
// consumer.attachQueue(queueName); consumer.bindQueue(queueName);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true); field.setAccessible(true);
...@@ -61,9 +67,7 @@ public class PullConsumerImplTest { ...@@ -61,9 +67,7 @@ public class PullConsumerImplTest {
field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
field.setAccessible(true); field.setAccessible(true);
field.set(consumer, localMessageCache); field.set(consumer, localMessageCache);
consumer.start();
// messagingAccessPoint.startup();
// consumer.startup();
} }
@Test @Test
...@@ -77,17 +81,17 @@ public class PullConsumerImplTest { ...@@ -77,17 +81,17 @@ public class PullConsumerImplTest {
when(localMessageCache.poll()).thenReturn(consumedMsg); when(localMessageCache.poll()).thenReturn(consumedMsg);
// Message message = consumer.receive(); Message message = consumer.receive(3 * 1000);
// assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
// assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody); assertThat(message.getData()).isEqualTo(testBody);
} }
@Test @Test
public void testPoll_WithTimeout() { public void testPoll_WithTimeout() {
//There is a default timeout value, @see ClientConfig#omsOperationTimeout. // There is a default timeout value, @see ClientConfig#omsOperationTimeout.
// Message message = consumer.receive(); // Message message = consumer.receive(3 * 1000);
// assertThat(message).isNull(); // assertThat(message).isNull();
//
// message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100)); // message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
// assertThat(message).isNull(); // assertThat(message).isNull();
} }
......
...@@ -19,6 +19,8 @@ package io.openmessaging.rocketmq.consumer; ...@@ -19,6 +19,8 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.*; import io.openmessaging.*;
import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.MessageListener;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
...@@ -33,6 +35,7 @@ import java.lang.reflect.Field; ...@@ -33,6 +35,7 @@ import java.lang.reflect.Field;
import java.util.Collections; import java.util.Collections;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class PushConsumerImplTest { public class PushConsumerImplTest {
...@@ -47,6 +50,9 @@ public class PushConsumerImplTest { ...@@ -47,6 +50,9 @@ public class PushConsumerImplTest {
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
/* consumer = messagingAccessPoint.createPushConsumer( /* consumer = messagingAccessPoint.createPushConsumer(
OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));*/ OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));*/
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace("TestGroup");
consumer = messagingAccessPoint.createConsumer();
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true); field.setAccessible(true);
...@@ -54,8 +60,7 @@ public class PushConsumerImplTest { ...@@ -54,8 +60,7 @@ public class PushConsumerImplTest {
field.set(consumer, rocketmqPushConsumer); //Replace field.set(consumer, rocketmqPushConsumer); //Replace
when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener()); when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
// messagingAccessPoint.startup(); consumer.start();
// consumer.startup();
} }
@Test @Test
...@@ -67,14 +72,14 @@ public class PushConsumerImplTest { ...@@ -67,14 +72,14 @@ public class PushConsumerImplTest {
consumedMsg.setBody(testBody); consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE"); consumedMsg.setTopic("HELLO_QUEUE");
/* consumer.attachQueue("HELLO_QUEUE", new MessageListener() { consumer.bindQueue("HELLO_QUEUE", new MessageListener() {
@Override @Override
public void onReceived(Message message, Context context) { public void onReceived(Message message, Context context) {
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody); assertThat(message.getData()).isEqualTo(testBody);
context.ack(); context.ack();
} }
});*/ });
((MessageListenerConcurrently) rocketmqPushConsumer ((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null); .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册