未验证 提交 acfadb48 编写于 作者: A Alexandru Dima 提交者: GitHub

Merge pull request #57144 from Microsoft/alex/binary-rpc

Convert renderer <-> ext host communication to a binary format
......@@ -164,7 +164,7 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
}
send(buffer: Buffer): void {
const header = Buffer.alloc(Protocol._headerLen);
const header = Buffer.allocUnsafe(Protocol._headerLen);
header.writeUInt32BE(buffer.length, 0, true);
this._writeSoon(header, buffer);
}
......
......@@ -99,15 +99,13 @@ class RemoteFileSystemProvider implements IFileSystemProvider {
}
readFile(resource: URI): TPromise<Uint8Array> {
return this._proxy.$readFile(this._handle, resource).then(encoded => {
return Buffer.from(encoded, 'base64');
});
return this._proxy.$readFile(this._handle, resource);
}
writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): TPromise<void> {
let encoded = Buffer.isBuffer(content)
? content.toString('base64')
: Buffer.from(content.buffer, content.byteOffset, content.byteLength).toString('base64');
? content
: Buffer.from(content.buffer, content.byteOffset, content.byteLength);
return this._proxy.$writeFile(this._handle, resource, encoded, opts);
}
......
......@@ -680,8 +680,8 @@ export interface ExtHostWorkspaceShape {
export interface ExtHostFileSystemShape {
$stat(handle: number, resource: UriComponents): TPromise<IStat>;
$readdir(handle: number, resource: UriComponents): TPromise<[string, FileType][]>;
$readFile(handle: number, resource: UriComponents): TPromise<string>;
$writeFile(handle: number, resource: UriComponents, base64Encoded: string, opts: FileWriteOptions): TPromise<void>;
$readFile(handle: number, resource: UriComponents): TPromise<Buffer>;
$writeFile(handle: number, resource: UriComponents, content: Buffer, opts: FileWriteOptions): TPromise<void>;
$rename(handle: number, resource: UriComponents, target: UriComponents, opts: FileOverwriteOptions): TPromise<void>;
$copy(handle: number, resource: UriComponents, target: UriComponents, opts: FileOverwriteOptions): TPromise<void>;
$mkdir(handle: number, resource: UriComponents): TPromise<void>;
......
......@@ -159,16 +159,16 @@ export class ExtHostFileSystem implements ExtHostFileSystemShape {
return asWinJsPromise(() => this._fsProvider.get(handle).readDirectory(URI.revive(resource)));
}
$readFile(handle: number, resource: UriComponents): TPromise<string> {
$readFile(handle: number, resource: UriComponents): TPromise<Buffer> {
return asWinJsPromise(() => {
return this._fsProvider.get(handle).readFile(URI.revive(resource));
}).then(data => {
return Buffer.isBuffer(data) ? data.toString('base64') : Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString('base64');
return Buffer.isBuffer(data) ? data : Buffer.from(data.buffer, data.byteOffset, data.byteLength);
});
}
$writeFile(handle: number, resource: UriComponents, base64Content: string, opts: files.FileWriteOptions): TPromise<void> {
return asWinJsPromise(() => this._fsProvider.get(handle).writeFile(URI.revive(resource), Buffer.from(base64Content, 'base64'), opts));
$writeFile(handle: number, resource: UriComponents, content: Buffer, opts: files.FileWriteOptions): TPromise<void> {
return asWinJsPromise(() => this._fsProvider.get(handle).writeFile(URI.revive(resource), content, opts));
}
$delete(handle: number, resource: UriComponents, opts: files.FileDeleteOptions): TPromise<void> {
......
......@@ -39,7 +39,7 @@ import { Event, Emitter } from 'vs/base/common/event';
import { ExtensionHostProfiler } from 'vs/workbench/services/extensions/electron-browser/extensionHostProfiler';
import product from 'vs/platform/node/product';
import * as strings from 'vs/base/common/strings';
import { RPCProtocol } from 'vs/workbench/services/extensions/node/rpcProtocol';
import { RPCProtocol, IRPCProtocolLogger } from 'vs/workbench/services/extensions/node/rpcProtocol';
import { INotificationService, Severity } from 'vs/platform/notification/common/notification';
import { isFalsyOrEmpty } from 'vs/base/common/arrays';
import { Schemas } from 'vs/base/common/network';
......@@ -180,11 +180,12 @@ export class ExtensionHostProcessManager extends Disposable {
private _createExtensionHostCustomers(protocol: IMessagePassingProtocol): ExtHostExtensionServiceShape {
let logger: IRPCProtocolLogger = null;
if (logExtensionHostCommunication || this._environmentService.logExtensionHostCommunication) {
protocol = asLoggingProtocol(protocol);
logger = new RPCLogger();
}
this._extensionHostProcessRPCProtocol = new RPCProtocol(protocol);
this._extensionHostProcessRPCProtocol = new RPCProtocol(protocol, logger);
const extHostContext: IExtHostContext = {
getProxy: <T>(identifier: ProxyIdentifier<T>): T => this._extensionHostProcessRPCProtocol.getProxy(identifier),
set: <T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R => this._extensionHostProcessRPCProtocol.set(identifier, instance),
......@@ -954,20 +955,20 @@ export class ExtensionService extends Disposable implements IExtensionService {
}
}
function asLoggingProtocol(protocol: IMessagePassingProtocol): IMessagePassingProtocol {
class RPCLogger implements IRPCProtocolLogger {
protocol.onMessage(msg => {
console.log('%c[Extension \u2192 Window]%c[len: ' + strings.pad(msg.length, 5, ' ') + ']', 'color: darkgreen', 'color: grey', msg);
});
private _totalIncoming = 0;
private _totalOutgoing = 0;
return {
onMessage: protocol.onMessage,
logIncoming(msgLength: number, str: string, data?: any): void {
this._totalIncoming += msgLength;
console.log(`%c[Extension \u2192 Window]%c[${strings.pad(this._totalIncoming, 7, ' ')}]%c[len: ${strings.pad(msgLength, 5, ' ')}]`, 'color: darkgreen', 'color: grey', 'color: grey', str, data);
}
send(msg: any) {
protocol.send(msg);
console.log('%c[Window \u2192 Extension]%c[len: ' + strings.pad(msg.length, 5, ' ') + ']', 'color: darkgreen', 'color: grey', msg);
}
};
logOutgoing(msgLength: number, str: string, data?: any): void {
this._totalOutgoing += msgLength;
console.log(`%c[Window \u2192 Extension]%c[${strings.pad(this._totalOutgoing, 7, ' ')}]%c[len: ${strings.pad(msgLength, 5, ' ')}]`, 'color: darkgreen', 'color: grey', 'color: grey', str, data);
}
}
interface IExtensionCacheData {
......
......@@ -85,8 +85,15 @@ function transformIncomingURIs(obj: any, transformer: IURITransformer): any {
return result;
}
export interface IRPCProtocolLogger {
logIncoming(msgLength: number, str: string, data?: any): void;
logOutgoing(msgLength: number, str: string, data?: any): void;
}
export class RPCProtocol implements IRPCProtocol {
private readonly _protocol: IMessagePassingProtocol;
private readonly _logger: IRPCProtocolLogger;
private readonly _uriTransformer: IURITransformer;
private _isDisposed: boolean;
private readonly _locals: { [id: string]: any; };
......@@ -94,9 +101,10 @@ export class RPCProtocol implements IRPCProtocol {
private _lastMessageId: number;
private readonly _invokedHandlers: { [req: string]: TPromise<any>; };
private readonly _pendingRPCReplies: { [msgId: string]: LazyPromise; };
private readonly _multiplexor: RPCMultiplexer;
constructor(protocol: IMessagePassingProtocol, transformer: IURITransformer = null) {
constructor(protocol: IMessagePassingProtocol, logger: IRPCProtocolLogger = null, transformer: IURITransformer = null) {
this._protocol = protocol;
this._logger = logger;
this._uriTransformer = transformer;
this._isDisposed = false;
this._locals = Object.create(null);
......@@ -104,7 +112,7 @@ export class RPCProtocol implements IRPCProtocol {
this._lastMessageId = 0;
this._invokedHandlers = Object.create(null);
this._pendingRPCReplies = {};
this._multiplexor = new RPCMultiplexer(protocol, (msg) => this._receiveOneMessage(msg));
this._protocol.onMessage((msg) => this._receiveOneMessage(msg));
}
public dispose(): void {
......@@ -164,54 +172,107 @@ export class RPCProtocol implements IRPCProtocol {
return;
}
let msg = <RPCMessage>JSON.parse(rawmsg.toString());
if (this._uriTransformer) {
msg = transformIncomingURIs(msg, this._uriTransformer);
}
const msgLength = rawmsg.length;
const buff = MessageBuffer.read(rawmsg, 0);
const messageType = <MessageType>buff.readUInt8();
const req = buff.readUInt32();
switch (msg.type) {
case MessageType.Request:
this._receiveRequest(msg);
switch (messageType) {
case MessageType.RequestJSONArgs: {
let { rpcId, method, args } = MessageIO.deserializeRequestJSONArgs(buff);
if (this._uriTransformer) {
args = transformIncomingURIs(args, this._uriTransformer);
}
this._receiveRequest(msgLength, req, rpcId, method, args);
break;
case MessageType.Cancel:
this._receiveCancel(msg);
}
case MessageType.RequestMixedArgs: {
let { rpcId, method, args } = MessageIO.deserializeRequestMixedArgs(buff);
if (this._uriTransformer) {
args = transformIncomingURIs(args, this._uriTransformer);
}
this._receiveRequest(msgLength, req, rpcId, method, args);
break;
case MessageType.Reply:
this._receiveReply(msg);
}
case MessageType.Cancel: {
this._receiveCancel(msgLength, req);
break;
case MessageType.ReplyErr:
this._receiveReplyErr(msg);
}
case MessageType.ReplyOKEmpty: {
this._receiveReply(msgLength, req, undefined);
break;
}
case MessageType.ReplyOKJSON: {
let value = MessageIO.deserializeReplyOKJSON(buff);
if (this._uriTransformer) {
value = transformIncomingURIs(value, this._uriTransformer);
}
this._receiveReply(msgLength, req, value);
break;
}
case MessageType.ReplyOKBuffer: {
let value = MessageIO.deserializeReplyOKBuffer(buff);
this._receiveReply(msgLength, req, value);
break;
}
case MessageType.ReplyErrError: {
let err = MessageIO.deserializeReplyErrError(buff);
if (this._uriTransformer) {
err = transformIncomingURIs(err, this._uriTransformer);
}
this._receiveReplyErr(req, err);
break;
}
case MessageType.ReplyErrEmpty: {
this._receiveReplyErr(req, undefined);
break;
}
}
}
private _receiveRequest(msg: RequestMessage): void {
const callId = msg.id;
const proxyId = msg.proxyId;
private _receiveRequest(msgLength: number, req: number, rpcId: string, method: string, args: any[]): void {
if (this._logger) {
this._logger.logIncoming(msgLength, `receiveRequest ${req}, ${rpcId}.${method}:`, args);
}
const callId = String(req);
this._invokedHandlers[callId] = this._invokeHandler(proxyId, msg.method, msg.args);
this._invokedHandlers[callId] = this._invokeHandler(rpcId, method, args);
this._invokedHandlers[callId].then((r) => {
delete this._invokedHandlers[callId];
if (this._uriTransformer) {
r = transformOutgoingURIs(r, this._uriTransformer);
}
this._multiplexor.send(Buffer.from(MessageFactory.replyOK(callId, r)));
const msg = MessageIO.serializeReplyOK(req, r);
if (this._logger) {
this._logger.logOutgoing(msg.byteLength, `replyOK ${req}:`, r);
}
this._protocol.send(msg);
}, (err) => {
delete this._invokedHandlers[callId];
this._multiplexor.send(Buffer.from(MessageFactory.replyErr(callId, err)));
const msg = MessageIO.serializeReplyErr(req, err);
if (this._logger) {
this._logger.logOutgoing(msg.byteLength, `replyErr ${req}:`, err);
}
this._protocol.send(msg);
});
}
private _receiveCancel(msg: CancelMessage): void {
const callId = msg.id;
private _receiveCancel(msgLength: number, req: number): void {
if (this._logger) {
this._logger.logIncoming(msgLength, `receiveCancel ${req}`);
}
const callId = String(req);
if (this._invokedHandlers[callId]) {
this._invokedHandlers[callId].cancel();
}
}
private _receiveReply(msg: ReplyMessage): void {
const callId = msg.id;
private _receiveReply(msgLength: number, req: number, value: any): void {
if (this._logger) {
this._logger.logIncoming(msgLength, `receiveReply ${req}:`, value);
}
const callId = String(req);
if (!this._pendingRPCReplies.hasOwnProperty(callId)) {
return;
}
......@@ -219,11 +280,11 @@ export class RPCProtocol implements IRPCProtocol {
const pendingReply = this._pendingRPCReplies[callId];
delete this._pendingRPCReplies[callId];
pendingReply.resolveOk(msg.res);
pendingReply.resolveOk(value);
}
private _receiveReplyErr(msg: ReplyErrMessage): void {
const callId = msg.id;
private _receiveReplyErr(req: number, value: any): void {
const callId = String(req);
if (!this._pendingRPCReplies.hasOwnProperty(callId)) {
return;
}
......@@ -232,11 +293,11 @@ export class RPCProtocol implements IRPCProtocol {
delete this._pendingRPCReplies[callId];
let err: Error = null;
if (msg.err && msg.err.$isError) {
if (value && value.$isError) {
err = new Error();
err.name = msg.err.name;
err.message = msg.err.message;
err.stack = msg.err.stack;
err.name = value.name;
err.message = value.message;
err.stack = value.stack;
}
pendingReply.resolveErr(err);
}
......@@ -266,123 +327,347 @@ export class RPCProtocol implements IRPCProtocol {
return TPromise.wrapError<any>(errors.canceled());
}
const callId = String(++this._lastMessageId);
const req = ++this._lastMessageId;
const callId = String(req);
const result = new LazyPromise(() => {
this._multiplexor.send(Buffer.from(MessageFactory.cancel(callId)));
const msg = MessageIO.serializeCancel(req);
if (this._logger) {
this._logger.logOutgoing(msg.byteLength, `cancel ${req}`);
}
this._protocol.send(MessageIO.serializeCancel(req));
});
this._pendingRPCReplies[callId] = result;
if (this._uriTransformer) {
args = transformOutgoingURIs(args, this._uriTransformer);
}
this._multiplexor.send(Buffer.from(MessageFactory.request(callId, proxyId, methodName, args)));
const msg = MessageIO.serializeRequest(req, proxyId, methodName, args);
if (this._logger) {
this._logger.logOutgoing(msg.byteLength, `request ${req}: ${proxyId}.${methodName}:`, args);
}
this._protocol.send(msg);
return result;
}
}
/**
* Sends/Receives multiple messages in one go:
* - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`.
* - each incoming message is handled in a separate `process.nextTick`.
*/
class RPCMultiplexer {
class MessageBuffer {
private readonly _protocol: IMessagePassingProtocol;
private readonly _sendAccumulatedBound: () => void;
public static alloc(type: MessageType, req: number, messageSize: number): MessageBuffer {
let result = new MessageBuffer(Buffer.allocUnsafe(messageSize + 1 /* type */ + 4 /* req */), 0);
result.writeUInt8(type);
result.writeUInt32(req);
return result;
}
private _messagesToSend: Buffer[];
public static read(buff: Buffer, offset: number): MessageBuffer {
return new MessageBuffer(buff, offset);
}
constructor(protocol: IMessagePassingProtocol, onMessage: (msg: Buffer) => void) {
this._protocol = protocol;
this._sendAccumulatedBound = this._sendAccumulated.bind(this);
private _buff: Buffer;
private _offset: number;
this._messagesToSend = [];
public get buffer(): Buffer {
return this._buff;
}
this._protocol.onMessage(data => {
let i = 0;
private constructor(buff: Buffer, offset: number) {
this._buff = buff;
this._offset = offset;
}
while (i < data.length) {
const size = data.readUInt32BE(i);
onMessage(data.slice(i + 4, i + 4 + size));
i += 4 + size;
}
});
public writeUInt8(n: number): void {
this._buff.writeUInt8(n, this._offset, true); this._offset += 1;
}
public readUInt8(): number {
const n = this._buff.readUInt8(this._offset, true); this._offset += 1;
return n;
}
private _sendAccumulated(): void {
const size = this._messagesToSend.reduce((r, b) => r + 4 + b.byteLength, 0);
const buffer = Buffer.allocUnsafe(size);
let i = 0;
public writeUInt32(n: number): void {
this._buff.writeUInt32BE(n, this._offset, true); this._offset += 4;
}
for (const msg of this._messagesToSend) {
buffer.writeUInt32BE(msg.byteLength, i);
msg.copy(buffer, i + 4);
i += 4 + msg.byteLength;
public readUInt32(): number {
const n = this._buff.readUInt32BE(this._offset, true); this._offset += 4;
return n;
}
public static sizeShortString(str: string, strByteLength: number): number {
return 1 /* string length */ + strByteLength /* actual string */;
}
public writeShortString(str: string, strByteLength: number): void {
this._buff.writeUInt8(strByteLength, this._offset, true); this._offset += 1;
this._buff.write(str, this._offset, strByteLength, 'utf8'); this._offset += strByteLength;
}
public readShortString(): string {
const strLength = this._buff.readUInt8(this._offset, true); this._offset += 1;
const str = this._buff.toString('utf8', this._offset, this._offset + strLength); this._offset += strLength;
return str;
}
public static sizeLongString(str: string, strByteLength: number): number {
return 4 /* string length */ + strByteLength /* actual string */;
}
public writeLongString(str: string, strByteLength: number): void {
this._buff.writeUInt32LE(strByteLength, this._offset, true); this._offset += 4;
this._buff.write(str, this._offset, strByteLength, 'utf8'); this._offset += strByteLength;
}
public readLongString(): string {
const strLength = this._buff.readUInt32LE(this._offset, true); this._offset += 4;
const str = this._buff.toString('utf8', this._offset, this._offset + strLength); this._offset += strLength;
return str;
}
public static sizeBuffer(buff: Buffer, buffByteLength: number): number {
return 4 /* buffer length */ + buffByteLength /* actual buffer */;
}
public writeBuffer(buff: Buffer, buffByteLength: number): void {
this._buff.writeUInt32LE(buffByteLength, this._offset, true); this._offset += 4;
buff.copy(this._buff, this._offset); this._offset += buffByteLength;
}
public readBuffer(): Buffer {
const buffLength = this._buff.readUInt32LE(this._offset, true); this._offset += 4;
const buff = this._buff.slice(this._offset, this._offset + buffLength); this._offset += buffLength;
return buff;
}
public static sizeMixedArray(arr: (string | Buffer)[], arrLengths: number[]): number {
let size = 0;
size += 1; // arr length
for (let i = 0, len = arr.length; i < len; i++) {
const el = arr[i];
const elLength = arrLengths[i];
size += 1; // arg type
if (typeof el === 'string') {
size += this.sizeLongString(el, elLength);
} else {
size += this.sizeBuffer(el, elLength);
}
}
return size;
}
this._messagesToSend = [];
this._protocol.send(buffer);
public writeMixedArray(arr: (string | Buffer)[], arrLengths: number[]): void {
this._buff.writeUInt8(arr.length, this._offset, true); this._offset += 1;
for (let i = 0, len = arr.length; i < len; i++) {
const el = arr[i];
const elLength = arrLengths[i];
if (typeof el === 'string') {
this.writeUInt8(ArgType.String);
this.writeLongString(el, elLength);
} else {
this.writeUInt8(ArgType.Buffer);
this.writeBuffer(el, elLength);
}
}
}
public send(msg: Buffer): void {
if (this._messagesToSend.length === 0) {
process.nextTick(this._sendAccumulatedBound);
public readMixedArray(): (string | Buffer)[] {
const arrLen = this._buff.readUInt8(this._offset, true); this._offset += 1;
let arr: (string | Buffer)[] = new Array(arrLen);
for (let i = 0; i < arrLen; i++) {
const argType = <ArgType>this.readUInt8();
if (argType === ArgType.String) {
arr[i] = this.readLongString();
} else {
arr[i] = this.readBuffer();
}
}
this._messagesToSend.push(msg);
return arr;
}
}
class MessageFactory {
public static cancel(req: string): string {
return `{"type":${MessageType.Cancel},"id":"${req}"}`;
class MessageIO {
private static _arrayContainsBuffer(arr: any[]): boolean {
for (let i = 0, len = arr.length; i < len; i++) {
if (Buffer.isBuffer(arr[i])) {
return true;
}
}
return false;
}
public static request(req: string, rpcId: string, method: string, args: any[]): string {
return `{"type":${MessageType.Request},"id":"${req}","proxyId":"${rpcId}","method":"${method}","args":${JSON.stringify(args)}}`;
public static serializeRequest(req: number, rpcId: string, method: string, args: any[]): Buffer {
if (this._arrayContainsBuffer(args)) {
let massagedArgs: (string | Buffer)[] = new Array(args.length);
let argsLengths: number[] = new Array(args.length);
for (let i = 0, len = args.length; i < len; i++) {
const arg = args[i];
if (Buffer.isBuffer(arg)) {
massagedArgs[i] = arg;
argsLengths[i] = arg.byteLength;
} else {
massagedArgs[i] = JSON.stringify(arg);
argsLengths[i] = Buffer.byteLength(massagedArgs[i], 'utf8');
}
}
return this._requestMixedArgs(req, rpcId, method, massagedArgs, argsLengths);
}
return this._requestJSONArgs(req, rpcId, method, JSON.stringify(args));
}
public static replyOK(req: string, res: any): string {
private static _requestJSONArgs(req: number, rpcId: string, method: string, args: string): Buffer {
const rpcIdByteLength = Buffer.byteLength(rpcId, 'utf8');
const methodByteLength = Buffer.byteLength(method, 'utf8');
const argsByteLength = Buffer.byteLength(args, 'utf8');
let len = 0;
len += MessageBuffer.sizeShortString(rpcId, rpcIdByteLength);
len += MessageBuffer.sizeShortString(method, methodByteLength);
len += MessageBuffer.sizeLongString(args, argsByteLength);
let result = MessageBuffer.alloc(MessageType.RequestJSONArgs, req, len);
result.writeShortString(rpcId, rpcIdByteLength);
result.writeShortString(method, methodByteLength);
result.writeLongString(args, argsByteLength);
return result.buffer;
}
public static deserializeRequestJSONArgs(buff: MessageBuffer): { rpcId: string; method: string; args: any[]; } {
const rpcId = buff.readShortString();
const method = buff.readShortString();
const args = buff.readLongString();
return {
rpcId: rpcId,
method: method,
args: JSON.parse(args)
};
}
private static _requestMixedArgs(req: number, rpcId: string, method: string, args: (string | Buffer)[], argsLengths: number[]): Buffer {
const rpcIdByteLength = Buffer.byteLength(rpcId, 'utf8');
const methodByteLength = Buffer.byteLength(method, 'utf8');
let len = 0;
len += MessageBuffer.sizeShortString(rpcId, rpcIdByteLength);
len += MessageBuffer.sizeShortString(method, methodByteLength);
len += MessageBuffer.sizeMixedArray(args, argsLengths);
let result = MessageBuffer.alloc(MessageType.RequestMixedArgs, req, len);
result.writeShortString(rpcId, rpcIdByteLength);
result.writeShortString(method, methodByteLength);
result.writeMixedArray(args, argsLengths);
return result.buffer;
}
public static deserializeRequestMixedArgs(buff: MessageBuffer): { rpcId: string; method: string; args: any[]; } {
const rpcId = buff.readShortString();
const method = buff.readShortString();
const rawargs = buff.readMixedArray();
const args: any[] = new Array(rawargs.length);
for (let i = 0, len = rawargs.length; i < len; i++) {
const rawarg = rawargs[i];
if (typeof rawarg === 'string') {
args[i] = JSON.parse(rawarg);
} else {
args[i] = rawarg;
}
}
return {
rpcId: rpcId,
method: method,
args: args
};
}
public static serializeCancel(req: number): Buffer {
return MessageBuffer.alloc(MessageType.Cancel, req, 0).buffer;
}
public static serializeReplyOK(req: number, res: any): Buffer {
if (typeof res === 'undefined') {
return `{"type":${MessageType.Reply},"id":"${req}"}`;
return this._serializeReplyOKEmpty(req);
}
if (Buffer.isBuffer(res)) {
return this._serializeReplyOKBuffer(req, res);
}
return `{"type":${MessageType.Reply},"id":"${req}","res":${JSON.stringify(res)}}`;
return this._serializeReplyOKJSON(req, JSON.stringify(res));
}
private static _serializeReplyOKEmpty(req: number): Buffer {
return MessageBuffer.alloc(MessageType.ReplyOKEmpty, req, 0).buffer;
}
private static _serializeReplyOKBuffer(req: number, res: Buffer): Buffer {
const resByteLength = res.byteLength;
let len = 0;
len += MessageBuffer.sizeBuffer(res, resByteLength);
let result = MessageBuffer.alloc(MessageType.ReplyOKBuffer, req, len);
result.writeBuffer(res, resByteLength);
return result.buffer;
}
public static deserializeReplyOKBuffer(buff: MessageBuffer): Buffer {
return buff.readBuffer();
}
private static _serializeReplyOKJSON(req: number, res: string): Buffer {
const resByteLength = Buffer.byteLength(res, 'utf8');
let len = 0;
len += MessageBuffer.sizeLongString(res, resByteLength);
let result = MessageBuffer.alloc(MessageType.ReplyOKJSON, req, len);
result.writeLongString(res, resByteLength);
return result.buffer;
}
public static replyErr(req: string, err: any): string {
public static deserializeReplyOKJSON(buff: MessageBuffer): any {
const res = buff.readLongString();
return JSON.parse(res);
}
public static serializeReplyErr(req: number, err: any): Buffer {
if (err instanceof Error) {
return `{"type":${MessageType.ReplyErr},"id":"${req}","err":${JSON.stringify(errors.transformErrorForSerialization(err))}}`;
return this._serializeReplyErrEror(req, err);
}
return `{"type":${MessageType.ReplyErr},"id":"${req}","err":null}`;
return this._serializeReplyErrEmpty(req);
}
private static _serializeReplyErrEror(req: number, _err: Error): Buffer {
const err = JSON.stringify(errors.transformErrorForSerialization(_err));
const errByteLength = Buffer.byteLength(err, 'utf8');
let len = 0;
len += MessageBuffer.sizeLongString(err, errByteLength);
let result = MessageBuffer.alloc(MessageType.ReplyErrError, req, len);
result.writeLongString(err, errByteLength);
return result.buffer;
}
public static deserializeReplyErrError(buff: MessageBuffer): Error {
const err = buff.readLongString();
return JSON.parse(err);
}
private static _serializeReplyErrEmpty(req: number): Buffer {
return MessageBuffer.alloc(MessageType.ReplyErrEmpty, req, 0).buffer;
}
}
const enum MessageType {
Request = 1,
Cancel = 2,
Reply = 3,
ReplyErr = 4
RequestJSONArgs = 1,
RequestMixedArgs = 2,
Cancel = 3,
ReplyOKEmpty = 4,
ReplyOKBuffer = 5,
ReplyOKJSON = 6,
ReplyErrError = 7,
ReplyErrEmpty = 8,
}
class RequestMessage {
type: MessageType.Request;
id: string;
proxyId: string;
method: string;
args: any[];
}
class CancelMessage {
type: MessageType.Cancel;
id: string;
const enum ArgType {
String = 1,
Buffer = 2
}
class ReplyMessage {
type: MessageType.Reply;
id: string;
res: any;
}
class ReplyErrMessage {
type: MessageType.ReplyErr;
id: string;
err: errors.SerializedError;
}
type RPCMessage = RequestMessage | CancelMessage | ReplyMessage | ReplyErrMessage;
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import * as assert from 'assert';
import { RPCProtocol } from 'vs/workbench/services/extensions/node/rpcProtocol';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { Event, Emitter } from 'vs/base/common/event';
import { ProxyIdentifier } from 'vs/workbench/services/extensions/node/proxyIdentifier';
import { TPromise } from 'vs/base/common/winjs.base';
suite('RPCProtocol', () => {
class MessagePassingProtocol implements IMessagePassingProtocol {
private _pair: MessagePassingProtocol;
private readonly _onMessage: Emitter<Buffer> = new Emitter<Buffer>();
public readonly onMessage: Event<Buffer> = this._onMessage.event;
public setPair(other: MessagePassingProtocol) {
this._pair = other;
}
public send(buffer: Buffer): void {
process.nextTick(() => {
this._pair._onMessage.fire(buffer);
});
}
}
let delegate: (a1: any, a2: any) => any;
let bProxy: BClass;
class BClass {
$m(a1: any, a2: any): TPromise<any> {
return TPromise.as(delegate.call(null, a1, a2));
}
}
setup(() => {
let a_protocol = new MessagePassingProtocol();
let b_protocol = new MessagePassingProtocol();
a_protocol.setPair(b_protocol);
b_protocol.setPair(a_protocol);
let A = new RPCProtocol(a_protocol);
let B = new RPCProtocol(b_protocol);
delegate = null;
const bIdentifier = new ProxyIdentifier<BClass>(false, 'bb');
const bInstance = new BClass();
B.set(bIdentifier, bInstance);
bProxy = A.getProxy(bIdentifier);
});
test('simple call', function (done) {
delegate = (a1: number, a2: number) => a1 + a2;
bProxy.$m(4, 1).then((res: number) => {
assert.equal(res, 5);
done(null);
}, done);
});
test('simple call without result', function (done) {
delegate = (a1: number, a2: number) => { };
bProxy.$m(4, 1).then((res: number) => {
assert.equal(res, undefined);
done(null);
}, done);
});
test('passing buffer as argument', function (done) {
delegate = (a1: Buffer, a2: number) => {
assert.ok(Buffer.isBuffer(a1));
return a1[a2];
};
let b = Buffer.allocUnsafe(4);
b[0] = 1;
b[1] = 2;
b[2] = 3;
b[3] = 4;
bProxy.$m(b, 2).then((res: number) => {
assert.equal(res, 3);
done(null);
}, done);
});
test('returning a buffer', function (done) {
delegate = (a1: number, a2: number) => {
let b = Buffer.allocUnsafe(4);
b[0] = 1;
b[1] = 2;
b[2] = 3;
b[3] = 4;
return b;
};
bProxy.$m(4, 1).then((res: Buffer) => {
assert.ok(Buffer.isBuffer(res));
assert.equal(res[0], 1);
assert.equal(res[1], 2);
assert.equal(res[2], 3);
assert.equal(res[3], 4);
done(null);
}, done);
});
test('cancelling a call', function () {
delegate = (a1: number, a2: number) => a1 + a2;
let p = bProxy.$m(4, 1);
p.then((res: number) => {
assert.fail('should not receive result');
});
p.cancel();
});
test('throwing an error', function (done) {
delegate = (a1: number, a2: number) => {
throw new Error(`nope`);
};
bProxy.$m(4, 1).then((res) => {
assert.fail('unexpected');
done(null);
}, (err) => {
assert.equal(err.message, 'nope');
done(null);
});
});
test('error promise', function (done) {
delegate = (a1: number, a2: number) => {
return TPromise.wrapError(undefined);
};
bProxy.$m(4, 1).then((res) => {
assert.fail('unexpected');
done(null);
}, (err) => {
assert.equal(err, undefined);
done(null);
});
});
});
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册