diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index 0655fa5ec19ac57d666f9ad0253aac3077dfe83b..b33c892c7b7bdb552ccc5ec28577b56eb3238482 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -19,13 +19,16 @@ package io.openmessaging.rocketmq; import io.openmessaging.KeyValue; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.consumer.Consumer; -import io.openmessaging.exception.OMSUnsupportException; import io.openmessaging.manager.ResourceManager; import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; +import io.openmessaging.rocketmq.consumer.PushConsumerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl; +import java.util.HashSet; +import java.util.Set; + public class MessagingAccessPointImpl implements MessagingAccessPoint { private final KeyValue accessPointProperties; @@ -54,12 +57,62 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { } @Override public Consumer createConsumer() { - return null; + return new PushConsumerImpl(accessPointProperties); } @Override public ResourceManager resourceManager() { - throw new OMSUnsupportException(-1, "ResourceManager is not supported in current version."); + return new ResourceManager() { + + @Override + public void createNamespace(String nsName) { + accessPointProperties.put("CONSUMER_ID", nsName); + } + + @Override + public void deleteNamespace(String nsName) { + accessPointProperties.put("CONSUMER_ID", null); + } + + @Override + public void switchNamespace(String targetNamespace) { + accessPointProperties.put("CONSUMER_ID", targetNamespace); + } + + @Override + public Set listNamespaces() { + return new HashSet() { + { + add(accessPointProperties.getString("CONSUMER_ID")); + } + }; + } + + @Override + public void createQueue(String queueName) { + + } + + @Override + public void deleteQueue(String queueName) { + + } + + @Override + public Set listQueues(String nsName) { + return null; + } + + @Override + public void filter(String queueName, String filterString) { + + } + + @Override + public void routing(String sourceQueue, String targetQueue) { + + } + }; } @Override public MessageFactory messageFactory() { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 9ad9ac8fde5e267388488b19c966117ddfc54ff6..045f264e4343911e5556f42795f59f6594466c54 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -94,9 +94,6 @@ class LocalMessageCache implements ServiceLifecycle { MessageExt poll(final KeyValue properties) { int currentPollTimeout = clientConfig.getOperationTimeout(); -/* if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) { - currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT); - }*/ if (properties.containsKey("TIMEOUT")) { currentPollTimeout = properties.getInt("TIMEOUT"); } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 938ab4c23b64d65ecace8db4c1743bf1f3983c3c..7e6eaf4e9dc7d9d0f0545aafd745d0654daa774e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -88,7 +88,6 @@ public class PullConsumerImpl implements Consumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); -// properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); properties.put("TIMEOUT", consumerId); this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 557e50c59a9beadbb19183241ae14f5d535ca011..1efa5b0475b78e424d8c43800ad33bd4a09ca6bb 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -88,7 +88,6 @@ public class PushConsumerImpl implements Consumer { String consumerId = OMSUtil.buildInstanceName(); this.rocketmqPushConsumer.setInstanceName(consumerId); -// properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); properties.put("CONSUMER_ID", consumerId); this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS); @@ -120,12 +119,18 @@ public class PushConsumerImpl implements Consumer { @Override public void bindQueue(String queueName) { - throw new UnsupportedOperationException(); + try { + rocketmqPushConsumer.subscribe(queueName, "*"); + } catch (MQClientException e) { + throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName)); + } } @Override public void bindQueue(List queueNames) { - throw new UnsupportedOperationException(); + for (String queueName : queueNames) { + bindQueue(queueName); + } } @Override diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java index 9f37b460e72d69088015c334f326e3947470dc36..7fcd86c97207dfa43f200923fe1e3f4f1cdec016 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -18,6 +18,8 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.*; import io.openmessaging.consumer.Consumer; +import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.Message; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.NonStandardKeys; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -32,6 +34,7 @@ import java.lang.reflect.Field; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.assertj.core.api.Assertions.assertThat; @RunWith(MockitoJUnitRunner.class) public class PullConsumerImplTest { @@ -46,9 +49,12 @@ public class PullConsumerImplTest { public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); - + final KeyValue attributes = messagingAccessPoint.attributes(); + final ResourceManager resourceManager = messagingAccessPoint.resourceManager(); + resourceManager.createNamespace("TestGroup"); + consumer = messagingAccessPoint.createConsumer(); // consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); -// consumer.attachQueue(queueName); + consumer.bindQueue(queueName); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); field.setAccessible(true); @@ -61,9 +67,7 @@ public class PullConsumerImplTest { field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); field.setAccessible(true); field.set(consumer, localMessageCache); - -// messagingAccessPoint.startup(); -// consumer.startup(); + consumer.start(); } @Test @@ -77,17 +81,17 @@ public class PullConsumerImplTest { when(localMessageCache.poll()).thenReturn(consumedMsg); -// Message message = consumer.receive(); -// assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); -// assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody); + Message message = consumer.receive(3 * 1000); + assertThat(message.header().getMessageId()).isEqualTo("NewMsgId"); + assertThat(message.getData()).isEqualTo(testBody); } @Test public void testPoll_WithTimeout() { - //There is a default timeout value, @see ClientConfig#omsOperationTimeout. -// Message message = consumer.receive(); +// There is a default timeout value, @see ClientConfig#omsOperationTimeout. +// Message message = consumer.receive(3 * 1000); // assertThat(message).isNull(); -// + // message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100)); // assertThat(message).isNull(); } diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java index 3f60e6c14789a2614495da2fe53461f1d00c7fb0..4a2a84a728a474da5150088a38bb00fe0ab295cd 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -19,6 +19,8 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.*; import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.MessageListener; +import io.openmessaging.manager.ResourceManager; +import io.openmessaging.message.Message; import io.openmessaging.rocketmq.domain.NonStandardKeys; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; @@ -33,6 +35,7 @@ import java.lang.reflect.Field; import java.util.Collections; import static org.mockito.Mockito.when; +import static org.assertj.core.api.Assertions.assertThat; @RunWith(MockitoJUnitRunner.class) public class PushConsumerImplTest { @@ -47,6 +50,9 @@ public class PushConsumerImplTest { .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); /* consumer = messagingAccessPoint.createPushConsumer( OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));*/ + final ResourceManager resourceManager = messagingAccessPoint.resourceManager(); + resourceManager.createNamespace("TestGroup"); + consumer = messagingAccessPoint.createConsumer(); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); field.setAccessible(true); @@ -54,8 +60,7 @@ public class PushConsumerImplTest { field.set(consumer, rocketmqPushConsumer); //Replace when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener()); -// messagingAccessPoint.startup(); -// consumer.startup(); + consumer.start(); } @Test @@ -67,14 +72,14 @@ public class PushConsumerImplTest { consumedMsg.setBody(testBody); consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); consumedMsg.setTopic("HELLO_QUEUE"); -/* consumer.attachQueue("HELLO_QUEUE", new MessageListener() { + consumer.bindQueue("HELLO_QUEUE", new MessageListener() { @Override public void onReceived(Message message, Context context) { - assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); - assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody); + assertThat(message.header().getMessageId()).isEqualTo("NewMsgId"); + assertThat(message.getData()).isEqualTo(testBody); context.ack(); } - });*/ + }); ((MessageListenerConcurrently) rocketmqPushConsumer .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null); }