提交 b900b278 编写于 作者: A Alex Dima

Introduce VSBuffer

上级 9bb443d1
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
export class VSBuffer {
public static alloc(byteLength: number): VSBuffer {
return new VSBuffer(Buffer.allocUnsafe(byteLength));
}
public static wrap(actual: Buffer): VSBuffer {
return new VSBuffer(actual);
}
public static fromString(source: string): VSBuffer {
return new VSBuffer(Buffer.from(source));
}
public static concat(buffers: VSBuffer[], totalLength?: number): VSBuffer {
if (typeof totalLength === 'undefined') {
totalLength = 0;
for (let i = 0, len = buffers.length; i < len; i++) {
totalLength += buffers[i].byteLength;
}
}
const ret = VSBuffer.alloc(totalLength);
let offset = 0;
for (let i = 0, len = buffers.length; i < len; i++) {
const element = buffers[i];
ret.set(element, offset);
offset += element.byteLength;
}
return ret;
}
private readonly _actual: Buffer;
public readonly byteLength: number;
private constructor(buffer: Buffer) {
this._actual = buffer;
this.byteLength = this._actual.byteLength;
}
public toBuffer(): Buffer {
// TODO@Alex: deprecate this usage
return this._actual;
}
public toString(): string {
return this._actual.toString();
}
public slice(start?: number, end?: number): VSBuffer {
return new VSBuffer(this._actual.slice(start, end));
}
public set(array: VSBuffer, offset?: number): void {
this._actual.set(array._actual, offset);
}
public readUint32BE(offset: number): number {
return readUint32BE(this._actual, offset);
}
public writeUint32BE(value: number, offset: number): void {
writeUint32BE(this._actual, value, offset);
}
public readUint8(offset: number): number {
return readUint8(this._actual, offset);
}
public writeUint8(value: number, offset: number): void {
writeUint8(this._actual, value, offset);
}
}
function readUint32BE(source: Uint8Array, offset: number): number {
return (
source[offset] * 2 ** 24
+ source[offset + 1] * 2 ** 16
+ source[offset + 2] * 2 ** 8
+ source[offset + 3]
);
}
function writeUint32BE(destination: Uint8Array, value: number, offset: number): void {
destination[offset + 3] = value;
value = value >>> 8;
destination[offset + 2] = value;
value = value >>> 8;
destination[offset + 1] = value;
value = value >>> 8;
destination[offset] = value;
}
function readUint8(source: Uint8Array, offset: number): number {
return source[offset];
}
function writeUint8(destination: Uint8Array, value: number, offset: number): void {
destination[offset] = value;
}
......@@ -8,13 +8,14 @@ import { IPCClient } from 'vs/base/parts/ipc/node/ipc';
import { Protocol } from 'vs/base/parts/ipc/node/ipc.electron';
import { ipcRenderer } from 'electron';
import { IDisposable } from 'vs/base/common/lifecycle';
import { VSBuffer } from 'vs/base/common/buffer';
export class Client extends IPCClient implements IDisposable {
private protocol: Protocol;
private static createProtocol(): Protocol {
const onMessage = Event.fromNodeEventEmitter<Buffer>(ipcRenderer, 'ipc:message', (_, message: Buffer) => message);
const onMessage = Event.fromNodeEventEmitter<VSBuffer>(ipcRenderer, 'ipc:message', (_, message: Buffer) => VSBuffer.wrap(message));
ipcRenderer.send('ipc:hello');
return new Protocol(ipcRenderer, onMessage);
}
......
......@@ -8,16 +8,17 @@ import { IPCServer, ClientConnectionEvent } from 'vs/base/parts/ipc/node/ipc';
import { Protocol } from 'vs/base/parts/ipc/node/ipc.electron';
import { ipcMain } from 'electron';
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { VSBuffer } from 'vs/base/common/buffer';
interface IIPCEvent {
event: { sender: Electron.WebContents; };
message: Buffer | null;
}
function createScopedOnMessageEvent(senderId: number, eventName: string): Event<Buffer | null> {
function createScopedOnMessageEvent(senderId: number, eventName: string): Event<VSBuffer | null> {
const onMessage = Event.fromNodeEventEmitter<IIPCEvent>(ipcMain, eventName, (event, message) => ({ event, message }));
const onMessageFromSender = Event.filter(onMessage, ({ event }) => event.sender.id === senderId);
return Event.map(onMessageFromSender, ({ message }) => message);
return Event.map(onMessageFromSender, ({ message }) => message ? VSBuffer.wrap(message) : message);
}
export class Server extends IPCServer {
......@@ -38,7 +39,7 @@ export class Server extends IPCServer {
const onDidClientReconnect = new Emitter<void>();
Server.Clients.set(id, toDisposable(() => onDidClientReconnect.fire()));
const onMessage = createScopedOnMessageEvent(id, 'ipc:message') as Event<Buffer>;
const onMessage = createScopedOnMessageEvent(id, 'ipc:message') as Event<VSBuffer>;
const onDidClientDisconnect = Event.any(Event.signal(createScopedOnMessageEvent(id, 'ipc:disconnect')), onDidClientReconnect.event);
const protocol = new Protocol(webContents, onMessage);
......
......@@ -14,6 +14,7 @@ import { isRemoteConsoleLog, log } from 'vs/base/common/console';
import { CancellationToken } from 'vs/base/common/cancellation';
import * as errors from 'vs/base/common/errors';
import { IChannel } from 'vs/base/parts/ipc/common/ipc';
import { VSBuffer } from 'vs/base/common/buffer';
/**
* This implementation doesn't perform well since it uses base64 encoding for buffers.
......@@ -26,11 +27,11 @@ export class Server<TContext extends string> extends IPCServer<TContext> {
send: r => {
try {
if (process.send) {
process.send(r.toString('base64'));
process.send(r.toBuffer().toString('base64'));
}
} catch (e) { /* not much to do */ }
},
onMessage: Event.fromNodeEventEmitter(process, 'message', msg => Buffer.from(msg, 'base64'))
onMessage: Event.fromNodeEventEmitter(process, 'message', msg => VSBuffer.wrap(Buffer.from(msg, 'base64')))
}, ctx);
process.once('disconnect', () => this.dispose());
......@@ -199,7 +200,7 @@ export class Client implements IChannelClient, IDisposable {
this.child = fork(this.modulePath, args, forkOpts);
const onMessageEmitter = new Emitter<Buffer>();
const onMessageEmitter = new Emitter<VSBuffer>();
const onRawMessage = Event.fromNodeEventEmitter(this.child, 'message', msg => msg);
onRawMessage(msg => {
......@@ -211,11 +212,11 @@ export class Client implements IChannelClient, IDisposable {
}
// Anything else goes to the outside
onMessageEmitter.fire(Buffer.from(msg, 'base64'));
onMessageEmitter.fire(VSBuffer.wrap(Buffer.from(msg, 'base64')));
});
const sender = this.options.useQueue ? createQueuedSender(this.child) : this.child;
const send = (r: Buffer) => this.child && this.child.connected && sender.send(r.toString('base64'));
const send = (r: VSBuffer) => this.child && this.child.connected && sender.send(r.toBuffer().toString('base64'));
const onMessage = onMessageEmitter.event;
const protocol = { send, onMessage };
......
......@@ -5,6 +5,7 @@
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { Event } from 'vs/base/common/event';
import { VSBuffer } from 'vs/base/common/buffer';
export interface Sender {
send(channel: string, msg: Buffer | null): void;
......@@ -12,11 +13,11 @@ export interface Sender {
export class Protocol implements IMessagePassingProtocol {
constructor(private sender: Sender, readonly onMessage: Event<Buffer>) { }
constructor(private sender: Sender, readonly onMessage: Event<VSBuffer>) { }
send(message: Buffer): void {
send(message: VSBuffer): void {
try {
this.sender.send('ipc:message', message);
this.sender.send('ipc:message', message.toBuffer());
} catch (e) {
// systems are going down
}
......
......@@ -11,6 +11,26 @@ import { tmpdir } from 'os';
import * as fs from 'fs';
import { generateUuid } from 'vs/base/common/uuid';
import { IDisposable } from 'vs/base/common/lifecycle';
import { VSBuffer } from 'vs/base/common/buffer';
// export interface ISocket {
// onData: Event<VSBuffer>;
// }
// export class NodeSocket implements ISocket {
// private readonly _socket: Socket;
// constructor(socket: Socket) {
// this._socket = socket;
// }
// public onData(listener: (e: VSBuffer) => void): IDisposable {
// this._socket.on('data', listener);
// return {
// dispose: () => this._socket.off('data', listener)
// };
// }
// }
export function generateRandomPipeName(): string {
const randomSuffix = generateUuid();
......@@ -22,7 +42,7 @@ export function generateRandomPipeName(): string {
}
}
function log(fd: number, msg: string, data?: Buffer): void {
function log(fd: number, msg: string, data?: VSBuffer): void {
const date = new Date();
fs.writeSync(fd, `[${date.getHours()}:${date.getMinutes()}:${date.getSeconds()}.${date.getMilliseconds()}] ${msg}\n`);
if (data) {
......@@ -32,11 +52,17 @@ function log(fd: number, msg: string, data?: Buffer): void {
fs.fdatasyncSync(fd);
}
const EMPTY_BUFFER = Buffer.allocUnsafe(0);
let emptyBuffer: VSBuffer | null = null;
function getEmptyBuffer(): VSBuffer {
if (!emptyBuffer) {
emptyBuffer = VSBuffer.alloc(0);
}
return emptyBuffer;
}
class ChunkStream {
private _chunks: Buffer[];
private _chunks: VSBuffer[];
private _totalLength: number;
public get byteLength() {
......@@ -48,14 +74,14 @@ class ChunkStream {
this._totalLength = 0;
}
public acceptChunk(buff: Buffer) {
public acceptChunk(buff: VSBuffer) {
this._chunks.push(buff);
this._totalLength += buff.byteLength;
}
public read(byteCount: number): Buffer {
public read(byteCount: number): VSBuffer {
if (byteCount === 0) {
return EMPTY_BUFFER;
return getEmptyBuffer();
}
if (byteCount > this._totalLength) {
......@@ -77,7 +103,7 @@ class ChunkStream {
return result;
}
let result = Buffer.allocUnsafe(byteCount);
let result = VSBuffer.alloc(byteCount);
let resultOffset = 0;
while (byteCount > 0) {
const chunk = this._chunks[0];
......@@ -85,7 +111,8 @@ class ChunkStream {
// this chunk will survive
this._chunks[0] = chunk.slice(byteCount);
chunk.copy(result, resultOffset, 0, byteCount);
const chunkPart = chunk.slice(0, byteCount);
result.set(chunkPart, resultOffset);
resultOffset += byteCount;
this._totalLength -= byteCount;
byteCount -= byteCount;
......@@ -93,7 +120,7 @@ class ChunkStream {
// this chunk will be entirely read
this._chunks.shift();
chunk.copy(result, resultOffset, 0, chunk.byteLength);
result.set(chunk, resultOffset);
resultOffset += chunk.byteLength;
this._totalLength -= chunk.byteLength;
byteCount -= chunk.byteLength;
......@@ -153,7 +180,7 @@ class ProtocolMessage {
public readonly type: ProtocolMessageType,
public readonly id: number,
public readonly ack: number,
public readonly data: Buffer
public readonly data: VSBuffer
) {
this.writtenTime = 0;
}
......@@ -186,12 +213,12 @@ class ProtocolReader {
this._socket = socket;
this._isDisposed = false;
this._incomingData = new ChunkStream();
this._socketDataListener = (data: Buffer) => this.acceptChunk(data);
this._socketDataListener = (data: Buffer) => this.acceptChunk(VSBuffer.wrap(data));
this._socket.on('data', this._socketDataListener);
this.lastReadTime = Date.now();
}
public acceptChunk(data: Buffer | null): void {
public acceptChunk(data: VSBuffer | null): void {
if (!data || data.byteLength === 0) {
return;
}
......@@ -209,10 +236,10 @@ class ProtocolReader {
// save new state => next time will read the body
this._state.readHead = false;
this._state.readLen = buff.readUInt32BE(9, true);
this._state.messageType = <ProtocolMessageType>buff.readUInt8(0, true);
this._state.id = buff.readUInt32BE(1, true);
this._state.ack = buff.readUInt32BE(5, true);
this._state.readLen = buff.readUint32BE(9);
this._state.messageType = <ProtocolMessageType>buff.readUint8(0);
this._state.id = buff.readUint32BE(1);
this._state.ack = buff.readUint32BE(5);
} else {
// buff is the body
const messageType = this._state.messageType;
......@@ -236,7 +263,7 @@ class ProtocolReader {
}
}
public readEntireBuffer(): Buffer {
public readEntireBuffer(): VSBuffer {
return this._incomingData.read(this._incomingData.byteLength);
}
......@@ -251,7 +278,7 @@ class ProtocolWriter {
private _isDisposed: boolean;
private readonly _socket: Socket;
private readonly _logFile: number;
private _data: Buffer[];
private _data: VSBuffer[];
private _totalLength: number;
public lastWriteTime: number;
......@@ -285,29 +312,29 @@ class ProtocolWriter {
}
msg.writtenTime = Date.now();
this.lastWriteTime = Date.now();
const header = Buffer.allocUnsafe(ProtocolConstants.HeaderLength);
header.writeUInt8(msg.type, 0, true);
header.writeUInt32BE(msg.id, 1, true);
header.writeUInt32BE(msg.ack, 5, true);
header.writeUInt32BE(msg.data.length, 9, true);
const header = VSBuffer.alloc(ProtocolConstants.HeaderLength);
header.writeUint8(msg.type, 0);
header.writeUint32BE(msg.id, 1);
header.writeUint32BE(msg.ack, 5);
header.writeUint32BE(msg.data.byteLength, 9);
this._writeSoon(header, msg.data);
}
private _bufferAdd(head: Buffer, body: Buffer): boolean {
private _bufferAdd(head: VSBuffer, body: VSBuffer): boolean {
const wasEmpty = this._totalLength === 0;
this._data.push(head, body);
this._totalLength += head.length + body.length;
this._totalLength += head.byteLength + body.byteLength;
return wasEmpty;
}
private _bufferTake(): Buffer {
const ret = Buffer.concat(this._data, this._totalLength);
private _bufferTake(): VSBuffer {
const ret = VSBuffer.concat(this._data, this._totalLength);
this._data.length = 0;
this._totalLength = 0;
return ret;
}
private _writeSoon(header: Buffer, data: Buffer): void {
private _writeSoon(header: VSBuffer, data: VSBuffer): void {
if (this._bufferAdd(header, data)) {
setImmediate(() => {
this._writeNow();
......@@ -328,7 +355,7 @@ class ProtocolWriter {
// > https://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback
// > However, the false return value is only advisory and the writable stream will unconditionally
// > accept and buffer chunk even if it has not not been allowed to drain.
this._socket.write(this._bufferTake());
this._socket.write(this._bufferTake().toBuffer());
}
}
......@@ -357,8 +384,8 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
private _socketCloseListener: () => void;
private _onMessage = new Emitter<Buffer>();
readonly onMessage: Event<Buffer> = this._onMessage.event;
private _onMessage = new Emitter<VSBuffer>();
readonly onMessage: Event<VSBuffer> = this._onMessage.event;
private _onClose = new Emitter<void>();
readonly onClose: Event<void> = this._onClose.event;
......@@ -390,7 +417,7 @@ export class Protocol implements IDisposable, IMessagePassingProtocol {
return this._socket;
}
send(buffer: Buffer): void {
send(buffer: VSBuffer): void {
this._socketWriter.write(new ProtocolMessage(ProtocolMessageType.Regular, 0, 0, buffer));
}
}
......@@ -603,11 +630,11 @@ export class PersistentProtocol {
private readonly _socketEndListener: () => void;
private readonly _socketErrorListener: (err: any) => void;
private _onControlMessage = new Emitter<Buffer>();
readonly onControlMessage: Event<Buffer> = createBufferedEvent(this._onControlMessage.event);
private _onControlMessage = new Emitter<VSBuffer>();
readonly onControlMessage: Event<VSBuffer> = createBufferedEvent(this._onControlMessage.event);
private _onMessage = new Emitter<Buffer>();
readonly onMessage: Event<Buffer> = createBufferedEvent(this._onMessage.event);
private _onMessage = new Emitter<VSBuffer>();
readonly onMessage: Event<VSBuffer> = createBufferedEvent(this._onMessage.event);
private _onClose = new Emitter<void>();
readonly onClose: Event<void> = createBufferedEvent(this._onClose.event);
......@@ -622,7 +649,7 @@ export class PersistentProtocol {
return this._outgoingMsgId - this._outgoingAckId;
}
constructor(socket: Socket, initialChunk: Buffer | null = null, logFileName: string | null = null) {
constructor(socket: Socket, initialChunk: VSBuffer | null = null, logFileName: string | null = null) {
this._logFile = 0;
this._isReconnecting = false;
if (logFileName) {
......@@ -708,7 +735,7 @@ export class PersistentProtocol {
// sufficient time has passed since last message was written,
// and no message from our side needed to be sent in the meantime,
// so we will send a message containing only a keep alive.
const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, 0, EMPTY_BUFFER);
const msg = new ProtocolMessage(ProtocolMessageType.KeepAlive, 0, 0, getEmptyBuffer());
this._socketWriter.write(msg);
this._sendKeepAliveCheck();
return;
......@@ -743,7 +770,7 @@ export class PersistentProtocol {
return this._socket;
}
public beginAcceptReconnection(socket: Socket, initialDataChunk: Buffer | null): void {
public beginAcceptReconnection(socket: Socket, initialDataChunk: VSBuffer | null): void {
this._isReconnecting = true;
this._socketWriter.dispose();
......@@ -810,7 +837,7 @@ export class PersistentProtocol {
}
}
readEntireBuffer(): Buffer {
readEntireBuffer(): VSBuffer {
return this._socketReader.readEntireBuffer();
}
......@@ -818,7 +845,7 @@ export class PersistentProtocol {
this._socketWriter.flush();
}
send(buffer: Buffer): void {
send(buffer: VSBuffer): void {
const myId = ++this._outgoingMsgId;
this._incomingAckId = this._incomingMsgId;
const msg = new ProtocolMessage(ProtocolMessageType.Regular, myId, this._incomingAckId, buffer);
......@@ -833,7 +860,7 @@ export class PersistentProtocol {
* Send a message which will not be part of the regular acknowledge flow.
* Use this for early control messages which are repeated in case of reconnection.
*/
sendControl(buffer: Buffer): void {
sendControl(buffer: VSBuffer): void {
const msg = new ProtocolMessage(ProtocolMessageType.Control, 0, 0, buffer);
this._socketWriter.write(msg);
}
......@@ -896,7 +923,7 @@ export class PersistentProtocol {
}
this._incomingAckId = this._incomingMsgId;
const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, EMPTY_BUFFER);
const msg = new ProtocolMessage(ProtocolMessageType.Ack, 0, this._incomingAckId, getEmptyBuffer());
this._socketWriter.write(msg);
}
}
......@@ -9,6 +9,7 @@ import { CancelablePromise, createCancelablePromise, timeout } from 'vs/base/com
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
import * as errors from 'vs/base/common/errors';
import { IServerChannel, IChannel } from 'vs/base/parts/ipc/common/ipc';
import { VSBuffer } from 'vs/base/common/buffer';
export const enum RequestType {
Promise = 100,
......@@ -43,8 +44,8 @@ interface IHandler {
}
export interface IMessagePassingProtocol {
send(buffer: Buffer): void;
onMessage: Event<Buffer>;
send(buffer: VSBuffer): void;
onMessage: Event<VSBuffer>;
}
enum State {
......@@ -99,35 +100,35 @@ export interface IRoutingChannelClient<TContext = string> {
}
interface IReader {
read(bytes: number): Buffer;
read(bytes: number): VSBuffer;
}
interface IWriter {
write(buffer: Buffer): void;
write(buffer: VSBuffer): void;
}
class BufferReader implements IReader {
private pos = 0;
constructor(private buffer: Buffer) { }
constructor(private buffer: VSBuffer) { }
read(bytes: number): Buffer {
read(bytes: number): VSBuffer {
const result = this.buffer.slice(this.pos, this.pos + bytes);
this.pos += result.length;
this.pos += result.byteLength;
return result;
}
}
class BufferWriter implements IWriter {
private buffers: Buffer[] = [];
private buffers: VSBuffer[] = [];
get buffer(): Buffer {
return Buffer.concat(this.buffers);
get buffer(): VSBuffer {
return VSBuffer.concat(this.buffers);
}
write(buffer: Buffer): void {
write(buffer: VSBuffer): void {
this.buffers.push(buffer);
}
}
......@@ -136,39 +137,52 @@ enum DataType {
Undefined = 0,
String = 1,
Buffer = 2,
Array = 3,
Object = 4
VSBuffer = 3,
Array = 4,
Object = 5
}
function createSizeBuffer(size: number): Buffer {
const result = Buffer.allocUnsafe(4);
result.writeUInt32BE(size, 0);
function createSizeBuffer(size: number): VSBuffer {
const result = VSBuffer.alloc(4);
result.writeUint32BE(size, 0);
return result;
}
function readSizeBuffer(reader: IReader): number {
return reader.read(4).readUInt32BE(0);
return reader.read(4).readUint32BE(0);
}
function createOneByteBuffer(value: number): VSBuffer {
const result = VSBuffer.alloc(1);
result.writeUint8(value, 0);
return result;
}
const BufferPresets = {
Undefined: Buffer.alloc(1, DataType.Undefined),
String: Buffer.alloc(1, DataType.String),
Buffer: Buffer.alloc(1, DataType.Buffer),
Array: Buffer.alloc(1, DataType.Array),
Object: Buffer.alloc(1, DataType.Object)
Undefined: createOneByteBuffer(DataType.Undefined),
String: createOneByteBuffer(DataType.String),
Buffer: createOneByteBuffer(DataType.Buffer),
VSBuffer: createOneByteBuffer(DataType.VSBuffer),
Array: createOneByteBuffer(DataType.Array),
Object: createOneByteBuffer(DataType.Object),
};
function serialize(writer: IWriter, data: any): void {
if (typeof data === 'undefined') {
writer.write(BufferPresets.Undefined);
} else if (typeof data === 'string') {
const buffer = Buffer.from(data);
const buffer = VSBuffer.fromString(data);
writer.write(BufferPresets.String);
writer.write(createSizeBuffer(buffer.length));
writer.write(createSizeBuffer(buffer.byteLength));
writer.write(buffer);
} else if (Buffer.isBuffer(data)) {
const buffer = VSBuffer.wrap(data);
writer.write(BufferPresets.Buffer);
writer.write(createSizeBuffer(data.length));
writer.write(createSizeBuffer(buffer.byteLength));
writer.write(buffer);
} else if (data instanceof VSBuffer) {
writer.write(BufferPresets.VSBuffer);
writer.write(createSizeBuffer(data.byteLength));
writer.write(data);
} else if (Array.isArray(data)) {
writer.write(BufferPresets.Array);
......@@ -178,20 +192,21 @@ function serialize(writer: IWriter, data: any): void {
serialize(writer, el);
}
} else {
const buffer = Buffer.from(JSON.stringify(data));
const buffer = VSBuffer.fromString(JSON.stringify(data));
writer.write(BufferPresets.Object);
writer.write(createSizeBuffer(buffer.length));
writer.write(createSizeBuffer(buffer.byteLength));
writer.write(buffer);
}
}
function deserialize(reader: IReader): any {
const type = reader.read(1).readUInt8(0);
const type = reader.read(1).readUint8(0);
switch (type) {
case DataType.Undefined: return undefined;
case DataType.String: return reader.read(readSizeBuffer(reader)).toString();
case DataType.Buffer: return reader.read(readSizeBuffer(reader));
case DataType.Buffer: return reader.read(readSizeBuffer(reader)).toBuffer();
case DataType.VSBuffer: return reader.read(readSizeBuffer(reader));
case DataType.Array: {
const length = readSizeBuffer(reader);
const result: any[] = [];
......@@ -241,7 +256,7 @@ export class ChannelServer<TContext = string> implements IChannelServer<TContext
this.sendBuffer(writer.buffer);
}
private sendBuffer(message: Buffer): void {
private sendBuffer(message: VSBuffer): void {
try {
this.protocol.send(message);
} catch (err) {
......@@ -249,7 +264,7 @@ export class ChannelServer<TContext = string> implements IChannelServer<TContext
}
}
private onRawMessage(message: Buffer): void {
private onRawMessage(message: VSBuffer): void {
const reader = new BufferReader(message);
const header = deserialize(reader);
const body = deserialize(reader);
......@@ -483,7 +498,7 @@ export class ChannelClient implements IChannelClient, IDisposable {
this.sendBuffer(writer.buffer);
}
private sendBuffer(message: Buffer): void {
private sendBuffer(message: VSBuffer): void {
try {
this.protocol.send(message);
} catch (err) {
......@@ -491,7 +506,7 @@ export class ChannelClient implements IChannelClient, IDisposable {
}
}
private onBuffer(message: Buffer): void {
private onBuffer(message: VSBuffer): void {
const reader = new BufferReader(message);
const header = deserialize(reader);
const body = deserialize(reader);
......
......@@ -7,11 +7,12 @@ import * as assert from 'assert';
import { Socket } from 'net';
import { EventEmitter } from 'events';
import { Protocol, PersistentProtocol } from 'vs/base/parts/ipc/node/ipc.net';
import { VSBuffer } from 'vs/base/common/buffer';
class MessageStream {
private _currentComplete: ((data: Buffer) => void) | null;
private _messages: Buffer[];
private _currentComplete: ((data: VSBuffer) => void) | null;
private _messages: VSBuffer[];
constructor(x: Protocol | PersistentProtocol) {
this._currentComplete = null;
......@@ -36,8 +37,8 @@ class MessageStream {
complete(msg);
}
public waitForOne(): Promise<Buffer> {
return new Promise<Buffer>((complete) => {
public waitForOne(): Promise<VSBuffer> {
return new Promise<VSBuffer>((complete) => {
this._currentComplete = complete;
this._trigger();
});
......@@ -53,6 +54,9 @@ class EtherStream extends EventEmitter {
}
write(data: Buffer, cb?: Function): boolean {
if (!Buffer.isBuffer(data)) {
throw new Error(`Invalid data`);
}
this._ether.write(this._name, data);
return true;
}
......@@ -126,15 +130,15 @@ suite('IPC, Socket Protocol', () => {
const b = new Protocol(ether.b);
const bMessages = new MessageStream(b);
a.send(Buffer.from('foobarfarboo'));
a.send(VSBuffer.fromString('foobarfarboo'));
const msg1 = await bMessages.waitForOne();
assert.equal(msg1.toString(), 'foobarfarboo');
const buffer = Buffer.allocUnsafe(1);
buffer.writeInt8(123, 0);
const buffer = VSBuffer.alloc(1);
buffer.writeUint8(123, 0);
a.send(buffer);
const msg2 = await bMessages.waitForOne();
assert.equal(msg2.readInt8(0), 123);
assert.equal(msg2.readUint8(0), 123);
});
......@@ -151,7 +155,7 @@ suite('IPC, Socket Protocol', () => {
data: 'Hello World'.split('')
};
a.send(Buffer.from(JSON.stringify(data)));
a.send(VSBuffer.fromString(JSON.stringify(data)));
const msg = await bMessages.waitForOne();
assert.deepEqual(JSON.parse(msg.toString()), data);
});
......@@ -171,15 +175,15 @@ suite('PersistentProtocol reconnection', () => {
const b = new PersistentProtocol(ether.b);
const bMessages = new MessageStream(b);
a.send(Buffer.from('a1'));
a.send(VSBuffer.fromString('a1'));
assert.equal(a.unacknowledgedCount, 1);
assert.equal(b.unacknowledgedCount, 0);
a.send(Buffer.from('a2'));
a.send(VSBuffer.fromString('a2'));
assert.equal(a.unacknowledgedCount, 2);
assert.equal(b.unacknowledgedCount, 0);
a.send(Buffer.from('a3'));
a.send(VSBuffer.fromString('a3'));
assert.equal(a.unacknowledgedCount, 3);
assert.equal(b.unacknowledgedCount, 0);
......@@ -198,7 +202,7 @@ suite('PersistentProtocol reconnection', () => {
assert.equal(a.unacknowledgedCount, 3);
assert.equal(b.unacknowledgedCount, 0);
b.send(Buffer.from('b1'));
b.send(VSBuffer.fromString('b1'));
assert.equal(a.unacknowledgedCount, 3);
assert.equal(b.unacknowledgedCount, 1);
......@@ -207,7 +211,7 @@ suite('PersistentProtocol reconnection', () => {
assert.equal(a.unacknowledgedCount, 0);
assert.equal(b.unacknowledgedCount, 1);
a.send(Buffer.from('a4'));
a.send(VSBuffer.fromString('a4'));
assert.equal(a.unacknowledgedCount, 1);
assert.equal(b.unacknowledgedCount, 1);
......
......@@ -10,13 +10,14 @@ import { Emitter, Event } from 'vs/base/common/event';
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
import { canceled } from 'vs/base/common/errors';
import { timeout } from 'vs/base/common/async';
import { VSBuffer } from 'vs/base/common/buffer';
class QueueProtocol implements IMessagePassingProtocol {
private buffering = true;
private buffers: Buffer[] = [];
private buffers: VSBuffer[] = [];
private _onMessage = new Emitter<Buffer>({
private _onMessage = new Emitter<VSBuffer>({
onFirstListenerDidAdd: () => {
for (const buffer of this.buffers) {
this._onMessage.fire(buffer);
......@@ -33,11 +34,11 @@ class QueueProtocol implements IMessagePassingProtocol {
readonly onMessage = this._onMessage.event;
other: QueueProtocol;
send(buffer: Buffer): void {
send(buffer: VSBuffer): void {
this.other.receive(buffer);
}
protected receive(buffer: Buffer): void {
protected receive(buffer: VSBuffer): void {
if (this.buffering) {
this.buffers.push(buffer);
} else {
......@@ -196,10 +197,10 @@ suite('Base IPC', function () {
test('createProtocolPair', async function () {
const [clientProtocol, serverProtocol] = createProtocolPair();
const b1 = Buffer.alloc(0);
const b1 = VSBuffer.alloc(0);
clientProtocol.send(b1);
const b3 = Buffer.alloc(0);
const b3 = VSBuffer.alloc(0);
serverProtocol.send(b3);
const b2 = await Event.toPromise(serverProtocol.onMessage);
......
......@@ -36,6 +36,7 @@ import { MessageType, createMessageOfType, isMessageOfType } from 'vs/workbench/
import { withNullAsUndefined } from 'vs/base/common/types';
import { IExtensionDescription } from 'vs/platform/extensions/common/extensions';
import { parseExtensionDevOptions } from '../common/extensionDevOptions';
import { VSBuffer } from 'vs/base/common/buffer';
export interface IExtensionHostStarter {
readonly onCrashed: Event<[number, string | null]>;
......@@ -372,7 +373,7 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
// Wait 60s for the initialized message
installTimeoutCheck();
protocol.send(Buffer.from(JSON.stringify(data)));
protocol.send(VSBuffer.fromString(JSON.stringify(data)));
});
return;
}
......
......@@ -13,6 +13,7 @@ import product from 'vs/platform/product/node/product';
import { IInitData } from 'vs/workbench/api/common/extHost.protocol';
import { MessageType, createMessageOfType, isMessageOfType, IExtHostSocketMessage, IExtHostReadyMessage } from 'vs/workbench/services/extensions/node/extensionHostProtocol';
import { exit, ExtensionHostMain } from 'vs/workbench/services/extensions/node/extensionHostMain';
import { VSBuffer } from 'vs/base/common/buffer';
// With Electron 2.x and node.js 8.x the "natives" module
// can cause a native crash (see https://github.com/nodejs/node/issues/19891 and
......@@ -58,7 +59,7 @@ function _createExtHostProtocol(): Promise<IMessagePassingProtocol> {
process.on('message', (msg: IExtHostSocketMessage, handle: net.Socket) => {
if (msg && msg.type === 'VSCODE_EXTHOST_IPC_SOCKET') {
const initialDataChunk = Buffer.from(msg.initialDataChunk, 'base64');
const initialDataChunk = VSBuffer.wrap(Buffer.from(msg.initialDataChunk, 'base64'));
if (protocol) {
// reconnection case
if (disconnectWaitTimer) {
......
......@@ -3,6 +3,8 @@
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
import { VSBuffer } from 'vs/base/common/buffer';
export interface IExtHostReadyMessage {
type: 'VSCODE_EXTHOST_IPC_READY';
}
......@@ -18,24 +20,24 @@ export const enum MessageType {
Terminate
}
export function createMessageOfType(type: MessageType): Buffer {
const result = Buffer.allocUnsafe(1);
export function createMessageOfType(type: MessageType): VSBuffer {
const result = VSBuffer.alloc(1);
switch (type) {
case MessageType.Initialized: result.writeUInt8(1, 0); break;
case MessageType.Ready: result.writeUInt8(2, 0); break;
case MessageType.Terminate: result.writeUInt8(3, 0); break;
case MessageType.Initialized: result.writeUint8(1, 0); break;
case MessageType.Ready: result.writeUint8(2, 0); break;
case MessageType.Terminate: result.writeUint8(3, 0); break;
}
return result;
}
export function isMessageOfType(message: Buffer, type: MessageType): boolean {
if (message.length !== 1) {
export function isMessageOfType(message: VSBuffer, type: MessageType): boolean {
if (message.byteLength !== 1) {
return false;
}
switch (message.readUInt8(0)) {
switch (message.readUint8(0)) {
case 1: return type === MessageType.Initialized;
case 2: return type === MessageType.Ready;
case 3: return type === MessageType.Terminate;
......
......@@ -13,6 +13,7 @@ import { IURITransformer, transformIncomingURIs } from 'vs/base/common/uriIpc';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { LazyPromise } from 'vs/workbench/services/extensions/node/lazyPromise';
import { IRPCProtocol, ProxyIdentifier, getStringIdentifierForProxy } from 'vs/workbench/services/extensions/common/proxyIdentifier';
import { VSBuffer } from 'vs/base/common/buffer';
export interface JSONStringifyReplacer {
(key: string, value: any): any;
......@@ -205,12 +206,12 @@ export class RPCProtocol extends Disposable implements IRPCProtocol {
}
}
private _receiveOneMessage(rawmsg: Buffer): void {
private _receiveOneMessage(rawmsg: VSBuffer): void {
if (this._isDisposed) {
return;
}
const msgLength = rawmsg.length;
const msgLength = rawmsg.byteLength;
const buff = MessageBuffer.read(rawmsg, 0);
const messageType = <MessageType>buff.readUInt8();
const req = buff.readUInt32();
......@@ -262,6 +263,11 @@ export class RPCProtocol extends Disposable implements IRPCProtocol {
this._receiveReply(msgLength, req, value);
break;
}
case MessageType.ReplyOKVSBuffer: {
let value = MessageIO.deserializeReplyOKVSBuffer(buff);
this._receiveReply(msgLength, req, value);
break;
}
case MessageType.ReplyErrError: {
let err = MessageIO.deserializeReplyErrError(buff);
if (this._uriTransformer) {
......@@ -435,24 +441,24 @@ export class RPCProtocol extends Disposable implements IRPCProtocol {
class MessageBuffer {
public static alloc(type: MessageType, req: number, messageSize: number): MessageBuffer {
let result = new MessageBuffer(Buffer.allocUnsafe(messageSize + 1 /* type */ + 4 /* req */), 0);
let result = new MessageBuffer(VSBuffer.alloc(messageSize + 1 /* type */ + 4 /* req */), 0);
result.writeUInt8(type);
result.writeUInt32(req);
return result;
}
public static read(buff: Buffer, offset: number): MessageBuffer {
public static read(buff: VSBuffer, offset: number): MessageBuffer {
return new MessageBuffer(buff, offset);
}
private _buff: Buffer;
private _buff: VSBuffer;
private _offset: number;
public get buffer(): Buffer {
public get buffer(): VSBuffer {
return this._buff;
}
private constructor(buff: Buffer, offset: number) {
private constructor(buff: VSBuffer, offset: number) {
this._buff = buff;
this._offset = offset;
}
......@@ -462,108 +468,136 @@ class MessageBuffer {
}
public writeUInt8(n: number): void {
this._buff.writeUInt8(n, this._offset, true); this._offset += 1;
this._buff.writeUint8(n, this._offset); this._offset += 1;
}
public readUInt8(): number {
const n = this._buff.readUInt8(this._offset, true); this._offset += 1;
const n = this._buff.readUint8(this._offset); this._offset += 1;
return n;
}
public writeUInt32(n: number): void {
this._buff.writeUInt32BE(n, this._offset, true); this._offset += 4;
this._buff.writeUint32BE(n, this._offset); this._offset += 4;
}
public readUInt32(): number {
const n = this._buff.readUInt32BE(this._offset, true); this._offset += 4;
const n = this._buff.readUint32BE(this._offset); this._offset += 4;
return n;
}
public static sizeShortString(str: string, strByteLength: number): number {
return 1 /* string length */ + strByteLength /* actual string */;
public static sizeShortString(str: VSBuffer): number {
return 1 /* string length */ + str.byteLength /* actual string */;
}
public writeShortString(str: string, strByteLength: number): void {
this._buff.writeUInt8(strByteLength, this._offset, true); this._offset += 1;
this._buff.write(str, this._offset, strByteLength, 'utf8'); this._offset += strByteLength;
public writeShortString(str: VSBuffer): void {
this._buff.writeUint8(str.byteLength, this._offset); this._offset += 1;
this._buff.set(str, this._offset); this._offset += str.byteLength;
}
public readShortString(): string {
const strLength = this._buff.readUInt8(this._offset, true); this._offset += 1;
const str = this._buff.toString('utf8', this._offset, this._offset + strLength); this._offset += strLength;
const strByteLength = this._buff.readUint8(this._offset); this._offset += 1;
const strBuff = this._buff.slice(this._offset, this._offset + strByteLength);
const str = strBuff.toString(); this._offset += strByteLength;
return str;
}
public static sizeLongString(str: string, strByteLength: number): number {
return 4 /* string length */ + strByteLength /* actual string */;
public static sizeLongString(str: VSBuffer): number {
return 4 /* string length */ + str.byteLength /* actual string */;
}
public writeLongString(str: string, strByteLength: number): void {
this._buff.writeUInt32LE(strByteLength, this._offset, true); this._offset += 4;
this._buff.write(str, this._offset, strByteLength, 'utf8'); this._offset += strByteLength;
public writeLongString(str: VSBuffer): void {
this._buff.writeUint32BE(str.byteLength, this._offset); this._offset += 4;
this._buff.set(str, this._offset); this._offset += str.byteLength;
}
public readLongString(): string {
const strLength = this._buff.readUInt32LE(this._offset, true); this._offset += 4;
const str = this._buff.toString('utf8', this._offset, this._offset + strLength); this._offset += strLength;
const strByteLength = this._buff.readUint32BE(this._offset); this._offset += 4;
const strBuff = this._buff.slice(this._offset, this._offset + strByteLength);
const str = strBuff.toString(); this._offset += strByteLength;
return str;
}
public static sizeBuffer(buff: Buffer, buffByteLength: number): number {
return 4 /* buffer length */ + buffByteLength /* actual buffer */;
public static sizeBuffer(buff: VSBuffer): number {
return 4 /* buffer length */ + buff.byteLength /* actual buffer */;
}
public writeBuffer(buff: Buffer, buffByteLength: number): void {
this._buff.writeUInt32LE(buffByteLength, this._offset, true); this._offset += 4;
buff.copy(this._buff, this._offset); this._offset += buffByteLength;
public writeBuffer(buff: VSBuffer): void {
this._buff.writeUint32BE(buff.byteLength, this._offset); this._offset += 4;
this._buff.set(buff, this._offset); this._offset += buff.byteLength;
}
public readBuffer(): Buffer {
const buffLength = this._buff.readUInt32LE(this._offset, true); this._offset += 4;
const buffLength = this._buff.readUint32BE(this._offset); this._offset += 4;
const buff = this._buff.slice(this._offset, this._offset + buffLength); this._offset += buffLength;
return buff.toBuffer();
}
public static sizeVSBuffer(buff: VSBuffer): number {
return 4 /* buffer length */ + buff.byteLength /* actual buffer */;
}
public writeVSBuffer(buff: VSBuffer): void {
this._buff.writeUint32BE(buff.byteLength, this._offset); this._offset += 4;
this._buff.set(buff, this._offset); this._offset += buff.byteLength;
}
public readVSBuffer(): VSBuffer {
const buffLength = this._buff.readUint32BE(this._offset); this._offset += 4;
const buff = this._buff.slice(this._offset, this._offset + buffLength); this._offset += buffLength;
return buff;
}
public static sizeMixedArray(arr: Array<string | Buffer>, arrLengths: number[]): number {
public static sizeMixedArray(arr: VSBuffer[], arrType: ArgType[]): number {
let size = 0;
size += 1; // arr length
for (let i = 0, len = arr.length; i < len; i++) {
const el = arr[i];
const elLength = arrLengths[i];
const elType = arrType[i];
size += 1; // arg type
if (typeof el === 'string') {
size += this.sizeLongString(el, elLength);
if (elType === ArgType.String) {
size += this.sizeLongString(el);
} else if (elType === ArgType.Buffer) {
size += this.sizeBuffer(el);
} else {
size += this.sizeBuffer(el, elLength);
size += this.sizeVSBuffer(el);
}
}
return size;
}
public writeMixedArray(arr: Array<string | Buffer>, arrLengths: number[]): void {
this._buff.writeUInt8(arr.length, this._offset, true); this._offset += 1;
public writeMixedArray(arr: VSBuffer[], arrType: ArgType[]): void {
this._buff.writeUint8(arr.length, this._offset); this._offset += 1;
for (let i = 0, len = arr.length; i < len; i++) {
const el = arr[i];
const elLength = arrLengths[i];
if (typeof el === 'string') {
const elType = arrType[i];
if (elType === ArgType.String) {
this.writeUInt8(ArgType.String);
this.writeLongString(el, elLength);
} else {
this.writeLongString(el);
} else if (elType === ArgType.Buffer) {
this.writeUInt8(ArgType.Buffer);
this.writeBuffer(el, elLength);
this.writeVSBuffer(el);
} else {
this.writeUInt8(ArgType.VSBuffer);
this.writeVSBuffer(el);
}
}
}
public readMixedArray(): Array<string | Buffer> {
const arrLen = this._buff.readUInt8(this._offset, true); this._offset += 1;
let arr: Array<string | Buffer> = new Array(arrLen);
public readMixedArray(): Array<string | Buffer | VSBuffer> {
const arrLen = this._buff.readUint8(this._offset); this._offset += 1;
let arr: Array<string | Buffer | VSBuffer> = new Array(arrLen);
for (let i = 0; i < arrLen; i++) {
const argType = <ArgType>this.readUInt8();
if (argType === ArgType.String) {
arr[i] = this.readLongString();
} else {
arr[i] = this.readBuffer();
switch (argType) {
case ArgType.String:
arr[i] = this.readLongString();
break;
case ArgType.Buffer:
arr[i] = this.readBuffer();
break;
case ArgType.VSBuffer:
arr[i] = this.readVSBuffer();
break;
}
}
return arr;
......@@ -577,42 +611,48 @@ class MessageIO {
if (Buffer.isBuffer(arr[i])) {
return true;
}
if (arr[i] instanceof VSBuffer) {
return true;
}
}
return false;
}
public static serializeRequest(req: number, rpcId: number, method: string, args: any[], usesCancellationToken: boolean, replacer: JSONStringifyReplacer | null): Buffer {
public static serializeRequest(req: number, rpcId: number, method: string, args: any[], usesCancellationToken: boolean, replacer: JSONStringifyReplacer | null): VSBuffer {
if (this._arrayContainsBuffer(args)) {
let massagedArgs: Array<string | Buffer> = new Array(args.length);
let argsLengths: number[] = new Array(args.length);
let massagedArgs: VSBuffer[] = [];
let massagedArgsType: ArgType[] = [];
for (let i = 0, len = args.length; i < len; i++) {
const arg = args[i];
if (Buffer.isBuffer(arg)) {
massagedArgs[i] = VSBuffer.wrap(arg);
massagedArgsType[i] = ArgType.Buffer;
} else if (arg instanceof VSBuffer) {
massagedArgs[i] = arg;
argsLengths[i] = arg.byteLength;
massagedArgsType[i] = ArgType.VSBuffer;
} else {
massagedArgs[i] = safeStringify(arg, replacer);
argsLengths[i] = Buffer.byteLength(massagedArgs[i], 'utf8');
massagedArgs[i] = VSBuffer.fromString(safeStringify(arg, replacer));
massagedArgsType[i] = ArgType.String;
}
}
return this._requestMixedArgs(req, rpcId, method, massagedArgs, argsLengths, usesCancellationToken);
return this._requestMixedArgs(req, rpcId, method, massagedArgs, massagedArgsType, usesCancellationToken);
}
return this._requestJSONArgs(req, rpcId, method, safeStringify(args, replacer), usesCancellationToken);
}
private static _requestJSONArgs(req: number, rpcId: number, method: string, args: string, usesCancellationToken: boolean): Buffer {
const methodByteLength = Buffer.byteLength(method, 'utf8');
const argsByteLength = Buffer.byteLength(args, 'utf8');
private static _requestJSONArgs(req: number, rpcId: number, method: string, args: string, usesCancellationToken: boolean): VSBuffer {
const methodBuff = VSBuffer.fromString(method);
const argsBuff = VSBuffer.fromString(args);
let len = 0;
len += MessageBuffer.sizeUInt8();
len += MessageBuffer.sizeShortString(method, methodByteLength);
len += MessageBuffer.sizeLongString(args, argsByteLength);
len += MessageBuffer.sizeShortString(methodBuff);
len += MessageBuffer.sizeLongString(argsBuff);
let result = MessageBuffer.alloc(usesCancellationToken ? MessageType.RequestJSONArgsWithCancellation : MessageType.RequestJSONArgs, req, len);
result.writeUInt8(rpcId);
result.writeShortString(method, methodByteLength);
result.writeLongString(args, argsByteLength);
result.writeShortString(methodBuff);
result.writeLongString(argsBuff);
return result.buffer;
}
......@@ -627,18 +667,18 @@ class MessageIO {
};
}
private static _requestMixedArgs(req: number, rpcId: number, method: string, args: Array<string | Buffer>, argsLengths: number[], usesCancellationToken: boolean): Buffer {
const methodByteLength = Buffer.byteLength(method, 'utf8');
private static _requestMixedArgs(req: number, rpcId: number, method: string, args: VSBuffer[], argsType: ArgType[], usesCancellationToken: boolean): VSBuffer {
const methodBuff = VSBuffer.fromString(method);
let len = 0;
len += MessageBuffer.sizeUInt8();
len += MessageBuffer.sizeShortString(method, methodByteLength);
len += MessageBuffer.sizeMixedArray(args, argsLengths);
len += MessageBuffer.sizeShortString(methodBuff);
len += MessageBuffer.sizeMixedArray(args, argsType);
let result = MessageBuffer.alloc(usesCancellationToken ? MessageType.RequestMixedArgsWithCancellation : MessageType.RequestMixedArgs, req, len);
result.writeUInt8(rpcId);
result.writeShortString(method, methodByteLength);
result.writeMixedArray(args, argsLengths);
result.writeShortString(methodBuff);
result.writeMixedArray(args, argsType);
return result.buffer;
}
......@@ -662,36 +702,49 @@ class MessageIO {
};
}
public static serializeAcknowledged(req: number): Buffer {
public static serializeAcknowledged(req: number): VSBuffer {
return MessageBuffer.alloc(MessageType.Acknowledged, req, 0).buffer;
}
public static serializeCancel(req: number): Buffer {
public static serializeCancel(req: number): VSBuffer {
return MessageBuffer.alloc(MessageType.Cancel, req, 0).buffer;
}
public static serializeReplyOK(req: number, res: any, replacer: JSONStringifyReplacer | null): Buffer {
public static serializeReplyOK(req: number, res: any, replacer: JSONStringifyReplacer | null): VSBuffer {
if (typeof res === 'undefined') {
return this._serializeReplyOKEmpty(req);
}
if (Buffer.isBuffer(res)) {
return this._serializeReplyOKBuffer(req, res);
}
if (res instanceof VSBuffer) {
return this._serializeReplyOKVSBuffer(req, res);
}
return this._serializeReplyOKJSON(req, safeStringify(res, replacer));
}
private static _serializeReplyOKEmpty(req: number): Buffer {
private static _serializeReplyOKEmpty(req: number): VSBuffer {
return MessageBuffer.alloc(MessageType.ReplyOKEmpty, req, 0).buffer;
}
private static _serializeReplyOKBuffer(req: number, res: Buffer): Buffer {
const resByteLength = res.byteLength;
private static _serializeReplyOKBuffer(req: number, res: Buffer): VSBuffer {
const buff = VSBuffer.wrap(res);
let len = 0;
len += MessageBuffer.sizeBuffer(res, resByteLength);
len += MessageBuffer.sizeBuffer(buff);
let result = MessageBuffer.alloc(MessageType.ReplyOKBuffer, req, len);
result.writeBuffer(res, resByteLength);
result.writeBuffer(buff);
return result.buffer;
}
private static _serializeReplyOKVSBuffer(req: number, res: VSBuffer): VSBuffer {
let len = 0;
len += MessageBuffer.sizeVSBuffer(res);
let result = MessageBuffer.alloc(MessageType.ReplyOKVSBuffer, req, len);
result.writeVSBuffer(res);
return result.buffer;
}
......@@ -699,14 +752,18 @@ class MessageIO {
return buff.readBuffer();
}
private static _serializeReplyOKJSON(req: number, res: string): Buffer {
const resByteLength = Buffer.byteLength(res, 'utf8');
public static deserializeReplyOKVSBuffer(buff: MessageBuffer): VSBuffer {
return buff.readVSBuffer();
}
private static _serializeReplyOKJSON(req: number, res: string): VSBuffer {
const resBuff = VSBuffer.fromString(res);
let len = 0;
len += MessageBuffer.sizeLongString(res, resByteLength);
len += MessageBuffer.sizeLongString(resBuff);
let result = MessageBuffer.alloc(MessageType.ReplyOKJSON, req, len);
result.writeLongString(res, resByteLength);
result.writeLongString(resBuff);
return result.buffer;
}
......@@ -715,22 +772,21 @@ class MessageIO {
return JSON.parse(res);
}
public static serializeReplyErr(req: number, err: any): Buffer {
public static serializeReplyErr(req: number, err: any): VSBuffer {
if (err instanceof Error) {
return this._serializeReplyErrEror(req, err);
}
return this._serializeReplyErrEmpty(req);
}
private static _serializeReplyErrEror(req: number, _err: Error): Buffer {
const err = safeStringify(errors.transformErrorForSerialization(_err), null);
const errByteLength = Buffer.byteLength(err, 'utf8');
private static _serializeReplyErrEror(req: number, _err: Error): VSBuffer {
const errBuff = VSBuffer.fromString(safeStringify(errors.transformErrorForSerialization(_err), null));
let len = 0;
len += MessageBuffer.sizeLongString(err, errByteLength);
len += MessageBuffer.sizeLongString(errBuff);
let result = MessageBuffer.alloc(MessageType.ReplyErrError, req, len);
result.writeLongString(err, errByteLength);
result.writeLongString(errBuff);
return result.buffer;
}
......@@ -739,7 +795,7 @@ class MessageIO {
return JSON.parse(err);
}
private static _serializeReplyErrEmpty(req: number): Buffer {
private static _serializeReplyErrEmpty(req: number): VSBuffer {
return MessageBuffer.alloc(MessageType.ReplyErrEmpty, req, 0).buffer;
}
}
......@@ -753,6 +809,7 @@ const enum MessageType {
Cancel = 6,
ReplyOKEmpty = 7,
ReplyOKBuffer = 8,
ReplyOKVSBuffer = 8,
ReplyOKJSON = 9,
ReplyErrError = 10,
ReplyErrEmpty = 11,
......@@ -760,5 +817,6 @@ const enum MessageType {
const enum ArgType {
String = 1,
Buffer = 2
Buffer = 2,
VSBuffer = 3
}
......@@ -9,20 +9,21 @@ import { Emitter, Event } from 'vs/base/common/event';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { ProxyIdentifier } from 'vs/workbench/services/extensions/common/proxyIdentifier';
import { RPCProtocol } from 'vs/workbench/services/extensions/node/rpcProtocol';
import { VSBuffer } from 'vs/base/common/buffer';
suite('RPCProtocol', () => {
class MessagePassingProtocol implements IMessagePassingProtocol {
private _pair: MessagePassingProtocol;
private readonly _onMessage = new Emitter<Buffer>();
public readonly onMessage: Event<Buffer> = this._onMessage.event;
private readonly _onMessage = new Emitter<VSBuffer>();
public readonly onMessage: Event<VSBuffer> = this._onMessage.event;
public setPair(other: MessagePassingProtocol) {
this._pair = other;
}
public send(buffer: Buffer): void {
public send(buffer: VSBuffer): void {
process.nextTick(() => {
this._pair._onMessage.fire(buffer);
});
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册