From 7e221e51e898a0d4f73d854409fa8b97f7778408 Mon Sep 17 00:00:00 2001 From: qqeasonchen Date: Wed, 11 Sep 2019 19:40:04 +0800 Subject: [PATCH] add example of request-response model --- .../rocketmq/client/utils/MessageUtil.java | 22 ++--- .../example/rpc/AsyncRequestProducer.java | 64 ++++++++++++++ .../rocketmq/example/rpc/RequestProducer.java | 54 ++++++++++++ .../example/rpc/ResponseConsumer.java | 83 +++++++++++++++++++ 4 files changed, 212 insertions(+), 11 deletions(-) create mode 100644 example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java create mode 100644 example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java create mode 100644 example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java diff --git a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java index 62dd36dd..81da669f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java +++ b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java @@ -17,31 +17,31 @@ package org.apache.rocketmq.client.utils; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageConst; public class MessageUtil { - public static Message createReplyMessage(final Message requestMessage) { + public static Message createReplyMessage(final Message requestMessage) throws MQClientException { if (requestMessage != null) { Message replyMessage = new Message(); String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER); String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); String requestUniqId = requestMessage.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL); - if (cluster == null) { + if (cluster != null) { + String replyTopic = MixAll.getReplyTopic(cluster); + replyMessage.setTopic(replyTopic); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO, replyTo); + MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl); + return replyMessage; } - String replyTopic = MixAll.getReplyTopic(cluster); - replyMessage.setTopic(replyTopic); - MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); - MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_REQUEST_UNIQ_ID, requestUniqId); - MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO, replyTo); - MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl); - - return replyMessage; } - return null; + throw new MQClientException(-1, "create reply message fail."); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java new file mode 100644 index 00000000..d2475c16 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.rpc; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.RequestCallback; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class AsyncRequestProducer { + private static final InternalLogger log = ClientLogger.getLog(); + + public static void main(String[] args) throws MQClientException, InterruptedException { + String producerGroup = "ProducerGroup-Name"; + String namesrvAddr = "10.255.2.37:9876;10.255.2.37:9875"; + String topic = "RequestTopic"; + long ttl = 3000; + + DefaultMQProducer producer = new DefaultMQProducer(producerGroup); + producer.setNamesrvAddr(namesrvAddr); + producer.start(); + + try { + Message msg = new Message(topic, + "", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + + long begin = System.currentTimeMillis(); + producer.request(msg, new RequestCallback() { + @Override + public void onSuccess(Message message) { + long cost = System.currentTimeMillis() - begin; + System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message); + } + + @Override + public void onException(Throwable e) { + System.err.printf("request to <%s> fail.", topic); + } + }, ttl); + } catch (Exception e) { + log.warn("", e); + } +// producer.shutdown(); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java new file mode 100644 index 00000000..73397738 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.rpc; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class RequestProducer { + private static final InternalLogger log = ClientLogger.getLog(); + + public static void main(String[] args) throws MQClientException, InterruptedException { + String producerGroup = "ProducerGroup-Name"; + String namesrvAddr = "10.255.2.37:9876;10.255.2.37:9875"; + String topic = "RequestTopic"; + long ttl = 3000; + + DefaultMQProducer producer = new DefaultMQProducer(producerGroup); + producer.setNamesrvAddr(namesrvAddr); + producer.start(); + + try { + Message msg = new Message(topic, + "", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + + long begin = System.currentTimeMillis(); + Message retMsg = producer.request(msg, ttl); + long cost = System.currentTimeMillis() - begin; + System.err.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg); + } catch (Exception e) { + log.warn("", e); + } + producer.shutdown(); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java new file mode 100644 index 00000000..ff3652fe --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.rpc; + +import java.util.List; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.utils.MessageUtil; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.remoting.exception.RemotingException; + +public class ResponseConsumer { + private static final InternalLogger log = ClientLogger.getLog(); + + public static void main(String[] args) throws InterruptedException, MQClientException { + String consumerGroup = "ConsumeGroup-Name"; + String namesrvAddr = "10.255.2.37:9873"; + String topic = "RequestTopic"; + + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); + consumer.setNamesrvAddr(namesrvAddr); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); + + consumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); + for (MessageExt msg : msgs) { + try { + log.info("handle message: {} body={}", msg, new String(msg.getBody())); + String replyTo = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); + + //You must use MessageUtil to creage reply message, otherwise reply message maybe wrong. + Message replyMessage = MessageUtil.createReplyMessage(msg); + replyMessage.setBody("reply message contents.".getBytes()); + + //maybe you should create a producer to send reply message. + SendResult sendResult = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, 3000); + System.out.printf("reply msg %s to %s , %s %n", replyMessage.toString(), replyTo, sendResult.toString()); + } catch (MQClientException e) { + e.printStackTrace(); + } catch (RemotingException e) { + e.printStackTrace(); + } catch (MQBrokerException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + + consumer.subscribe(topic, "*"); + consumer.start(); + System.out.printf("Consumer Started.%n"); + } +} -- GitLab