提交 ae266f38 编写于 作者: A Alex Dima

Allow the Protocol to be disposed and extract buffered data

上级 1e82a6c1
......@@ -12,6 +12,8 @@ import { IMessagePassingProtocol, ClientConnectionEvent, IPCServer, IPCClient }
import { join } from 'path';
import { tmpdir } from 'os';
import { generateUuid } from 'vs/base/common/uuid';
import { IDisposable } from 'vs/base/common/lifecycle';
import { TimeoutTimer } from 'vs/base/common/async';
export function generateRandomPipeName(): string {
const randomSuffix = generateUuid();
......@@ -23,17 +25,24 @@ export function generateRandomPipeName(): string {
}
}
export class Protocol implements IMessagePassingProtocol {
export class Protocol implements IDisposable, IMessagePassingProtocol {
private static readonly _headerLen = 5;
private _onMessage = new Emitter<any>();
private _isDisposed: boolean;
private _chunks: Buffer[];
private _firstChunkTimer: TimeoutTimer;
private _socketDataListener: (data: Buffer) => void;
private _socketEndListener: () => void;
private _onMessage = new Emitter<any>();
readonly onMessage: Event<any> = this._onMessage.event;
constructor(private _socket: Socket, firstDataChunk?: Buffer) {
this._isDisposed = false;
this._chunks = [];
let chunks: Buffer[] = [];
let totalLength = 0;
const state = {
......@@ -44,7 +53,7 @@ export class Protocol implements IMessagePassingProtocol {
const acceptChunk = (data: Buffer) => {
chunks.push(data);
this._chunks.push(data);
totalLength += data.length;
while (totalLength > 0) {
......@@ -53,7 +62,7 @@ export class Protocol implements IMessagePassingProtocol {
// expecting header -> read 5bytes for header
// information: `bodyIsJson` and `bodyLen`
if (totalLength >= Protocol._headerLen) {
const all = Buffer.concat(chunks);
const all = Buffer.concat(this._chunks);
state.bodyIsJson = all.readInt8(0) === 1;
state.bodyLen = all.readInt32BE(1);
......@@ -61,7 +70,7 @@ export class Protocol implements IMessagePassingProtocol {
const rest = all.slice(Protocol._headerLen);
totalLength = rest.length;
chunks = [rest];
this._chunks = [rest];
} else {
break;
......@@ -73,21 +82,27 @@ export class Protocol implements IMessagePassingProtocol {
// the actual message or wait for more data
if (totalLength >= state.bodyLen) {
const all = Buffer.concat(chunks);
const all = Buffer.concat(this._chunks);
let message = all.toString('utf8', 0, state.bodyLen);
if (state.bodyIsJson) {
message = JSON.parse(message);
}
this._onMessage.fire(message);
// ensure the public getBuffer returns a valid value if invoked from the event listeners
const rest = all.slice(state.bodyLen);
totalLength = rest.length;
chunks = [rest];
this._chunks = [rest];
state.bodyIsJson = false;
state.bodyLen = -1;
state.readHead = true;
this._onMessage.fire(message);
if (this._isDisposed) {
// check if an event listener lead to our disposal
break;
}
} else {
break;
}
......@@ -103,14 +118,33 @@ export class Protocol implements IMessagePassingProtocol {
}
};
_socket.on('data', (data: Buffer) => {
// Make sure to always handle the firstDataChunk if no more `data` event comes in
this._firstChunkTimer = new TimeoutTimer();
this._firstChunkTimer.setIfNotSet(() => {
acceptFirstDataChunk();
}, 0);
this._socketDataListener = (data: Buffer) => {
acceptFirstDataChunk();
acceptChunk(data);
});
};
_socket.on('data', this._socketDataListener);
_socket.on('end', () => {
this._socketEndListener = () => {
acceptFirstDataChunk();
});
};
_socket.on('end', this._socketEndListener);
}
public dispose(): void {
this._isDisposed = true;
this._firstChunkTimer.dispose();
this._socket.removeListener('data', this._socketDataListener);
this._socket.removeListener('end', this._socketEndListener);
}
public getBuffer(): Buffer {
return Buffer.concat(this._chunks);
}
public send(message: any): void {
......
......@@ -87,4 +87,39 @@ suite('IPC, Socket Protocol', () => {
});
});
});
test('can devolve to a socket and evolve again without losing data', () => {
let resolve: (v: void) => void;
let result = new TPromise<void>((_resolve, _reject) => {
resolve = _resolve;
});
const sender = new Protocol(stream);
const receiver1 = new Protocol(stream);
assert.equal(stream.listenerCount('data'), 2);
assert.equal(stream.listenerCount('end'), 2);
receiver1.onMessage((msg) => {
assert.equal(msg.value, 1);
let buffer = receiver1.getBuffer();
receiver1.dispose();
assert.equal(stream.listenerCount('data'), 1);
assert.equal(stream.listenerCount('end'), 1);
const receiver2 = new Protocol(stream, buffer);
receiver2.onMessage((msg) => {
assert.equal(msg.value, 2);
resolve(void 0);
});
});
const msg1 = { value: 1 };
const msg2 = { value: 2 };
sender.send(msg1);
sender.send(msg2);
return result;
});
});
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册