提交 5761b732 编写于 作者: A Alex Dima

Improve shape of RPC messages (#36972)

上级 cc549cee
......@@ -44,66 +44,88 @@ export class RPCProtocol {
private _receiveOneMessage(rawmsg: string): void {
if (this._isDisposed) {
console.warn('Received message after being shutdown: ', rawmsg);
return;
}
let parsedRawMsg = <RPCMessage>JSON.parse(rawmsg);
let msg: RPCMessage;
if (parsedRawMsg.revive) {
msg = marshalling.revive(parsedRawMsg, 0);
} else {
msg = parsedRawMsg;
let msg = <RPCMessage>JSON.parse(rawmsg);
switch (msg.type) {
case MessageType.Request:
this._receiveRequest(msg);
break;
case MessageType.FancyRequest:
this._receiveRequest(marshalling.revive(msg, 0));
break;
case MessageType.Cancel:
this._receiveCancel(msg);
break;
case MessageType.Reply:
this._receiveReply(msg);
break;
case MessageType.FancyReply:
this._receiveReply(marshalling.revive(msg, 0));
break;
case MessageType.ReplyErr:
this._receiveReplyErr(msg);
break;
}
}
if (isReplyMessage(msg)) {
if (!this._pendingRPCReplies.hasOwnProperty(msg.seq)) {
console.warn('Got reply to unknown seq');
return;
}
let reply = this._pendingRPCReplies[msg.seq];
delete this._pendingRPCReplies[msg.seq];
if (isReplyErrMessage(msg)) {
let err = msg.err;
if (msg.err.$isError) {
err = new Error();
err.name = msg.err.name;
err.message = msg.err.message;
err.stack = msg.err.stack;
}
reply.resolveErr(err);
return;
}
private _receiveRequest(msg: RequestMessage | FancyRequestMessage): void {
if (!this._bigHandler) {
throw new Error('got message before big handler attached!');
}
reply.resolveOk(msg.res);
return;
const callId = msg.id;
const proxyId = msg.proxyId;
this._invokedHandlers[callId] = this._invokeHandler(proxyId, msg.method, msg.args);
this._invokedHandlers[callId].then((r) => {
delete this._invokedHandlers[callId];
this._multiplexor.send(MessageFactory.replyOK(callId, r));
}, (err) => {
delete this._invokedHandlers[callId];
this._multiplexor.send(MessageFactory.replyErr(callId, err));
});
}
private _receiveCancel(msg: CancelMessage): void {
const callId = msg.id;
if (this._invokedHandlers[callId]) {
this._invokedHandlers[callId].cancel();
}
}
if (isCancelMessage(msg)) {
if (this._invokedHandlers[msg.cancel]) {
this._invokedHandlers[msg.cancel].cancel();
}
private _receiveReply(msg: ReplyMessage | FancyReplyMessage): void {
const callId = msg.id;
if (!this._pendingRPCReplies.hasOwnProperty(callId)) {
return;
}
const rpcId = msg.rpcId;
const pendingReply = this._pendingRPCReplies[callId];
delete this._pendingRPCReplies[callId];
if (!this._bigHandler) {
throw new Error('got message before big handler attached!');
}
pendingReply.resolveOk(msg.res);
}
const req = msg.req;
private _receiveReplyErr(msg: ReplyErrMessage): void {
const callId = msg.id;
if (!this._pendingRPCReplies.hasOwnProperty(callId)) {
return;
}
this._invokedHandlers[req] = this._invokeHandler(rpcId, msg.method, msg.args);
const pendingReply = this._pendingRPCReplies[callId];
delete this._pendingRPCReplies[callId];
this._invokedHandlers[req].then((r) => {
delete this._invokedHandlers[req];
this._multiplexor.send(MessageFactory.replyOK(req, r));
}, (err) => {
delete this._invokedHandlers[req];
this._multiplexor.send(MessageFactory.replyErr(req, err));
});
let err: Error = null;
if (msg.err && msg.err.$isError) {
err = new Error();
err.name = msg.err.name;
err.message = msg.err.message;
err.stack = msg.err.stack;
}
pendingReply.resolveErr(err);
}
private _invokeHandler(proxyId: string, methodName: string, args: any[]): TPromise<any> {
......@@ -177,59 +199,69 @@ class RPCMultiplexer {
class MessageFactory {
public static cancel(req: string): string {
return `{"revive":0,"cancel":"${req}"}`;
return `{"type":${MessageType.Cancel},"id":"${req}"}`;
}
public static request(req: string, rpcId: string, method: string, args: any[]): string {
return `{"revive":1,"req":"${req}","rpcId":"${rpcId}","method":"${method}","args":${marshalling.stringify(args)}}`;
return `{"type":${MessageType.FancyRequest},"id":"${req}","proxyId":"${rpcId}","method":"${method}","args":${marshalling.stringify(args)}}`;
}
public static replyOK(req: string, res: any): string {
if (typeof res === 'undefined') {
return `{"revive":0,"seq":"${req}"}`;
return `{"type":${MessageType.Reply},"id":"${req}"}`;
}
return `{"revive":1,"seq":"${req}","res":${marshalling.stringify(res)}}`;
return `{"type":${MessageType.FancyReply},"id":"${req}","res":${marshalling.stringify(res)}}`;
}
public static replyErr(req: string, err: any): string {
if (typeof err === 'undefined') {
return `{"revive":0,"seq":"${req}","err":null}`;
if (err instanceof Error) {
return `{"type":${MessageType.ReplyErr},"id":"${req}","err":${JSON.stringify(errors.transformErrorForSerialization(err))}}`;
}
return `{"revive":1,"seq":"${req}","err":${marshalling.stringify(errors.transformErrorForSerialization(err))}}`;
return `{"type":${MessageType.ReplyErr},"id":"${req}","err":null}`;
}
}
interface RequestMessage {
revive: number;
req: string;
rpcId: string;
method: string;
args: any[];
export const enum MessageType {
Request = 1,
FancyRequest = 2,
Cancel = 3,
Reply = 4,
FancyReply = 5,
ReplyErr = 6
}
interface CancelMessage {
revive: number;
cancel: string;
class RequestMessage {
type: MessageType.Request;
id: string;
proxyId: string;
method: string;
args: any[];
}
function isCancelMessage(msg: RPCMessage): msg is CancelMessage {
return !!(<CancelMessage>msg).cancel;
class FancyRequestMessage {
type: MessageType.FancyRequest;
id: string;
proxyId: string;
method: string;
args: any[];
}
interface ReplyOKMessage {
revive: number;
seq: string;
res?: any;
class CancelMessage {
type: MessageType.Cancel;
id: string;
}
interface ReplyErrMessage {
revive: number;
seq: string;
err: any;
class ReplyMessage {
type: MessageType.Reply;
id: string;
res: any;
}
function isReplyMessage(msg: RPCMessage): msg is ReplyOKMessage | ReplyErrMessage {
return !!(<ReplyOKMessage | ReplyErrMessage>msg).seq;
class FancyReplyMessage {
type: MessageType.FancyReply;
id: string;
res: any;
}
function isReplyErrMessage(msg: ReplyOKMessage | ReplyErrMessage): msg is ReplyErrMessage {
return !!(<ReplyErrMessage>msg).err;
class ReplyErrMessage {
type: MessageType.ReplyErr;
id: string;
err: errors.SerializedError;
}
type RPCMessage = RequestMessage | CancelMessage | ReplyOKMessage | ReplyErrMessage;
type RPCMessage = RequestMessage | FancyRequestMessage | CancelMessage | ReplyMessage | FancyReplyMessage | ReplyErrMessage;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册