提交 744e89f0 编写于 作者: A Alex Dima

Fixes #57820: Switch to using native Promises

上级 6272a7fe
......@@ -41,6 +41,7 @@ import { isMessageOfType, MessageType, createMessageOfType } from 'vs/workbench/
import { ILabelService } from 'vs/platform/label/common/label';
import { URI } from 'vs/base/common/uri';
import { Schemas } from 'vs/base/common/network';
import { onUnexpectedError } from 'vs/base/common/errors';
export interface IExtensionHostStarter {
readonly onCrashed: Event<[number, string]>;
......@@ -316,9 +317,9 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
});
}
private _tryExtHostHandshake(): TPromise<IMessagePassingProtocol> {
private _tryExtHostHandshake(): Promise<IMessagePassingProtocol> {
return new TPromise<IMessagePassingProtocol>((resolve, reject) => {
return new Promise<IMessagePassingProtocol>((resolve, reject) => {
// Wait for the extension host to connect to our named pipe
// and wrap the socket in the message passing protocol
......@@ -340,7 +341,7 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
// 1) wait for the incoming `ready` event and send the initialization data.
// 2) wait for the incoming `initialized` event.
return new TPromise<IMessagePassingProtocol>((resolve, reject) => {
return new Promise<IMessagePassingProtocol>((resolve, reject) => {
let handle = setTimeout(() => {
reject('timeout');
......@@ -363,7 +364,10 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
disposable.dispose();
// release this promise
resolve(protocol);
// using a buffered message protocol here because between now
// and the first time a `then` executes some messages might be lost
// unless we immediately register a listener for `onMessage`.
resolve(new BufferedMessagePassingProtocol(protocol));
return;
}
......@@ -530,3 +534,57 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
}
}
}
/**
* Will ensure no messages are lost from creation time until the first user of onMessage comes in.
*/
class BufferedMessagePassingProtocol implements IMessagePassingProtocol {
private readonly _actual: IMessagePassingProtocol;
private _bufferedMessagesListener: IDisposable;
private _bufferedMessages: Buffer[];
constructor(actual: IMessagePassingProtocol) {
this._actual = actual;
this._bufferedMessages = [];
this._bufferedMessagesListener = this._actual.onMessage((buff) => this._bufferedMessages.push(buff));
}
public send(buffer: Buffer): void {
this._actual.send(buffer);
}
public onMessage(listener: (e: Buffer) => any, thisArgs?: any, disposables?: IDisposable[]): IDisposable {
if (!this._bufferedMessages) {
// second caller gets nothing
return this._actual.onMessage(listener, thisArgs, disposables);
}
// prepare result
const result = this._actual.onMessage(listener, thisArgs, disposables);
// stop listening to buffered messages
this._bufferedMessagesListener.dispose();
// capture buffered messages
const bufferedMessages = this._bufferedMessages;
this._bufferedMessages = null;
// it is important to deliver these messages after this call, but before
// other messages have a chance to be received (to guarantee in order delivery)
// that's why we're using here nextTick and not other types of timeouts
process.nextTick(() => {
// deliver buffered messages
while (bufferedMessages.length > 0) {
const msg = bufferedMessages.shift();
try {
listener.call(thisArgs, msg);
} catch (e) {
onUnexpectedError(e);
}
}
});
return result;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册