提交 938d9c5a 编写于 作者: A Alex Dima

Introduce BufferedProtocol to never miss messages

上级 297f871e
......@@ -239,7 +239,7 @@ export class Client<TContext = string> extends IPCClient<TContext> {
get onClose(): Event<void> { return this.protocol.onClose; }
constructor(private protocol: Protocol, id: TContext) {
constructor(private protocol: Protocol | BufferedProtocol, id: TContext) {
super(protocol, id);
}
......@@ -276,3 +276,68 @@ export function connect(hook: any, clientId: string): Promise<Client> {
socket.once('error', e);
});
}
/**
* Will ensure no messages are lost if there are no event listeners.
*/
function createBufferedEvent<T>(source: Event<T>): Event<T> {
let emitter: Emitter<T>;
let hasListeners = false;
let isDeliveringMessages = false;
let bufferedMessages: T[] = [];
const deliverMessages = () => {
if (isDeliveringMessages) {
return;
}
isDeliveringMessages = true;
while (hasListeners && bufferedMessages.length > 0) {
emitter.fire(bufferedMessages.shift());
}
isDeliveringMessages = false;
};
source((e: T) => {
bufferedMessages.push(e);
deliverMessages();
});
emitter = new Emitter<T>({
onFirstListenerAdd: () => {
hasListeners = true;
// it is important to deliver these messages after this call, but before
// other messages have a chance to be received (to guarantee in order delivery)
// that's why we're using here nextTick and not other types of timeouts
process.nextTick(deliverMessages);
},
onLastListenerRemove: () => {
hasListeners = false;
}
});
return emitter.event;
}
/**
* Will ensure no messages are lost if there are no event listeners.
*/
export class BufferedProtocol implements IMessagePassingProtocol {
private readonly _actual: Protocol;
public readonly onMessage: Event<Buffer>;
public readonly onClose: Event<void>;
constructor(actual: Protocol) {
this._actual = actual;
this.onMessage = createBufferedEvent(this._actual.onMessage);
this.onClose = createBufferedEvent(this._actual.onClose);
}
public send(buffer: Buffer): void {
this._actual.send(buffer);
}
public end(): void {
this._actual.end();
}
}
......@@ -10,7 +10,6 @@ import { Server, Socket, createServer } from 'net';
import { getPathFromAmdModule } from 'vs/base/common/amd';
import { timeout } from 'vs/base/common/async';
import { toErrorMessage } from 'vs/base/common/errorMessage';
import { onUnexpectedError } from 'vs/base/common/errors';
import { Emitter, Event } from 'vs/base/common/event';
import { IDisposable, dispose, toDisposable } from 'vs/base/common/lifecycle';
import { Schemas } from 'vs/base/common/network';
......@@ -21,7 +20,7 @@ import { URI } from 'vs/base/common/uri';
import { IRemoteConsoleLog, log, parse } from 'vs/base/node/console';
import { findFreePort, randomPort } from 'vs/base/node/ports';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { Protocol, generateRandomPipeName } from 'vs/base/parts/ipc/node/ipc.net';
import { Protocol, generateRandomPipeName, BufferedProtocol } from 'vs/base/parts/ipc/node/ipc.net';
import { IBroadcast, IBroadcastService } from 'vs/platform/broadcast/electron-browser/broadcastService';
import { getScopes } from 'vs/platform/configuration/common/configurationRegistry';
import { IEnvironmentService } from 'vs/platform/environment/common/environment';
......@@ -351,7 +350,11 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
this._namedPipeServer.close();
this._namedPipeServer = null;
this._extensionHostConnection = socket;
resolve(new Protocol(this._extensionHostConnection));
// using a buffered message protocol here because between now
// and the first time a `then` executes some messages might be lost
// unless we immediately register a listener for `onMessage`.
resolve(new BufferedProtocol(new Protocol(this._extensionHostConnection)));
});
}).then((protocol) => {
......@@ -397,10 +400,7 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
disposable.dispose();
// release this promise
// using a buffered message protocol here because between now
// and the first time a `then` executes some messages might be lost
// unless we immediately register a listener for `onMessage`.
resolve(new BufferedMessagePassingProtocol(protocol));
resolve(protocol);
return;
}
......@@ -569,57 +569,3 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
}
}
}
/**
* Will ensure no messages are lost from creation time until the first user of onMessage comes in.
*/
class BufferedMessagePassingProtocol implements IMessagePassingProtocol {
private readonly _actual: IMessagePassingProtocol;
private _bufferedMessagesListener: IDisposable;
private _bufferedMessages: Buffer[];
constructor(actual: IMessagePassingProtocol) {
this._actual = actual;
this._bufferedMessages = [];
this._bufferedMessagesListener = this._actual.onMessage((buff) => this._bufferedMessages.push(buff));
}
public send(buffer: Buffer): void {
this._actual.send(buffer);
}
public onMessage(listener: (e: Buffer) => any, thisArgs?: any, disposables?: IDisposable[]): IDisposable {
if (!this._bufferedMessages) {
// second caller gets nothing
return this._actual.onMessage(listener, thisArgs, disposables);
}
// prepare result
const result = this._actual.onMessage(listener, thisArgs, disposables);
// stop listening to buffered messages
this._bufferedMessagesListener.dispose();
// capture buffered messages
const bufferedMessages = this._bufferedMessages;
this._bufferedMessages = null;
// it is important to deliver these messages after this call, but before
// other messages have a chance to be received (to guarantee in order delivery)
// that's why we're using here nextTick and not other types of timeouts
process.nextTick(() => {
// deliver buffered messages
while (bufferedMessages.length > 0) {
const msg = bufferedMessages.shift();
try {
listener.call(thisArgs, msg);
} catch (e) {
onUnexpectedError(e);
}
}
});
return result;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册