提交 47d23363 编写于 作者: A Alex Dima

Lift resources to members

上级 8ddccaa9
......@@ -27,8 +27,8 @@ import { ReloadWindowAction } from 'vs/workbench/electron-browser/actions';
import { IInstantiationService } from 'vs/platform/instantiation/common/instantiation';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc';
import { generateRandomPipeName, Protocol } from 'vs/base/parts/ipc/node/ipc.net';
import { createServer, Server } from 'net';
import Event, { Emitter, debounceEvent, mapEvent, any } from 'vs/base/common/event';
import { createServer, Server, Socket } from 'net';
import { debounceEvent, mapEvent, any } from 'vs/base/common/event';
import { fromEventEmitter } from 'vs/base/node/event';
import { IInitData, IWorkspaceData } from 'vs/workbench/api/node/extHost.protocol';
import { ExtensionService } from "vs/workbench/services/extensions/electron-browser/extensionService";
......@@ -38,43 +38,23 @@ import { IBroadcastService, IBroadcast } from "vs/platform/broadcast/electron-br
import { isEqual } from "vs/base/common/paths";
import { EXTENSION_CLOSE_EXTHOST_BROADCAST_CHANNEL, EXTENSION_RELOAD_BROADCAST_CHANNEL, ILogEntry, EXTENSION_ATTACH_BROADCAST_CHANNEL, EXTENSION_LOG_BROADCAST_CHANNEL, EXTENSION_TERMINATE_BROADCAST_CHANNEL } from "vs/platform/extensions/common/extensionHost";
export class LazyMessagePassingProtol implements IMessagePassingProtocol {
private _delegate: IMessagePassingProtocol;
private _onMessage = new Emitter<any>();
private _buffer: any[] = [];
readonly onMessage: Event<any> = this._onMessage.event;
send(msg: any): void {
if (this._delegate) {
this._delegate.send(msg);
} else {
this._buffer.push(msg);
}
}
resolve(delegate: IMessagePassingProtocol): void {
this._delegate = delegate;
this._delegate.onMessage(data => this._onMessage.fire(data));
this._buffer.forEach(this._delegate.send, this._delegate);
this._buffer = null;
}
}
export class ExtensionHostProcessWorker {
private extensionHostProcess: ChildProcess;
private lastExtensionHostError: string;
private terminating: boolean;
private readonly _isExtensionDevHost: boolean;
private readonly _isExtensionDevTestFromCli: boolean;
private readonly _isExtensionDevDebug: boolean;
private readonly _isExtensionDevDebugBrk: boolean;
private readonly _isExtensionDevTestFromCli: boolean;
// State
private _lastExtensionHostError: string;
private _terminating: boolean;
readonly messagingProtocol = new LazyMessagePassingProtol();
// Resources, in order they get acquired/created:
private _namedPipeServer: Server;
private _namedPipeName: string;
private _extensionHostProcess: ChildProcess;
private _extensionHostConnection: Socket;
private _messagingProtocol: IMessagePassingProtocol;
private extensionService: ExtensionService;
......@@ -98,13 +78,26 @@ export class ExtensionHostProcessWorker {
this._isExtensionDevDebugBrk = !!this._environmentService.debugExtensionHost.break;
this._isExtensionDevTestFromCli = this._isExtensionDevHost && !!this._environmentService.extensionTestsPath && !this._environmentService.debugExtensionHost.break;
this._lastExtensionHostError = null;
this._terminating = false;
this._namedPipeServer = null;
this._namedPipeName = null;
this._extensionHostProcess = null;
this._messagingProtocol = null;
// TODO@rehost
this._lifecycleService.onWillShutdown(this._onWillShutdown, this);
// TODO@rehost
this._lifecycleService.onShutdown(reason => this.terminate());
_broadcastService.onBroadcast(b => this.onBroadcast(b));
// TODO@rehost
_broadcastService.onBroadcast(b => this._onBroadcast(b));
}
private onBroadcast(broadcast: IBroadcast): void {
private _onBroadcast(broadcast: IBroadcast): void {
// Close Ext Host Window Request
if (broadcast.channel === EXTENSION_CLOSE_EXTHOST_BROADCAST_CHANNEL && this._isExtensionDevHost) {
......@@ -125,9 +118,9 @@ export class ExtensionHostProcessWorker {
public start(extensionService: ExtensionService): TPromise<IMessagePassingProtocol> {
this.extensionService = extensionService;
return TPromise.join<any>([this._tryListenOnPipe(), this._tryFindDebugPort()]).then(data => {
const [server, hook] = <[Server, string]>data[0];
const port = <number>data[1];
return TPromise.join<any>([this._tryListenOnPipe(), this._tryFindDebugPort()]).then((data: [void, number]) => {
// The port will be 0 if there's no need to debug or if a free port was not found
const port = data[1];
const opts = {
env: objects.mixin(objects.clone(process.env), {
......@@ -135,7 +128,7 @@ export class ExtensionHostProcessWorker {
PIPE_LOGGING: 'true',
VERBOSE_LOGGING: true,
VSCODE_WINDOW_ID: String(this._windowService.getCurrentWindowId()),
VSCODE_IPC_HOOK_EXTHOST: hook,
VSCODE_IPC_HOOK_EXTHOST: this._namedPipeName,
ELECTRON_NO_ASAR: '1'
}),
// We only detach the extension host on windows. Linux and Mac orphan by default
......@@ -155,14 +148,14 @@ export class ExtensionHostProcessWorker {
}
// Run Extension Host as fork of current process
this.extensionHostProcess = fork(URI.parse(require.toUrl('bootstrap')).fsPath, ['--type=extensionHost'], opts);
this._extensionHostProcess = fork(URI.parse(require.toUrl('bootstrap')).fsPath, ['--type=extensionHost'], opts);
// Catch all output coming from the extension host process
type Output = { data: string, format: string[] };
this.extensionHostProcess.stdout.setEncoding('utf8');
this.extensionHostProcess.stderr.setEncoding('utf8');
const onStdout = fromEventEmitter<string>(this.extensionHostProcess.stdout, 'data');
const onStderr = fromEventEmitter<string>(this.extensionHostProcess.stderr, 'data');
this._extensionHostProcess.stdout.setEncoding('utf8');
this._extensionHostProcess.stderr.setEncoding('utf8');
const onStdout = fromEventEmitter<string>(this._extensionHostProcess.stdout, 'data');
const onStderr = fromEventEmitter<string>(this._extensionHostProcess.stderr, 'data');
const onOutput = any(
mapEvent(onStdout, o => ({ data: `%c${o}`, format: [''] })),
mapEvent(onStderr, o => ({ data: `%c${o}`, format: ['color: red'] }))
......@@ -183,7 +176,7 @@ export class ExtensionHostProcessWorker {
});
// Support logging from extension host
this.extensionHostProcess.on('message', msg => {
this._extensionHostProcess.on('message', msg => {
if (msg && (<ILogEntry>msg).type === '__$console') {
this._logExtensionHostMessage(<ILogEntry>msg);
}
......@@ -192,10 +185,10 @@ export class ExtensionHostProcessWorker {
// Lifecycle
const globalExitListener = () => this.terminate();
process.once('exit', globalExitListener);
this.extensionHostProcess.on('error', (err) => this._onError(err));
this.extensionHostProcess.on('exit', (code: number, signal: string) => {
this._extensionHostProcess.on('error', (err) => this._onExtHostProcessError(err));
this._extensionHostProcess.on('exit', (code: number, signal: string) => {
process.removeListener('exit', globalExitListener);
this._onExit(code, signal);
this._onExtHostProcessExit(code, signal);
});
// Notify debugger that we are ready to attach to the process if we run a development extension
......@@ -222,29 +215,36 @@ export class ExtensionHostProcessWorker {
}
// Initialize extension host process with hand shakes
return this._tryExtHostHandshake(server).then((protocol) => {
return this._tryExtHostHandshake().then((protocol) => {
clearTimeout(startupTimeoutHandle);
return protocol;
return this._messagingProtocol;
});
});
}
private _tryListenOnPipe(): TPromise<[Server, string]> {
return new TPromise<[Server, string]>((resolve, reject) => {
const server = createServer();
server.on('error', reject);
const hook = generateRandomPipeName();
server.listen(hook, () => {
server.removeListener('error', reject);
resolve([server, hook]);
/**
* Start a server (`this._namedPipeServer`) that listens on a named pipe (`this._namedPipeName`)
*/
private _tryListenOnPipe(): TPromise<void> {
return new TPromise<void>((resolve, reject) => {
this._namedPipeName = generateRandomPipeName();
this._namedPipeServer = createServer();
this._namedPipeServer.on('error', reject);
this._namedPipeServer.listen(this._namedPipeName, () => {
this._namedPipeServer.removeListener('error', reject);
resolve(void 0);
});
});
}
/**
* Find a free port if extension host debugging is enabled.
*/
private _tryFindDebugPort(): TPromise<number> {
const extensionHostPort = this._environmentService.debugExtensionHost.port;
if (typeof extensionHostPort !== 'number') {
return TPromise.wrap<number>(void 0);
return TPromise.wrap<number>(0);
}
return new TPromise<number>((c, e) => {
findFreePort(extensionHostPort, 10 /* try 10 ports */, 5000 /* try up to 5 seconds */, (port) => {
......@@ -265,31 +265,48 @@ export class ExtensionHostProcessWorker {
});
}
private _tryExtHostHandshake(server: Server): TPromise<IMessagePassingProtocol> {
private _tryExtHostHandshake(): TPromise<void> {
return new TPromise<IMessagePassingProtocol>((resolve, reject) => {
return new TPromise<void>((resolve, reject) => {
// Wait for the extension host to connect to our named pipe
// and wrap the socket in the message passing protocol
let handle = setTimeout(() => reject('timeout'), 60 * 1000);
server.on('connection', socket => {
this._namedPipeServer.on('connection', socket => {
clearTimeout(handle);
const protocol = new Protocol(socket);
resolve(protocol);
this._extensionHostConnection = socket;
this._messagingProtocol = new Protocol(this._extensionHostConnection);
resolve(void 0);
});
}).then(protocol => {
}).then(() => {
// 1) wait for the incoming `ready` event and send the initialization data.
// 2) wait for the incoming `initialized` event.
return new TPromise<void>((resolve, reject) => {
const disposable = this._messagingProtocol.onMessage(msg => {
return new TPromise<IMessagePassingProtocol>((resolve, reject) => {
protocol.onMessage(msg => {
if (msg === 'ready') {
// 1) Host is ready to receive messages, initialize it
return this._createExtHostInitData().then(data => protocol.send(stringify(data)));
} else if (msg === 'initialized') {
this._createExtHostInitData().then(data => this._messagingProtocol.send(stringify(data)));
return;
}
if (msg === 'initialized') {
// 2) Host is initialized
this.messagingProtocol.resolve(protocol);
resolve(protocol);
// stop listening for messages here
disposable.dispose();
// release this promise
resolve(void 0);
return;
}
return undefined;
console.error(`received unexpected message during handshake phase from the extension host: `, msg);
});
});
});
......@@ -360,20 +377,20 @@ export class ExtensionHostProcessWorker {
}
}
private _onError(err: any): void {
private _onExtHostProcessError(err: any): void {
let errorMessage = toErrorMessage(err);
if (errorMessage === this.lastExtensionHostError) {
if (errorMessage === this._lastExtensionHostError) {
return; // prevent error spam
}
this.lastExtensionHostError = errorMessage;
this._lastExtensionHostError = errorMessage;
this._messageService.show(Severity.Error, nls.localize('extensionHostProcess.error', "Error from the extension host: {0}", errorMessage));
}
private _onExit(code: number, signal: string): void {
if (this.terminating) {
// Expected termination path (we asked it to terminate)
private _onExtHostProcessExit(code: number, signal: string): void {
if (this._terminating) {
// Expected termination path (we asked the process to terminate)
return;
}
......@@ -411,11 +428,14 @@ export class ExtensionHostProcessWorker {
}
public terminate(): void {
this.terminating = true;
if (this.extensionHostProcess) {
this.messagingProtocol.send({
this._terminating = true;
if (this._messagingProtocol) {
this._messagingProtocol.send({
type: '__$terminate'
});
} else {
// TODO@rehost
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册