rpc_request.md 6.8 KB
Newer Older
K
keranbingaa 已提交
1 2 3
# “Request-Reply”特性
---

K
keranbingaa 已提交
4
## 1 使用场景
K
keranbingaa 已提交
5 6 7 8
随着服务规模的扩大,单机服务无法满足性能和容量的要求,此时需要将服务拆分为更小粒度的服务或者部署多个服务实例构成集群来提供服务。在分布式场景下,RPC是最常用的联机调用的方式。

在构建分布式应用时,有些领域,例如金融服务领域,常常使用消息队列来构建服务总线,实现联机调用的目的。消息队列的主要场景是解耦、削峰填谷,在联机调用的场景下,需要将服务的调用抽象成基于消息的交互,并增强同步调用的这种交互逻辑。为了更好地支持消息队列在联机调用场景下的应用,rocketmq-4.7.0推出了“Request-Reply”特性来支持RPC调用。

K
keranbingaa 已提交
9
## 2 设计思路
K
keranbingaa 已提交
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
在rocketmq中,整个同步调用主要包括两个过程:

(1)请求方生成消息,发送给响应方,并等待响应方回包;

(2)响应方收到请求消息后,消费这条消息,并发出一条响应消息给请求方。

整个过程实质上是两个消息收发过程的组合。所以这里最关键的问题是如何将异步的消息收发过程构建成一个同步的过程。其中主要有两个问题需要解决:

* 请求方如何同步等待回包
这个问题的解决方案中,一个关键的数据结构是RequestResponseFuture。

```
public class RequestResponseFuture {
    private final String correlationId;
    private final RequestCallback requestCallback;
    private final long beginTimestamp = System.currentTimeMillis();
    private final Message requestMsg = null;
    private long timeoutMillis;
    private CountDownLatch countDownLatch = new CountDownLatch(1);
    private volatile Message responseMsg = null;
    private volatile boolean sendRequestOk = true;
    private volatile Throwable cause = null;
}
```
RequestResponseFuture中,利用correlationId来标识一个请求。如下图所示,Producer发送request时创建一个RequestResponseFuture,以correlationId为key,RequestResponseFuture为value存入map,同时请求中带上RequestResponseFuture中的correlationId,收到回包后根据correlationId拿到对应的RequestResponseFuture,并设置回包内容。
![](image/producer_send_request.png)

* consumer消费消息后,如何准确回包
K
keranbingaa 已提交
38

K
keranbingaa 已提交
39 40
(1)producer在发送消息的时候,会给每条消息生成唯一的标识符,同时还带上了producer的clientId。当consumer收到并消费消息后,从消息中取出消息的标识符correlationId和producer的标识符clientId,放入响应消息,用来确定此响应消息是哪条请求消息的回包,以及此响应消息应该发给哪个producer。同时响应消息中设置了消息的类型以及响应消息的topic,然后consumer将消息发给broker,如下图所示。
![](image/consumer_reply.png)
K
keranbingaa 已提交
41

K
keranbingaa 已提交
42 43 44 45 46 47
(2)broker收到响应消息后,需要将消息发回给指定的producer。Broker如何知道发回给哪个producer?因为消息中包含了producer的标识符clientId,在ProducerManager中,维护了标识符和channel信息的对应关系,通过这个对应关系,就能把回包发给对应的producer。

响应消息发送和一般的消息发送流程区别在于,响应消息不需要producer拉取,而是由broker直接推给producer。同时选择broker的策略也有变化:请求消息从哪个broker发过来,响应消息也发到对应的broker上。

Producer收到响应消息后,根据消息中的唯一标识符,从RequestResponseFuture的map中找到对应的RequestResponseFuture结构,设置响应消息,同时计数器减一,解除等待状态,使请求方收到响应消息。

K
keranbingaa 已提交
48
## 3 使用方法
K
keranbingaa 已提交
49 50 51

同步调用的示例在example文件夹的rpc目录下。

K
keranbingaa 已提交
52
### 3.1 Producer
K
keranbingaa 已提交
53 54 55 56 57 58 59 60 61 62 63 64
```
Message msg = new Message(topic,
                "",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

            long begin = System.currentTimeMillis();
            Message retMsg = producer.request(msg, ttl);
            long cost = System.currentTimeMillis() - begin;
            System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
```
调用接口替换为request即可。

K
keranbingaa 已提交
65
### 3.2 Consumer
K
keranbingaa 已提交
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
需要启动一个producer,同时在覆写consumeMessage方法的时候,自定义响应消息并发送。

```
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                for (MessageExt msg : msgs) {
                    try {
                        System.out.printf("handle message: %s", msg.toString());
                        String replyTo = MessageUtil.getReplyToClient(msg);
                        byte[] replyContent = "reply message contents.".getBytes();
                        // create reply message with given util, do not create reply message by yourself
                        Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);

                        // send reply message with producer
                        SendResult replyResult = replyProducer.send(replyMessage, 3000);
                        System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
```

K
keranbingaa 已提交
91
## 4 接口参数
K
keranbingaa 已提交
92
* public Message request(Message msg,long timeout)
K
keranbingaa 已提交
93

K
keranbingaa 已提交
94
msg:待发送的消息
K
keranbingaa 已提交
95

K
keranbingaa 已提交
96 97
timeout:同步调用超时时间
* public void request(Message msg, final RequestCallback requestCallback, long timeout)
K
keranbingaa 已提交
98

K
keranbingaa 已提交
99
msg:待发送的消息
K
keranbingaa 已提交
100

K
keranbingaa 已提交
101
requestCallback:回调函数
K
keranbingaa 已提交
102

K
keranbingaa 已提交
103 104
timeout:同步调用超时时间
* public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,final long timeout)
K
keranbingaa 已提交
105

K
keranbingaa 已提交
106
msg:待发送的消息
K
keranbingaa 已提交
107

K
keranbingaa 已提交
108
selector:消息队列选择器
K
keranbingaa 已提交
109

K
keranbingaa 已提交
110
arg:消息队列选择器需要的参数
K
keranbingaa 已提交
111

K
keranbingaa 已提交
112 113
timeout:同步调用超时时间
* public void request(final Message msg, final MessageQueueSelector selector, final Object arg,final RequestCallback requestCallback, final long timeout)
K
keranbingaa 已提交
114

K
keranbingaa 已提交
115
msg:待发送的消息
K
keranbingaa 已提交
116

K
keranbingaa 已提交
117
selector:消息队列选择器
K
keranbingaa 已提交
118

K
keranbingaa 已提交
119
arg:消息队列选择器需要的参数
K
keranbingaa 已提交
120

K
keranbingaa 已提交
121
requestCallback:回调函数
K
keranbingaa 已提交
122

K
keranbingaa 已提交
123 124
timeout:同步调用超时时间
*	public Message request(final Message msg, final MessageQueue mq, final long timeout)
K
keranbingaa 已提交
125

K
keranbingaa 已提交
126
msg:待发送的消息
K
keranbingaa 已提交
127

K
keranbingaa 已提交
128
mq:目标消息队列
K
keranbingaa 已提交
129

K
keranbingaa 已提交
130 131
timeout:同步调用超时时间
*	public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
K
keranbingaa 已提交
132

K
keranbingaa 已提交
133
msg:待发送的消息
K
keranbingaa 已提交
134

K
keranbingaa 已提交
135
mq:目标消息队列
K
keranbingaa 已提交
136

K
keranbingaa 已提交
137
requestCallback:回调函数
K
keranbingaa 已提交
138

K
keranbingaa 已提交
139
timeout:同步调用超时时间