diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index 3360b004e345d28ff9981b7b2ee7d956eae80ef3..cd6474e039c0c25e8cb02b5bbd8becf22eceae67 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -164,7 +164,7 @@ export class Protocol implements IDisposable, IMessagePassingProtocol { } send(buffer: Buffer): void { - const header = Buffer.alloc(Protocol._headerLen); + const header = Buffer.allocUnsafe(Protocol._headerLen); header.writeUInt32BE(buffer.length, 0, true); this._writeSoon(header, buffer); } diff --git a/src/vs/workbench/api/electron-browser/mainThreadFileSystem.ts b/src/vs/workbench/api/electron-browser/mainThreadFileSystem.ts index 657cbbcdca5098f3daf1c5801f547ea693885be3..51a029b241e54e4abadb95d94477fc5773128b9b 100644 --- a/src/vs/workbench/api/electron-browser/mainThreadFileSystem.ts +++ b/src/vs/workbench/api/electron-browser/mainThreadFileSystem.ts @@ -99,15 +99,13 @@ class RemoteFileSystemProvider implements IFileSystemProvider { } readFile(resource: URI): TPromise { - return this._proxy.$readFile(this._handle, resource).then(encoded => { - return Buffer.from(encoded, 'base64'); - }); + return this._proxy.$readFile(this._handle, resource); } writeFile(resource: URI, content: Uint8Array, opts: FileWriteOptions): TPromise { let encoded = Buffer.isBuffer(content) - ? content.toString('base64') - : Buffer.from(content.buffer, content.byteOffset, content.byteLength).toString('base64'); + ? content + : Buffer.from(content.buffer, content.byteOffset, content.byteLength); return this._proxy.$writeFile(this._handle, resource, encoded, opts); } diff --git a/src/vs/workbench/api/node/extHost.protocol.ts b/src/vs/workbench/api/node/extHost.protocol.ts index e95a1c07de706a45cc96a1c2bfcd7535e34c8900..e37eae6024cf38d205f23fdec595cf7b45f5c24c 100644 --- a/src/vs/workbench/api/node/extHost.protocol.ts +++ b/src/vs/workbench/api/node/extHost.protocol.ts @@ -680,8 +680,8 @@ export interface ExtHostWorkspaceShape { export interface ExtHostFileSystemShape { $stat(handle: number, resource: UriComponents): TPromise; $readdir(handle: number, resource: UriComponents): TPromise<[string, FileType][]>; - $readFile(handle: number, resource: UriComponents): TPromise; - $writeFile(handle: number, resource: UriComponents, base64Encoded: string, opts: FileWriteOptions): TPromise; + $readFile(handle: number, resource: UriComponents): TPromise; + $writeFile(handle: number, resource: UriComponents, content: Buffer, opts: FileWriteOptions): TPromise; $rename(handle: number, resource: UriComponents, target: UriComponents, opts: FileOverwriteOptions): TPromise; $copy(handle: number, resource: UriComponents, target: UriComponents, opts: FileOverwriteOptions): TPromise; $mkdir(handle: number, resource: UriComponents): TPromise; diff --git a/src/vs/workbench/api/node/extHostFileSystem.ts b/src/vs/workbench/api/node/extHostFileSystem.ts index 67508b9b33957ee02440dc9505087a7d215eedb3..140efe9d6944b1e303c95cfcc8074eb205c49ec3 100644 --- a/src/vs/workbench/api/node/extHostFileSystem.ts +++ b/src/vs/workbench/api/node/extHostFileSystem.ts @@ -159,16 +159,16 @@ export class ExtHostFileSystem implements ExtHostFileSystemShape { return asWinJsPromise(() => this._fsProvider.get(handle).readDirectory(URI.revive(resource))); } - $readFile(handle: number, resource: UriComponents): TPromise { + $readFile(handle: number, resource: UriComponents): TPromise { return asWinJsPromise(() => { return this._fsProvider.get(handle).readFile(URI.revive(resource)); }).then(data => { - return Buffer.isBuffer(data) ? data.toString('base64') : Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString('base64'); + return Buffer.isBuffer(data) ? data : Buffer.from(data.buffer, data.byteOffset, data.byteLength); }); } - $writeFile(handle: number, resource: UriComponents, base64Content: string, opts: files.FileWriteOptions): TPromise { - return asWinJsPromise(() => this._fsProvider.get(handle).writeFile(URI.revive(resource), Buffer.from(base64Content, 'base64'), opts)); + $writeFile(handle: number, resource: UriComponents, content: Buffer, opts: files.FileWriteOptions): TPromise { + return asWinJsPromise(() => this._fsProvider.get(handle).writeFile(URI.revive(resource), content, opts)); } $delete(handle: number, resource: UriComponents, opts: files.FileDeleteOptions): TPromise { diff --git a/src/vs/workbench/services/extensions/electron-browser/extensionService.ts b/src/vs/workbench/services/extensions/electron-browser/extensionService.ts index 170a3dd9db78f67683651360495aa9e73366393b..f84965c20b9b7ed8555dd21349b06136869cdd3c 100644 --- a/src/vs/workbench/services/extensions/electron-browser/extensionService.ts +++ b/src/vs/workbench/services/extensions/electron-browser/extensionService.ts @@ -39,7 +39,7 @@ import { Event, Emitter } from 'vs/base/common/event'; import { ExtensionHostProfiler } from 'vs/workbench/services/extensions/electron-browser/extensionHostProfiler'; import product from 'vs/platform/node/product'; import * as strings from 'vs/base/common/strings'; -import { RPCProtocol } from 'vs/workbench/services/extensions/node/rpcProtocol'; +import { RPCProtocol, IRPCProtocolLogger } from 'vs/workbench/services/extensions/node/rpcProtocol'; import { INotificationService, Severity } from 'vs/platform/notification/common/notification'; import { isFalsyOrEmpty } from 'vs/base/common/arrays'; import { Schemas } from 'vs/base/common/network'; @@ -180,11 +180,12 @@ export class ExtensionHostProcessManager extends Disposable { private _createExtensionHostCustomers(protocol: IMessagePassingProtocol): ExtHostExtensionServiceShape { + let logger: IRPCProtocolLogger = null; if (logExtensionHostCommunication || this._environmentService.logExtensionHostCommunication) { - protocol = asLoggingProtocol(protocol); + logger = new RPCLogger(); } - this._extensionHostProcessRPCProtocol = new RPCProtocol(protocol); + this._extensionHostProcessRPCProtocol = new RPCProtocol(protocol, logger); const extHostContext: IExtHostContext = { getProxy: (identifier: ProxyIdentifier): T => this._extensionHostProcessRPCProtocol.getProxy(identifier), set: (identifier: ProxyIdentifier, instance: R): R => this._extensionHostProcessRPCProtocol.set(identifier, instance), @@ -954,20 +955,20 @@ export class ExtensionService extends Disposable implements IExtensionService { } } -function asLoggingProtocol(protocol: IMessagePassingProtocol): IMessagePassingProtocol { +class RPCLogger implements IRPCProtocolLogger { - protocol.onMessage(msg => { - console.log('%c[Extension \u2192 Window]%c[len: ' + strings.pad(msg.length, 5, ' ') + ']', 'color: darkgreen', 'color: grey', msg); - }); + private _totalIncoming = 0; + private _totalOutgoing = 0; - return { - onMessage: protocol.onMessage, + logIncoming(msgLength: number, str: string, data?: any): void { + this._totalIncoming += msgLength; + console.log(`%c[Extension \u2192 Window]%c[${strings.pad(this._totalIncoming, 7, ' ')}]%c[len: ${strings.pad(msgLength, 5, ' ')}]`, 'color: darkgreen', 'color: grey', 'color: grey', str, data); + } - send(msg: any) { - protocol.send(msg); - console.log('%c[Window \u2192 Extension]%c[len: ' + strings.pad(msg.length, 5, ' ') + ']', 'color: darkgreen', 'color: grey', msg); - } - }; + logOutgoing(msgLength: number, str: string, data?: any): void { + this._totalOutgoing += msgLength; + console.log(`%c[Window \u2192 Extension]%c[${strings.pad(this._totalOutgoing, 7, ' ')}]%c[len: ${strings.pad(msgLength, 5, ' ')}]`, 'color: darkgreen', 'color: grey', 'color: grey', str, data); + } } interface IExtensionCacheData { diff --git a/src/vs/workbench/services/extensions/node/rpcProtocol.ts b/src/vs/workbench/services/extensions/node/rpcProtocol.ts index a2b4a81e1452c1a6949ffd828dd9a238b93f7f04..effdbaf55188eb24ac82948ba5c01a32c11b0c89 100644 --- a/src/vs/workbench/services/extensions/node/rpcProtocol.ts +++ b/src/vs/workbench/services/extensions/node/rpcProtocol.ts @@ -85,8 +85,15 @@ function transformIncomingURIs(obj: any, transformer: IURITransformer): any { return result; } +export interface IRPCProtocolLogger { + logIncoming(msgLength: number, str: string, data?: any): void; + logOutgoing(msgLength: number, str: string, data?: any): void; +} + export class RPCProtocol implements IRPCProtocol { + private readonly _protocol: IMessagePassingProtocol; + private readonly _logger: IRPCProtocolLogger; private readonly _uriTransformer: IURITransformer; private _isDisposed: boolean; private readonly _locals: { [id: string]: any; }; @@ -94,9 +101,10 @@ export class RPCProtocol implements IRPCProtocol { private _lastMessageId: number; private readonly _invokedHandlers: { [req: string]: TPromise; }; private readonly _pendingRPCReplies: { [msgId: string]: LazyPromise; }; - private readonly _multiplexor: RPCMultiplexer; - constructor(protocol: IMessagePassingProtocol, transformer: IURITransformer = null) { + constructor(protocol: IMessagePassingProtocol, logger: IRPCProtocolLogger = null, transformer: IURITransformer = null) { + this._protocol = protocol; + this._logger = logger; this._uriTransformer = transformer; this._isDisposed = false; this._locals = Object.create(null); @@ -104,7 +112,7 @@ export class RPCProtocol implements IRPCProtocol { this._lastMessageId = 0; this._invokedHandlers = Object.create(null); this._pendingRPCReplies = {}; - this._multiplexor = new RPCMultiplexer(protocol, (msg) => this._receiveOneMessage(msg)); + this._protocol.onMessage((msg) => this._receiveOneMessage(msg)); } public dispose(): void { @@ -164,54 +172,107 @@ export class RPCProtocol implements IRPCProtocol { return; } - let msg = JSON.parse(rawmsg.toString()); - if (this._uriTransformer) { - msg = transformIncomingURIs(msg, this._uriTransformer); - } + const msgLength = rawmsg.length; + const buff = MessageBuffer.read(rawmsg, 0); + const messageType = buff.readUInt8(); + const req = buff.readUInt32(); - switch (msg.type) { - case MessageType.Request: - this._receiveRequest(msg); + switch (messageType) { + case MessageType.RequestJSONArgs: { + let { rpcId, method, args } = MessageIO.deserializeRequestJSONArgs(buff); + if (this._uriTransformer) { + args = transformIncomingURIs(args, this._uriTransformer); + } + this._receiveRequest(msgLength, req, rpcId, method, args); break; - case MessageType.Cancel: - this._receiveCancel(msg); + } + case MessageType.RequestMixedArgs: { + let { rpcId, method, args } = MessageIO.deserializeRequestMixedArgs(buff); + if (this._uriTransformer) { + args = transformIncomingURIs(args, this._uriTransformer); + } + this._receiveRequest(msgLength, req, rpcId, method, args); break; - case MessageType.Reply: - this._receiveReply(msg); + } + case MessageType.Cancel: { + this._receiveCancel(msgLength, req); break; - case MessageType.ReplyErr: - this._receiveReplyErr(msg); + } + case MessageType.ReplyOKEmpty: { + this._receiveReply(msgLength, req, undefined); break; + } + case MessageType.ReplyOKJSON: { + let value = MessageIO.deserializeReplyOKJSON(buff); + if (this._uriTransformer) { + value = transformIncomingURIs(value, this._uriTransformer); + } + this._receiveReply(msgLength, req, value); + break; + } + case MessageType.ReplyOKBuffer: { + let value = MessageIO.deserializeReplyOKBuffer(buff); + this._receiveReply(msgLength, req, value); + break; + } + case MessageType.ReplyErrError: { + let err = MessageIO.deserializeReplyErrError(buff); + if (this._uriTransformer) { + err = transformIncomingURIs(err, this._uriTransformer); + } + this._receiveReplyErr(req, err); + break; + } + case MessageType.ReplyErrEmpty: { + this._receiveReplyErr(req, undefined); + break; + } } } - private _receiveRequest(msg: RequestMessage): void { - const callId = msg.id; - const proxyId = msg.proxyId; + private _receiveRequest(msgLength: number, req: number, rpcId: string, method: string, args: any[]): void { + if (this._logger) { + this._logger.logIncoming(msgLength, `receiveRequest ${req}, ${rpcId}.${method}:`, args); + } + const callId = String(req); - this._invokedHandlers[callId] = this._invokeHandler(proxyId, msg.method, msg.args); + this._invokedHandlers[callId] = this._invokeHandler(rpcId, method, args); this._invokedHandlers[callId].then((r) => { delete this._invokedHandlers[callId]; if (this._uriTransformer) { r = transformOutgoingURIs(r, this._uriTransformer); } - this._multiplexor.send(Buffer.from(MessageFactory.replyOK(callId, r))); + const msg = MessageIO.serializeReplyOK(req, r); + if (this._logger) { + this._logger.logOutgoing(msg.byteLength, `replyOK ${req}:`, r); + } + this._protocol.send(msg); }, (err) => { delete this._invokedHandlers[callId]; - this._multiplexor.send(Buffer.from(MessageFactory.replyErr(callId, err))); + const msg = MessageIO.serializeReplyErr(req, err); + if (this._logger) { + this._logger.logOutgoing(msg.byteLength, `replyErr ${req}:`, err); + } + this._protocol.send(msg); }); } - private _receiveCancel(msg: CancelMessage): void { - const callId = msg.id; + private _receiveCancel(msgLength: number, req: number): void { + if (this._logger) { + this._logger.logIncoming(msgLength, `receiveCancel ${req}`); + } + const callId = String(req); if (this._invokedHandlers[callId]) { this._invokedHandlers[callId].cancel(); } } - private _receiveReply(msg: ReplyMessage): void { - const callId = msg.id; + private _receiveReply(msgLength: number, req: number, value: any): void { + if (this._logger) { + this._logger.logIncoming(msgLength, `receiveReply ${req}:`, value); + } + const callId = String(req); if (!this._pendingRPCReplies.hasOwnProperty(callId)) { return; } @@ -219,11 +280,11 @@ export class RPCProtocol implements IRPCProtocol { const pendingReply = this._pendingRPCReplies[callId]; delete this._pendingRPCReplies[callId]; - pendingReply.resolveOk(msg.res); + pendingReply.resolveOk(value); } - private _receiveReplyErr(msg: ReplyErrMessage): void { - const callId = msg.id; + private _receiveReplyErr(req: number, value: any): void { + const callId = String(req); if (!this._pendingRPCReplies.hasOwnProperty(callId)) { return; } @@ -232,11 +293,11 @@ export class RPCProtocol implements IRPCProtocol { delete this._pendingRPCReplies[callId]; let err: Error = null; - if (msg.err && msg.err.$isError) { + if (value && value.$isError) { err = new Error(); - err.name = msg.err.name; - err.message = msg.err.message; - err.stack = msg.err.stack; + err.name = value.name; + err.message = value.message; + err.stack = value.stack; } pendingReply.resolveErr(err); } @@ -266,123 +327,347 @@ export class RPCProtocol implements IRPCProtocol { return TPromise.wrapError(errors.canceled()); } - const callId = String(++this._lastMessageId); + const req = ++this._lastMessageId; + const callId = String(req); const result = new LazyPromise(() => { - this._multiplexor.send(Buffer.from(MessageFactory.cancel(callId))); + const msg = MessageIO.serializeCancel(req); + if (this._logger) { + this._logger.logOutgoing(msg.byteLength, `cancel ${req}`); + } + this._protocol.send(MessageIO.serializeCancel(req)); }); this._pendingRPCReplies[callId] = result; if (this._uriTransformer) { args = transformOutgoingURIs(args, this._uriTransformer); } - this._multiplexor.send(Buffer.from(MessageFactory.request(callId, proxyId, methodName, args))); + const msg = MessageIO.serializeRequest(req, proxyId, methodName, args); + if (this._logger) { + this._logger.logOutgoing(msg.byteLength, `request ${req}: ${proxyId}.${methodName}:`, args); + } + this._protocol.send(msg); return result; } } -/** - * Sends/Receives multiple messages in one go: - * - multiple messages to be sent from one stack get sent in bulk at `process.nextTick`. - * - each incoming message is handled in a separate `process.nextTick`. - */ -class RPCMultiplexer { +class MessageBuffer { - private readonly _protocol: IMessagePassingProtocol; - private readonly _sendAccumulatedBound: () => void; + public static alloc(type: MessageType, req: number, messageSize: number): MessageBuffer { + let result = new MessageBuffer(Buffer.allocUnsafe(messageSize + 1 /* type */ + 4 /* req */), 0); + result.writeUInt8(type); + result.writeUInt32(req); + return result; + } - private _messagesToSend: Buffer[]; + public static read(buff: Buffer, offset: number): MessageBuffer { + return new MessageBuffer(buff, offset); + } - constructor(protocol: IMessagePassingProtocol, onMessage: (msg: Buffer) => void) { - this._protocol = protocol; - this._sendAccumulatedBound = this._sendAccumulated.bind(this); + private _buff: Buffer; + private _offset: number; - this._messagesToSend = []; + public get buffer(): Buffer { + return this._buff; + } - this._protocol.onMessage(data => { - let i = 0; + private constructor(buff: Buffer, offset: number) { + this._buff = buff; + this._offset = offset; + } - while (i < data.length) { - const size = data.readUInt32BE(i); - onMessage(data.slice(i + 4, i + 4 + size)); - i += 4 + size; - } - }); + public writeUInt8(n: number): void { + this._buff.writeUInt8(n, this._offset, true); this._offset += 1; + } + + public readUInt8(): number { + const n = this._buff.readUInt8(this._offset, true); this._offset += 1; + return n; } - private _sendAccumulated(): void { - const size = this._messagesToSend.reduce((r, b) => r + 4 + b.byteLength, 0); - const buffer = Buffer.allocUnsafe(size); - let i = 0; + public writeUInt32(n: number): void { + this._buff.writeUInt32BE(n, this._offset, true); this._offset += 4; + } - for (const msg of this._messagesToSend) { - buffer.writeUInt32BE(msg.byteLength, i); - msg.copy(buffer, i + 4); - i += 4 + msg.byteLength; + public readUInt32(): number { + const n = this._buff.readUInt32BE(this._offset, true); this._offset += 4; + return n; + } + + public static sizeShortString(str: string, strByteLength: number): number { + return 1 /* string length */ + strByteLength /* actual string */; + } + + public writeShortString(str: string, strByteLength: number): void { + this._buff.writeUInt8(strByteLength, this._offset, true); this._offset += 1; + this._buff.write(str, this._offset, strByteLength, 'utf8'); this._offset += strByteLength; + } + + public readShortString(): string { + const strLength = this._buff.readUInt8(this._offset, true); this._offset += 1; + const str = this._buff.toString('utf8', this._offset, this._offset + strLength); this._offset += strLength; + return str; + } + + public static sizeLongString(str: string, strByteLength: number): number { + return 4 /* string length */ + strByteLength /* actual string */; + } + + public writeLongString(str: string, strByteLength: number): void { + this._buff.writeUInt32LE(strByteLength, this._offset, true); this._offset += 4; + this._buff.write(str, this._offset, strByteLength, 'utf8'); this._offset += strByteLength; + } + + public readLongString(): string { + const strLength = this._buff.readUInt32LE(this._offset, true); this._offset += 4; + const str = this._buff.toString('utf8', this._offset, this._offset + strLength); this._offset += strLength; + return str; + } + + public static sizeBuffer(buff: Buffer, buffByteLength: number): number { + return 4 /* buffer length */ + buffByteLength /* actual buffer */; + } + + public writeBuffer(buff: Buffer, buffByteLength: number): void { + this._buff.writeUInt32LE(buffByteLength, this._offset, true); this._offset += 4; + buff.copy(this._buff, this._offset); this._offset += buffByteLength; + } + + public readBuffer(): Buffer { + const buffLength = this._buff.readUInt32LE(this._offset, true); this._offset += 4; + const buff = this._buff.slice(this._offset, this._offset + buffLength); this._offset += buffLength; + return buff; + } + + public static sizeMixedArray(arr: (string | Buffer)[], arrLengths: number[]): number { + let size = 0; + size += 1; // arr length + for (let i = 0, len = arr.length; i < len; i++) { + const el = arr[i]; + const elLength = arrLengths[i]; + size += 1; // arg type + if (typeof el === 'string') { + size += this.sizeLongString(el, elLength); + } else { + size += this.sizeBuffer(el, elLength); + } } + return size; + } - this._messagesToSend = []; - this._protocol.send(buffer); + public writeMixedArray(arr: (string | Buffer)[], arrLengths: number[]): void { + this._buff.writeUInt8(arr.length, this._offset, true); this._offset += 1; + for (let i = 0, len = arr.length; i < len; i++) { + const el = arr[i]; + const elLength = arrLengths[i]; + if (typeof el === 'string') { + this.writeUInt8(ArgType.String); + this.writeLongString(el, elLength); + } else { + this.writeUInt8(ArgType.Buffer); + this.writeBuffer(el, elLength); + } + } } - public send(msg: Buffer): void { - if (this._messagesToSend.length === 0) { - process.nextTick(this._sendAccumulatedBound); + public readMixedArray(): (string | Buffer)[] { + const arrLen = this._buff.readUInt8(this._offset, true); this._offset += 1; + let arr: (string | Buffer)[] = new Array(arrLen); + for (let i = 0; i < arrLen; i++) { + const argType = this.readUInt8(); + if (argType === ArgType.String) { + arr[i] = this.readLongString(); + } else { + arr[i] = this.readBuffer(); + } } - this._messagesToSend.push(msg); + return arr; } } -class MessageFactory { - public static cancel(req: string): string { - return `{"type":${MessageType.Cancel},"id":"${req}"}`; +class MessageIO { + + private static _arrayContainsBuffer(arr: any[]): boolean { + for (let i = 0, len = arr.length; i < len; i++) { + if (Buffer.isBuffer(arr[i])) { + return true; + } + } + return false; } - public static request(req: string, rpcId: string, method: string, args: any[]): string { - return `{"type":${MessageType.Request},"id":"${req}","proxyId":"${rpcId}","method":"${method}","args":${JSON.stringify(args)}}`; + public static serializeRequest(req: number, rpcId: string, method: string, args: any[]): Buffer { + if (this._arrayContainsBuffer(args)) { + let massagedArgs: (string | Buffer)[] = new Array(args.length); + let argsLengths: number[] = new Array(args.length); + for (let i = 0, len = args.length; i < len; i++) { + const arg = args[i]; + if (Buffer.isBuffer(arg)) { + massagedArgs[i] = arg; + argsLengths[i] = arg.byteLength; + } else { + massagedArgs[i] = JSON.stringify(arg); + argsLengths[i] = Buffer.byteLength(massagedArgs[i], 'utf8'); + } + } + return this._requestMixedArgs(req, rpcId, method, massagedArgs, argsLengths); + } + return this._requestJSONArgs(req, rpcId, method, JSON.stringify(args)); } - public static replyOK(req: string, res: any): string { + private static _requestJSONArgs(req: number, rpcId: string, method: string, args: string): Buffer { + const rpcIdByteLength = Buffer.byteLength(rpcId, 'utf8'); + const methodByteLength = Buffer.byteLength(method, 'utf8'); + const argsByteLength = Buffer.byteLength(args, 'utf8'); + + let len = 0; + len += MessageBuffer.sizeShortString(rpcId, rpcIdByteLength); + len += MessageBuffer.sizeShortString(method, methodByteLength); + len += MessageBuffer.sizeLongString(args, argsByteLength); + + let result = MessageBuffer.alloc(MessageType.RequestJSONArgs, req, len); + result.writeShortString(rpcId, rpcIdByteLength); + result.writeShortString(method, methodByteLength); + result.writeLongString(args, argsByteLength); + return result.buffer; + } + + public static deserializeRequestJSONArgs(buff: MessageBuffer): { rpcId: string; method: string; args: any[]; } { + const rpcId = buff.readShortString(); + const method = buff.readShortString(); + const args = buff.readLongString(); + return { + rpcId: rpcId, + method: method, + args: JSON.parse(args) + }; + } + + private static _requestMixedArgs(req: number, rpcId: string, method: string, args: (string | Buffer)[], argsLengths: number[]): Buffer { + const rpcIdByteLength = Buffer.byteLength(rpcId, 'utf8'); + const methodByteLength = Buffer.byteLength(method, 'utf8'); + + let len = 0; + len += MessageBuffer.sizeShortString(rpcId, rpcIdByteLength); + len += MessageBuffer.sizeShortString(method, methodByteLength); + len += MessageBuffer.sizeMixedArray(args, argsLengths); + + let result = MessageBuffer.alloc(MessageType.RequestMixedArgs, req, len); + result.writeShortString(rpcId, rpcIdByteLength); + result.writeShortString(method, methodByteLength); + result.writeMixedArray(args, argsLengths); + return result.buffer; + } + + public static deserializeRequestMixedArgs(buff: MessageBuffer): { rpcId: string; method: string; args: any[]; } { + const rpcId = buff.readShortString(); + const method = buff.readShortString(); + const rawargs = buff.readMixedArray(); + const args: any[] = new Array(rawargs.length); + for (let i = 0, len = rawargs.length; i < len; i++) { + const rawarg = rawargs[i]; + if (typeof rawarg === 'string') { + args[i] = JSON.parse(rawarg); + } else { + args[i] = rawarg; + } + } + return { + rpcId: rpcId, + method: method, + args: args + }; + } + + public static serializeCancel(req: number): Buffer { + return MessageBuffer.alloc(MessageType.Cancel, req, 0).buffer; + } + + public static serializeReplyOK(req: number, res: any): Buffer { if (typeof res === 'undefined') { - return `{"type":${MessageType.Reply},"id":"${req}"}`; + return this._serializeReplyOKEmpty(req); + } + if (Buffer.isBuffer(res)) { + return this._serializeReplyOKBuffer(req, res); } - return `{"type":${MessageType.Reply},"id":"${req}","res":${JSON.stringify(res)}}`; + return this._serializeReplyOKJSON(req, JSON.stringify(res)); + } + + private static _serializeReplyOKEmpty(req: number): Buffer { + return MessageBuffer.alloc(MessageType.ReplyOKEmpty, req, 0).buffer; + } + + private static _serializeReplyOKBuffer(req: number, res: Buffer): Buffer { + const resByteLength = res.byteLength; + + let len = 0; + len += MessageBuffer.sizeBuffer(res, resByteLength); + + let result = MessageBuffer.alloc(MessageType.ReplyOKBuffer, req, len); + result.writeBuffer(res, resByteLength); + return result.buffer; + } + + public static deserializeReplyOKBuffer(buff: MessageBuffer): Buffer { + return buff.readBuffer(); + } + + private static _serializeReplyOKJSON(req: number, res: string): Buffer { + const resByteLength = Buffer.byteLength(res, 'utf8'); + + let len = 0; + len += MessageBuffer.sizeLongString(res, resByteLength); + + let result = MessageBuffer.alloc(MessageType.ReplyOKJSON, req, len); + result.writeLongString(res, resByteLength); + return result.buffer; } - public static replyErr(req: string, err: any): string { + public static deserializeReplyOKJSON(buff: MessageBuffer): any { + const res = buff.readLongString(); + return JSON.parse(res); + } + + public static serializeReplyErr(req: number, err: any): Buffer { if (err instanceof Error) { - return `{"type":${MessageType.ReplyErr},"id":"${req}","err":${JSON.stringify(errors.transformErrorForSerialization(err))}}`; + return this._serializeReplyErrEror(req, err); } - return `{"type":${MessageType.ReplyErr},"id":"${req}","err":null}`; + return this._serializeReplyErrEmpty(req); + } + + private static _serializeReplyErrEror(req: number, _err: Error): Buffer { + const err = JSON.stringify(errors.transformErrorForSerialization(_err)); + const errByteLength = Buffer.byteLength(err, 'utf8'); + + let len = 0; + len += MessageBuffer.sizeLongString(err, errByteLength); + + let result = MessageBuffer.alloc(MessageType.ReplyErrError, req, len); + result.writeLongString(err, errByteLength); + return result.buffer; + } + + public static deserializeReplyErrError(buff: MessageBuffer): Error { + const err = buff.readLongString(); + return JSON.parse(err); + } + + private static _serializeReplyErrEmpty(req: number): Buffer { + return MessageBuffer.alloc(MessageType.ReplyErrEmpty, req, 0).buffer; } } const enum MessageType { - Request = 1, - Cancel = 2, - Reply = 3, - ReplyErr = 4 + RequestJSONArgs = 1, + RequestMixedArgs = 2, + Cancel = 3, + ReplyOKEmpty = 4, + ReplyOKBuffer = 5, + ReplyOKJSON = 6, + ReplyErrError = 7, + ReplyErrEmpty = 8, } -class RequestMessage { - type: MessageType.Request; - id: string; - proxyId: string; - method: string; - args: any[]; -} -class CancelMessage { - type: MessageType.Cancel; - id: string; +const enum ArgType { + String = 1, + Buffer = 2 } -class ReplyMessage { - type: MessageType.Reply; - id: string; - res: any; -} -class ReplyErrMessage { - type: MessageType.ReplyErr; - id: string; - err: errors.SerializedError; -} - -type RPCMessage = RequestMessage | CancelMessage | ReplyMessage | ReplyErrMessage; diff --git a/src/vs/workbench/services/extensions/test/node/rpcProtocol.test.ts b/src/vs/workbench/services/extensions/test/node/rpcProtocol.test.ts new file mode 100644 index 0000000000000000000000000000000000000000..602a7d37d625c7f06e0a57aed0a87a12efeb7d19 --- /dev/null +++ b/src/vs/workbench/services/extensions/test/node/rpcProtocol.test.ts @@ -0,0 +1,144 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) Microsoft Corporation. All rights reserved. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +'use strict'; + +import * as assert from 'assert'; +import { RPCProtocol } from 'vs/workbench/services/extensions/node/rpcProtocol'; +import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc'; +import { Event, Emitter } from 'vs/base/common/event'; +import { ProxyIdentifier } from 'vs/workbench/services/extensions/node/proxyIdentifier'; +import { TPromise } from 'vs/base/common/winjs.base'; + +suite('RPCProtocol', () => { + + class MessagePassingProtocol implements IMessagePassingProtocol { + private _pair: MessagePassingProtocol; + + private readonly _onMessage: Emitter = new Emitter(); + public readonly onMessage: Event = this._onMessage.event; + + public setPair(other: MessagePassingProtocol) { + this._pair = other; + } + + public send(buffer: Buffer): void { + process.nextTick(() => { + this._pair._onMessage.fire(buffer); + }); + } + } + + let delegate: (a1: any, a2: any) => any; + let bProxy: BClass; + class BClass { + $m(a1: any, a2: any): TPromise { + return TPromise.as(delegate.call(null, a1, a2)); + } + } + + setup(() => { + let a_protocol = new MessagePassingProtocol(); + let b_protocol = new MessagePassingProtocol(); + a_protocol.setPair(b_protocol); + b_protocol.setPair(a_protocol); + + let A = new RPCProtocol(a_protocol); + let B = new RPCProtocol(b_protocol); + + delegate = null; + + const bIdentifier = new ProxyIdentifier(false, 'bb'); + const bInstance = new BClass(); + B.set(bIdentifier, bInstance); + bProxy = A.getProxy(bIdentifier); + }); + + test('simple call', function (done) { + delegate = (a1: number, a2: number) => a1 + a2; + bProxy.$m(4, 1).then((res: number) => { + assert.equal(res, 5); + done(null); + }, done); + }); + + test('simple call without result', function (done) { + delegate = (a1: number, a2: number) => { }; + bProxy.$m(4, 1).then((res: number) => { + assert.equal(res, undefined); + done(null); + }, done); + }); + + test('passing buffer as argument', function (done) { + delegate = (a1: Buffer, a2: number) => { + assert.ok(Buffer.isBuffer(a1)); + return a1[a2]; + }; + let b = Buffer.allocUnsafe(4); + b[0] = 1; + b[1] = 2; + b[2] = 3; + b[3] = 4; + bProxy.$m(b, 2).then((res: number) => { + assert.equal(res, 3); + done(null); + }, done); + }); + + test('returning a buffer', function (done) { + delegate = (a1: number, a2: number) => { + let b = Buffer.allocUnsafe(4); + b[0] = 1; + b[1] = 2; + b[2] = 3; + b[3] = 4; + return b; + }; + bProxy.$m(4, 1).then((res: Buffer) => { + assert.ok(Buffer.isBuffer(res)); + assert.equal(res[0], 1); + assert.equal(res[1], 2); + assert.equal(res[2], 3); + assert.equal(res[3], 4); + done(null); + }, done); + }); + + test('cancelling a call', function () { + delegate = (a1: number, a2: number) => a1 + a2; + let p = bProxy.$m(4, 1); + p.then((res: number) => { + assert.fail('should not receive result'); + }); + p.cancel(); + }); + + test('throwing an error', function (done) { + delegate = (a1: number, a2: number) => { + throw new Error(`nope`); + }; + bProxy.$m(4, 1).then((res) => { + assert.fail('unexpected'); + done(null); + }, (err) => { + assert.equal(err.message, 'nope'); + done(null); + }); + }); + + test('error promise', function (done) { + delegate = (a1: number, a2: number) => { + return TPromise.wrapError(undefined); + }; + bProxy.$m(4, 1).then((res) => { + assert.fail('unexpected'); + done(null); + }, (err) => { + assert.equal(err, undefined); + done(null); + }); + }); +});