diff --git a/src/vs/base/parts/ipc/common/ipc.net.ts b/src/vs/base/parts/ipc/common/ipc.net.ts index 7fdf873004979aa88e7be132247abde3568e7ccc..5ef925cc692e6decb541214376e25926f6ebd827 100644 --- a/src/vs/base/parts/ipc/common/ipc.net.ts +++ b/src/vs/base/parts/ipc/common/ipc.net.ts @@ -17,6 +17,7 @@ export interface ISocket { onEnd(listener: () => void): IDisposable; write(buffer: VSBuffer): void; end(): void; + dispose(): void; } let emptyBuffer: VSBuffer | null = null; @@ -27,7 +28,7 @@ function getEmptyBuffer(): VSBuffer { return emptyBuffer; } -class ChunkStream { +export class ChunkStream { private _chunks: VSBuffer[]; private _totalLength: number; @@ -47,6 +48,15 @@ class ChunkStream { } public read(byteCount: number): VSBuffer { + return this._read(byteCount, true); + } + + public peek(byteCount: number): VSBuffer { + return this._read(byteCount, false); + } + + private _read(byteCount: number, advance: boolean): VSBuffer { + if (byteCount === 0) { return getEmptyBuffer(); } @@ -57,39 +67,53 @@ class ChunkStream { if (this._chunks[0].byteLength === byteCount) { // super fast path, precisely first chunk must be returned - const result = this._chunks.shift()!; - this._totalLength -= byteCount; + const result = this._chunks[0]; + if (advance) { + this._chunks.shift(); + this._totalLength -= byteCount; + } return result; } if (this._chunks[0].byteLength > byteCount) { // fast path, the reading is entirely within the first chunk const result = this._chunks[0].slice(0, byteCount); - this._chunks[0] = this._chunks[0].slice(byteCount); - this._totalLength -= byteCount; + if (advance) { + this._chunks[0] = this._chunks[0].slice(byteCount); + this._totalLength -= byteCount; + } return result; } let result = VSBuffer.alloc(byteCount); let resultOffset = 0; + let chunkIndex = 0; while (byteCount > 0) { - const chunk = this._chunks[0]; + const chunk = this._chunks[chunkIndex]; if (chunk.byteLength > byteCount) { // this chunk will survive - this._chunks[0] = chunk.slice(byteCount); - const chunkPart = chunk.slice(0, byteCount); result.set(chunkPart, resultOffset); resultOffset += byteCount; - this._totalLength -= byteCount; + + if (advance) { + this._chunks[chunkIndex] = chunk.slice(byteCount); + this._totalLength -= byteCount; + } + byteCount -= byteCount; } else { // this chunk will be entirely read - this._chunks.shift(); - result.set(chunk, resultOffset); resultOffset += chunk.byteLength; - this._totalLength -= chunk.byteLength; + + if (advance) { + this._chunks.shift(); + this._totalLength -= chunk.byteLength; + } else { + chunkIndex++; + } + byteCount -= chunk.byteLength; } } @@ -154,7 +178,7 @@ class ProtocolReader extends Disposable { private readonly _incomingData: ChunkStream; public lastReadTime: number; - private readonly _onMessage = new Emitter(); + private readonly _onMessage = this._register(new Emitter()); public readonly onMessage: Event = this._onMessage.event; private readonly _state = { diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index c19108d24e7d4881c551e085be7f12aab37f0a3b..2dbefa2c42acb9785dea354c0b0c1f4ec90516c9 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -20,6 +20,10 @@ export class NodeSocket implements ISocket { this.socket = socket; } + public dispose(): void { + this.socket.destroy(); + } + public onData(_listener: (e: VSBuffer) => void): IDisposable { const listener = (buff: Buffer) => _listener(VSBuffer.wrap(buff)); this.socket.on('data', listener);