提交 7e221e51 编写于 作者: Q qqeasonchen

add example of request-response model

上级 1d3a5083
...@@ -17,22 +17,21 @@ ...@@ -17,22 +17,21 @@
package org.apache.rocketmq.client.utils; package org.apache.rocketmq.client.utils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor; import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
public class MessageUtil { public class MessageUtil {
public static Message createReplyMessage(final Message requestMessage) { public static Message createReplyMessage(final Message requestMessage) throws MQClientException {
if (requestMessage != null) { if (requestMessage != null) {
Message replyMessage = new Message(); Message replyMessage = new Message();
String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER); String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);
String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO); String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO);
String requestUniqId = requestMessage.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID); String requestUniqId = requestMessage.getProperty(MessageConst.PROPERTY_REQUEST_UNIQ_ID);
String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL); String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);
if (cluster == null) { if (cluster != null) {
}
String replyTopic = MixAll.getReplyTopic(cluster); String replyTopic = MixAll.getReplyTopic(cluster);
replyMessage.setTopic(replyTopic); replyMessage.setTopic(replyTopic);
MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG); MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
...@@ -42,6 +41,7 @@ public class MessageUtil { ...@@ -42,6 +41,7 @@ public class MessageUtil {
return replyMessage; return replyMessage;
} }
return null; }
throw new MQClientException(-1, "create reply message fail.");
} }
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.rpc;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.RequestCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncRequestProducer {
private static final InternalLogger log = ClientLogger.getLog();
public static void main(String[] args) throws MQClientException, InterruptedException {
String producerGroup = "ProducerGroup-Name";
String namesrvAddr = "10.255.2.37:9876;10.255.2.37:9875";
String topic = "RequestTopic";
long ttl = 3000;
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
producer.start();
try {
Message msg = new Message(topic,
"",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
long begin = System.currentTimeMillis();
producer.request(msg, new RequestCallback() {
@Override
public void onSuccess(Message message) {
long cost = System.currentTimeMillis() - begin;
System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);
}
@Override
public void onException(Throwable e) {
System.err.printf("request to <%s> fail.", topic);
}
}, ttl);
} catch (Exception e) {
log.warn("", e);
}
// producer.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.rpc;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RequestProducer {
private static final InternalLogger log = ClientLogger.getLog();
public static void main(String[] args) throws MQClientException, InterruptedException {
String producerGroup = "ProducerGroup-Name";
String namesrvAddr = "10.255.2.37:9876;10.255.2.37:9875";
String topic = "RequestTopic";
long ttl = 3000;
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
producer.start();
try {
Message msg = new Message(topic,
"",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
long begin = System.currentTimeMillis();
Message retMsg = producer.request(msg, ttl);
long cost = System.currentTimeMillis() - begin;
System.err.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
} catch (Exception e) {
log.warn("", e);
}
producer.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.rpc;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class ResponseConsumer {
private static final InternalLogger log = ClientLogger.getLog();
public static void main(String[] args) throws InterruptedException, MQClientException {
String consumerGroup = "ConsumeGroup-Name";
String namesrvAddr = "10.255.2.37:9873";
String topic = "RequestTopic";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
for (MessageExt msg : msgs) {
try {
log.info("handle message: {} body={}", msg, new String(msg.getBody()));
String replyTo = msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO);
//You must use MessageUtil to creage reply message, otherwise reply message maybe wrong.
Message replyMessage = MessageUtil.createReplyMessage(msg);
replyMessage.setBody("reply message contents.".getBytes());
//maybe you should create a producer to send reply message.
SendResult sendResult = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(replyMessage, 3000);
System.out.printf("reply msg %s to %s , %s %n", replyMessage.toString(), replyTo, sendResult.toString());
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.subscribe(topic, "*");
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册