提交 137452cf 编写于 作者: J Joao Moreno

wip: use ipc for exthost rpc

上级 82a2a476
......@@ -14,16 +14,6 @@ import { generateUuid } from 'vs/base/common/uuid';
import { IDisposable } from 'vs/base/common/lifecycle';
import { TimeoutTimer } from 'vs/base/common/async';
export function generateRandomPipeName(): string {
const randomSuffix = generateUuid();
if (process.platform === 'win32') {
return `\\\\.\\pipe\\vscode-ipc-${randomSuffix}-sock`;
} else {
// Mac/Unix: use socket file
return join(tmpdir(), `vscode-ipc-${randomSuffix}.sock`);
}
}
/**
* A message has the following format:
*
......@@ -273,3 +263,13 @@ export function connect(hook: any, clientId: string): Thenable<Client> {
socket.once('error', e);
});
}
export function generateRandomPipeName(): string {
const randomSuffix = generateUuid();
if (process.platform === 'win32') {
return `\\\\.\\pipe\\vscode-ipc-${randomSuffix}-sock`;
} else {
// Mac/Unix: use socket file
return join(tmpdir(), `vscode-ipc-${randomSuffix}.sock`);
}
}
\ No newline at end of file
......@@ -152,12 +152,13 @@ function deserialize(buffer: Buffer): { header: any, body: any } {
export class ChannelServer implements IChannelServer, IDisposable {
private channels = new Map<string, IChannel>();
private channels: Map<string, IChannel>;
private activeRequests = new Map<number, IDisposable>();
private protocolListener: IDisposable;
constructor(private protocol: IMessagePassingProtocol) {
constructor(private protocol: IMessagePassingProtocol, channels: Map<string, IChannel> = new Map<string, IChannel>()) {
this.protocolListener = this.protocol.onMessage(msg => this.onRawMessage(msg));
this.channels = channels;
this.sendResponse({ type: ResponseType.Initialize });
}
......@@ -483,11 +484,9 @@ export class IPCServer implements IChannelServer, IRoutingChannelClient, IDispos
const onFirstMessage = once(protocol.onMessage);
onFirstMessage(rawId => {
const channelServer = new ChannelServer(protocol);
const channelServer = new ChannelServer(protocol, this.channels);
const channelClient = new ChannelClient(protocol);
this.channels.forEach((channel, name) => channelServer.registerChannel(name, channel));
const id = rawId.toString();
this.channelClients.set(id, channelClient);
this.onClientAdded.fire(id);
......@@ -628,3 +627,29 @@ export function getNextTickChannel<T extends IChannel>(channel: T): T {
}
} as T;
}
class SingleClientRouter implements IClientRouter {
constructor(private id: string) { }
async routeCall(): Promise<string> {
return this.id;
}
async routeEvent(): Promise<string> {
return this.id;
}
}
export class SingleClientChannelClient implements IChannelClient {
private router: IClientRouter;
constructor(private client: IRoutingChannelClient, id: string) {
this.router = new SingleClientRouter(id);
}
getChannel<T extends IChannel>(channelName: string): T {
return this.client.getChannel(channelName, this.router);
}
}
\ No newline at end of file
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
export const enum MessageType {
Initialized,
Ready,
Terminate
}
export function createMessageOfType(type: MessageType): Buffer {
const result = Buffer.allocUnsafe(1);
switch (type) {
case MessageType.Initialized: result.writeUInt8(1, 0); break;
case MessageType.Ready: result.writeUInt8(2, 0); break;
case MessageType.Terminate: result.writeUInt8(3, 0); break;
}
return result;
}
export function isMessageOfType(message: Buffer, type: MessageType): boolean {
if (message.length !== 1) {
return false;
}
switch (message.readUInt8(0)) {
case 1: return type === MessageType.Initialized;
case 2: return type === MessageType.Ready;
case 3: return type === MessageType.Terminate;
default: return false;
}
}
\ No newline at end of file
......@@ -17,13 +17,12 @@ import { IInitData, IEnvironment, IWorkspaceData, MainContext, MainThreadWorkspa
import * as errors from 'vs/base/common/errors';
import { ExtensionActivatedByEvent } from 'vs/workbench/api/node/extHostExtensionActivator';
import { IDisposable, dispose } from 'vs/base/common/lifecycle';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { RPCProtocol } from 'vs/workbench/services/extensions/node/rpcProtocol';
import { URI } from 'vs/base/common/uri';
import { ExtHostLogService } from 'vs/workbench/api/node/extHostLogService';
import { timeout } from 'vs/base/common/async';
import { Counter } from 'vs/base/common/numbers';
import { CancellationTokenSource } from 'vs/base/common/cancellation';
import { IRPCProtocol } from 'vs/workbench/services/extensions/node/proxyIdentifier';
const nativeExit = process.exit.bind(process);
function patchProcess(allowExit: boolean) {
......@@ -88,8 +87,8 @@ export class ExtensionHostMain {
private _searchRequestIdProvider: Counter;
private _mainThreadWorkspace: MainThreadWorkspaceShape;
constructor(protocol: IMessagePassingProtocol, initData: IInitData) {
const rpcProtocol = new RPCProtocol(protocol);
constructor(rpcProtocol: IRPCProtocol, initData: IInitData) {
// ensure URIs are transformed and revived
initData = this.transform(initData, rpcProtocol);
......@@ -335,13 +334,13 @@ export class ExtensionHostMain {
return TPromise.wrapError<void>(new Error(requireError ? requireError.toString() : nls.localize('extensionTestError', "Path {0} does not point to a valid extension test runner.", this._environment.extensionTestsPath)));
}
private transform(initData: IInitData, rpcProtocol: RPCProtocol): IInitData {
private transform(initData: IInitData, rpcProtocol: IRPCProtocol): IInitData {
initData.extensions.forEach((ext) => (<any>ext).extensionLocation = URI.revive(ext.extensionLocation));
initData.environment.appRoot = URI.revive(initData.environment.appRoot);
initData.environment.appSettingsHome = URI.revive(initData.environment.appSettingsHome);
initData.environment.extensionDevelopmentLocationURI = URI.revive(initData.environment.extensionDevelopmentLocationURI);
initData.logsLocation = URI.revive(initData.logsLocation);
initData.workspace = rpcProtocol.transformIncomingURIs(initData.workspace);
// initData.workspace = rpcProtocol.transformIncomingURIs(initData.workspace);
return initData;
}
......
......@@ -8,12 +8,11 @@
import { onUnexpectedError } from 'vs/base/common/errors';
import { ExtensionHostMain, exit } from 'vs/workbench/node/extensionHostMain';
import { IInitData } from 'vs/workbench/api/node/extHost.protocol';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { Protocol } from 'vs/base/parts/ipc/node/ipc.net';
import { createConnection } from 'net';
import { Event, filterEvent } from 'vs/base/common/event';
import { createMessageOfType, MessageType, isMessageOfType } from 'vs/workbench/common/extensionHostProtocol';
import { IChannel } from 'vs/base/parts/ipc/node/ipc';
import { connect, Client } from 'vs/base/parts/ipc/node/ipc.net';
import { Event } from 'vs/base/common/event';
import * as nativeWatchdog from 'native-watchdog';
import { RPCProtocol } from 'vs/workbench/services/extensions/node/rpcProtocol.ipc';
// With Electron 2.x and node.js 8.x the "natives" module
// can cause a native crash (see https://github.com/nodejs/node/issues/19891 and
......@@ -33,133 +32,102 @@ import * as nativeWatchdog from 'native-watchdog';
};
})();
interface IRendererConnection {
protocol: IMessagePassingProtocol;
initData: IInitData;
}
// This calls exit directly in case the initialization is not finished and we need to exit
// Otherwise, if initialization completed we go to extensionHostMain.terminate()
let onTerminate = function () {
exit();
};
function createExtHostProtocol(): Promise<IMessagePassingProtocol> {
const pipeName = process.env.VSCODE_IPC_HOOK_EXTHOST;
return new Promise<IMessagePassingProtocol>((resolve, reject) => {
const socket = createConnection(pipeName, () => {
socket.removeListener('error', reject);
resolve(new Protocol(socket));
});
socket.once('error', reject);
function setup(initData: IInitData): void {
// Print a console message when rejection isn't handled within N seconds. For details:
// see https://nodejs.org/api/process.html#process_event_unhandledrejection
// and https://nodejs.org/api/process.html#process_event_rejectionhandled
const unhandledPromises: Promise<any>[] = [];
process.on('unhandledRejection', (reason: any, promise: Promise<any>) => {
unhandledPromises.push(promise);
setTimeout(() => {
const idx = unhandledPromises.indexOf(promise);
if (idx >= 0) {
unhandledPromises.splice(idx, 1);
console.warn('rejected promise not handled within 1 second');
onUnexpectedError(reason);
}
}, 1000);
});
}).then(protocol => {
process.on('rejectionHandled', (promise: Promise<any>) => {
const idx = unhandledPromises.indexOf(promise);
if (idx >= 0) {
unhandledPromises.splice(idx, 1);
}
});
return new class implements IMessagePassingProtocol {
// Print a console message when an exception isn't handled.
process.on('uncaughtException', function (err: Error) {
onUnexpectedError(err);
});
private _terminating = false;
// Kill oneself if one's parent dies. Much drama.
setInterval(function () {
try {
process.kill(initData.parentPid, 0); // throws an exception if the main process doesn't exist anymore.
} catch (e) {
onTerminate();
}
}, 1000);
// In certain cases, the event loop can become busy and never yield
// e.g. while-true or process.nextTick endless loops
// So also use the native node module to do it from a separate thread
let watchdog: typeof nativeWatchdog;
try {
watchdog = require.__$__nodeRequire('native-watchdog');
watchdog.start(initData.parentPid);
} catch (err) {
// no problem...
onUnexpectedError(err);
}
}
readonly onMessage: Event<any> = filterEvent(protocol.onMessage, msg => {
if (!isMessageOfType(msg, MessageType.Terminate)) {
return true;
}
this._terminating = true;
async function init(client: Client): Promise<void> {
// register a lifecycle channel and expect a terminate call
client.registerChannel('lifecycle', new class implements IChannel {
call<T>(command: string): Thenable<T> {
if (command === 'terminate') {
onTerminate();
return false;
});
send(msg: any): void {
if (!this._terminating) {
protocol.send(msg);
}
}
};
return Promise.resolve(null);
}
listen<T>(): Event<T> { throw new Error('Method not implemented.'); }
});
}
function connectToRenderer(protocol: IMessagePassingProtocol): Promise<IRendererConnection> {
return new Promise<IRendererConnection>((c, e) => {
// Listen init data message
const first = protocol.onMessage(raw => {
first.dispose();
const initData = <IInitData>JSON.parse(raw.toString());
// Print a console message when rejection isn't handled within N seconds. For details:
// see https://nodejs.org/api/process.html#process_event_unhandledrejection
// and https://nodejs.org/api/process.html#process_event_rejectionhandled
const unhandledPromises: Promise<any>[] = [];
process.on('unhandledRejection', (reason: any, promise: Promise<any>) => {
unhandledPromises.push(promise);
setTimeout(() => {
const idx = unhandledPromises.indexOf(promise);
if (idx >= 0) {
unhandledPromises.splice(idx, 1);
console.warn('rejected promise not handled within 1 second');
onUnexpectedError(reason);
}
}, 1000);
});
process.on('rejectionHandled', (promise: Promise<any>) => {
const idx = unhandledPromises.indexOf(promise);
if (idx >= 0) {
unhandledPromises.splice(idx, 1);
}
});
// Print a console message when an exception isn't handled.
process.on('uncaughtException', function (err: Error) {
onUnexpectedError(err);
});
// Kill oneself if one's parent dies. Much drama.
setInterval(function () {
try {
process.kill(initData.parentPid, 0); // throws an exception if the main process doesn't exist anymore.
} catch (e) {
onTerminate();
}
}, 1000);
// In certain cases, the event loop can become busy and never yield
// e.g. while-true or process.nextTick endless loops
// So also use the native node module to do it from a separate thread
let watchdog: typeof nativeWatchdog;
try {
watchdog = require.__$__nodeRequire('native-watchdog');
watchdog.start(initData.parentPid);
} catch (err) {
// no problem...
onUnexpectedError(err);
}
// get the other end's lifecycle channel and start initialization
const channel = client.getChannel('lifecycle');
// Tell the outside that we are initialized
protocol.send(createMessageOfType(MessageType.Initialized));
// get initData and setup
const initData = await channel.call<IInitData>('init');
setup(initData);
c({ protocol, initData });
});
// let renderer know we're done
await channel.call<void>('initDone');
// Tell the outside that we are ready to receive messages
protocol.send(createMessageOfType(MessageType.Ready));
});
// here, instead of this, we must create an rpcProtocol instance which
// is based on the IPC layer and not on the IMessagePassingProtocol
const rpcProtocol = new RPCProtocol(client, client);
// bootstrap extension host main
const extensionHostMain = new ExtensionHostMain(rpcProtocol, initData);
onTerminate = () => extensionHostMain.terminate();
return extensionHostMain.start();
}
patchExecArgv();
createExtHostProtocol().then(protocol => {
// connect to main side
return connectToRenderer(protocol);
}).then(renderer => {
// setup things
const extensionHostMain = new ExtensionHostMain(renderer.protocol, renderer.initData);
onTerminate = () => extensionHostMain.terminate();
return extensionHostMain.start();
}).catch(err => console.error(err));
connect(process.env.VSCODE_IPC_HOOK_EXTHOST, 'exthost')
.then(init)
.then(null, err => console.error(err));
function patchExecArgv() {
// when encountering the prevent-inspect flag we delete this
......
......@@ -19,9 +19,9 @@ import { ChildProcess, fork } from 'child_process';
import { ipcRenderer as ipc } from 'electron';
import product from 'vs/platform/node/product';
import { IEnvironmentService } from 'vs/platform/environment/common/environment';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { generateRandomPipeName, Protocol } from 'vs/base/parts/ipc/node/ipc.net';
import { createServer, Server, Socket } from 'net';
import { IChannel, SingleClientChannelClient } from 'vs/base/parts/ipc/node/ipc';
import { generateRandomPipeName, serve } from 'vs/base/parts/ipc/node/ipc.net';
import { Server, Socket } from 'net';
import { Event, Emitter, debounceEvent, mapEvent, anyEvent, fromNodeEventEmitter } from 'vs/base/common/event';
import { IInitData, IConfigurationInitData } from 'vs/workbench/api/node/extHost.protocol';
import { IExtensionDescription } from 'vs/workbench/services/extensions/common/extensions';
......@@ -30,22 +30,22 @@ import { ICrashReporterService } from 'vs/workbench/services/crashReporter/elect
import { IBroadcastService, IBroadcast } from 'vs/platform/broadcast/electron-browser/broadcastService';
import { isEqual } from 'vs/base/common/resources';
import { EXTENSION_CLOSE_EXTHOST_BROADCAST_CHANNEL, EXTENSION_RELOAD_BROADCAST_CHANNEL, EXTENSION_ATTACH_BROADCAST_CHANNEL, EXTENSION_LOG_BROADCAST_CHANNEL, EXTENSION_TERMINATE_BROADCAST_CHANNEL } from 'vs/platform/extensions/common/extensionHost';
import { IDisposable, dispose, toDisposable } from 'vs/base/common/lifecycle';
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { IRemoteConsoleLog, log, parse } from 'vs/base/node/console';
import { getScopes } from 'vs/platform/configuration/common/configurationRegistry';
import { ILogService } from 'vs/platform/log/common/log';
import { INotificationService, Severity } from 'vs/platform/notification/common/notification';
import { getPathFromAmdModule } from 'vs/base/common/amd';
import { timeout } from 'vs/base/common/async';
import { isMessageOfType, MessageType, createMessageOfType } from 'vs/workbench/common/extensionHostProtocol';
import { ILabelService } from 'vs/platform/label/common/label';
import { URI } from 'vs/base/common/uri';
import { Schemas } from 'vs/base/common/network';
import { onUnexpectedError } from 'vs/base/common/errors';
import { IRPCProtocol } from 'vs/workbench/services/extensions/node/proxyIdentifier';
import { RPCProtocol } from 'vs/workbench/services/extensions/node/rpcProtocol.ipc';
export interface IExtensionHostStarter {
readonly onCrashed: Event<[number, string]>;
start(): TPromise<IMessagePassingProtocol>;
start(): TPromise<IRPCProtocol>;
getInspectPort(): number;
dispose(): void;
}
......@@ -71,7 +71,7 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
private _inspectPort: number;
private _extensionHostProcess: ChildProcess;
private _extensionHostConnection: Socket;
private _messageProtocol: TPromise<IMessagePassingProtocol>;
private _rpcProtocol: TPromise<IRPCProtocol>;
constructor(
private readonly _extensions: TPromise<IExtensionDescription[]>,
......@@ -103,7 +103,7 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
this._namedPipeServer = null;
this._extensionHostProcess = null;
this._extensionHostConnection = null;
this._messageProtocol = null;
this._rpcProtocol = null;
this._toDispose = [];
this._toDispose.push(this._onCrashed);
......@@ -140,23 +140,22 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
}
}
public start(): TPromise<IMessagePassingProtocol> {
public start(): TPromise<IRPCProtocol> {
if (this._terminating) {
// .terminate() was called
return null;
}
if (!this._messageProtocol) {
this._messageProtocol = TPromise.join([this._tryListenOnPipe(), this._tryFindDebugPort()]).then(data => {
const pipeName = data[0];
const portData = data[1];
if (!this._rpcProtocol) {
const namedPipe = generateRandomPipeName();
this._rpcProtocol = TPromise.join([serve(namedPipe), this._tryFindDebugPort()]).then(([server, portData]) => {
const opts = {
env: objects.mixin(objects.deepClone(process.env), {
AMD_ENTRYPOINT: 'vs/workbench/node/extensionHostProcess',
PIPE_LOGGING: 'true',
VERBOSE_LOGGING: true,
VSCODE_IPC_HOOK_EXTHOST: pipeName,
VSCODE_IPC_HOOK_EXTHOST: namedPipe,
VSCODE_HANDLES_UNCAUGHT_ERRORS: true,
VSCODE_LOG_STACK: !this._isExtensionDevTestFromCli && (this._isExtensionDevHost || !this._environmentService.isBuilt || product.quality !== 'stable' || this._environmentService.verbose),
VSCODE_LOG_LEVEL: this._environmentService.verbose ? 'trace' : this._environmentService.log
......@@ -243,48 +242,61 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
}
this._inspectPort = portData.actual;
// Help in case we fail to start it
let startupTimeoutHandle: number;
if (!this._environmentService.isBuilt || this._isExtensionDevHost) {
startupTimeoutHandle = setTimeout(() => {
const msg = this._isExtensionDevDebugBrk
? nls.localize('extensionHostProcess.startupFailDebug', "Extension host did not start in 10 seconds, it might be stopped on the first line and needs a debugger to continue.")
: nls.localize('extensionHostProcess.startupFail', "Extension host did not start in 10 seconds, that might be a problem.");
this._notificationService.prompt(Severity.Warning, msg,
[{
label: nls.localize('reloadWindow', "Reload Window"),
run: () => this._windowService.reloadWindow()
}]
);
}, 10000);
}
// create a single client channel client
const client = new SingleClientChannelClient(server, 'exthost');
// terminate exthost on disposal
this._toDispose.push(toDisposable(() => {
// send termination call
client.getChannel('lifecycle').call('terminate');
// Give the extension host 10s, after which we will
// try to kill the process and release any resources
setTimeout(() => this._cleanResources(), 10 * 1000);
}));
return new TPromise((c, e) => {
const that = this;
// Help in case we fail to start it
let startupTimeoutHandle: number;
if (!this._environmentService.isBuilt || this._isExtensionDevHost) {
startupTimeoutHandle = setTimeout(() => {
const msg = this._isExtensionDevDebugBrk
? nls.localize('extensionHostProcess.startupFailDebug', "Extension host did not start in 10 seconds, it might be stopped on the first line and needs a debugger to continue.")
: nls.localize('extensionHostProcess.startupFail', "Extension host did not start in 10 seconds, that might be a problem.");
this._notificationService.prompt(Severity.Warning, msg,
[{
label: nls.localize('reloadWindow', "Reload Window"),
run: () => this._windowService.reloadWindow()
}]
);
e(new Error(msg));
}, 10000);
}
// Initialize extension host process with hand shakes
return this._tryExtHostHandshake().then((protocol) => {
clearTimeout(startupTimeoutHandle);
return protocol;
server.registerChannel('lifecycle', new class implements IChannel {
call(command: string): Thenable<any> {
if (command === 'init') {
return that._createExtHostInitData();
} else if (command === 'initDone') {
clearTimeout(startupTimeoutHandle);
c(new RPCProtocol(client, server));
}
return Promise.resolve(null);
}
listen<T>(): Event<T> { throw new Error('Method not implemented.'); }
});
});
});
}
return this._messageProtocol;
}
/**
* Start a server (`this._namedPipeServer`) that listens on a named pipe and return the named pipe name.
*/
private _tryListenOnPipe(): Promise<string> {
return new Promise<string>((resolve, reject) => {
const pipeName = generateRandomPipeName();
this._namedPipeServer = createServer();
this._namedPipeServer.on('error', reject);
this._namedPipeServer.listen(pipeName, () => {
this._namedPipeServer.removeListener('error', reject);
resolve(pipeName);
});
});
return this._rpcProtocol;
}
/**
......@@ -317,68 +329,6 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
});
}
private _tryExtHostHandshake(): Promise<IMessagePassingProtocol> {
return new Promise<IMessagePassingProtocol>((resolve, reject) => {
// Wait for the extension host to connect to our named pipe
// and wrap the socket in the message passing protocol
let handle = setTimeout(() => {
this._namedPipeServer.close();
this._namedPipeServer = null;
reject('timeout');
}, 60 * 1000);
this._namedPipeServer.on('connection', socket => {
clearTimeout(handle);
this._namedPipeServer.close();
this._namedPipeServer = null;
this._extensionHostConnection = socket;
resolve(new Protocol(this._extensionHostConnection));
});
}).then((protocol) => {
// 1) wait for the incoming `ready` event and send the initialization data.
// 2) wait for the incoming `initialized` event.
return new Promise<IMessagePassingProtocol>((resolve, reject) => {
let handle = setTimeout(() => {
reject('timeout');
}, 60 * 1000);
const disposable = protocol.onMessage(msg => {
if (isMessageOfType(msg, MessageType.Ready)) {
// 1) Extension Host is ready to receive messages, initialize it
this._createExtHostInitData().then(data => protocol.send(Buffer.from(JSON.stringify(data))));
return;
}
if (isMessageOfType(msg, MessageType.Initialized)) {
// 2) Extension Host is initialized
clearTimeout(handle);
// stop listening for messages here
disposable.dispose();
// release this promise
// using a buffered message protocol here because between now
// and the first time a `then` executes some messages might be lost
// unless we immediately register a listener for `onMessage`.
resolve(new BufferedMessagePassingProtocol(protocol));
return;
}
console.error(`received unexpected message during handshake phase from the extension host: `, msg);
});
});
});
}
private _createExtHostInitData(): TPromise<IInitData> {
return TPromise.join([this._telemetryService.getTelemetryInfo(), this._extensions])
.then(([telemetryInfo, extensionDescriptions]) => {
......@@ -476,30 +426,6 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
return;
}
this._terminating = true;
dispose(this._toDispose);
if (!this._messageProtocol) {
// .start() was not called
return;
}
this._messageProtocol.then((protocol) => {
// Send the extension host a request to terminate itself
// (graceful termination)
protocol.send(createMessageOfType(MessageType.Terminate));
// Give the extension host 10s, after which we will
// try to kill the process and release any resources
setTimeout(() => this._cleanResources(), 10 * 1000);
}, (err) => {
// Establishing a protocol with the extension host failed, so
// try to kill the process and release any resources.
this._cleanResources();
});
}
private _cleanResources(): void {
......@@ -534,56 +460,56 @@ export class ExtensionHostProcessWorker implements IExtensionHostStarter {
}
}
/**
* Will ensure no messages are lost from creation time until the first user of onMessage comes in.
*/
class BufferedMessagePassingProtocol implements IMessagePassingProtocol {
private readonly _actual: IMessagePassingProtocol;
private _bufferedMessagesListener: IDisposable;
private _bufferedMessages: Buffer[];
constructor(actual: IMessagePassingProtocol) {
this._actual = actual;
this._bufferedMessages = [];
this._bufferedMessagesListener = this._actual.onMessage((buff) => this._bufferedMessages.push(buff));
}
public send(buffer: Buffer): void {
this._actual.send(buffer);
}
public onMessage(listener: (e: Buffer) => any, thisArgs?: any, disposables?: IDisposable[]): IDisposable {
if (!this._bufferedMessages) {
// second caller gets nothing
return this._actual.onMessage(listener, thisArgs, disposables);
}
// prepare result
const result = this._actual.onMessage(listener, thisArgs, disposables);
// stop listening to buffered messages
this._bufferedMessagesListener.dispose();
// capture buffered messages
const bufferedMessages = this._bufferedMessages;
this._bufferedMessages = null;
// it is important to deliver these messages after this call, but before
// other messages have a chance to be received (to guarantee in order delivery)
// that's why we're using here nextTick and not other types of timeouts
process.nextTick(() => {
// deliver buffered messages
while (bufferedMessages.length > 0) {
const msg = bufferedMessages.shift();
try {
listener.call(thisArgs, msg);
} catch (e) {
onUnexpectedError(e);
}
}
});
return result;
}
}
// /**
// * Will ensure no messages are lost from creation time until the first user of onMessage comes in.
// */
// class BufferedMessagePassingProtocol implements IMessagePassingProtocol {
// private readonly _actual: IMessagePassingProtocol;
// private _bufferedMessagesListener: IDisposable;
// private _bufferedMessages: Buffer[];
// constructor(actual: IMessagePassingProtocol) {
// this._actual = actual;
// this._bufferedMessages = [];
// this._bufferedMessagesListener = this._actual.onMessage((buff) => this._bufferedMessages.push(buff));
// }
// public send(buffer: Buffer): void {
// this._actual.send(buffer);
// }
// public onMessage(listener: (e: Buffer) => any, thisArgs?: any, disposables?: IDisposable[]): IDisposable {
// if (!this._bufferedMessages) {
// // second caller gets nothing
// return this._actual.onMessage(listener, thisArgs, disposables);
// }
// // prepare result
// const result = this._actual.onMessage(listener, thisArgs, disposables);
// // stop listening to buffered messages
// this._bufferedMessagesListener.dispose();
// // capture buffered messages
// const bufferedMessages = this._bufferedMessages;
// this._bufferedMessages = null;
// // it is important to deliver these messages after this call, but before
// // other messages have a chance to be received (to guarantee in order delivery)
// // that's why we're using here nextTick and not other types of timeouts
// process.nextTick(() => {
// // deliver buffered messages
// while (bufferedMessages.length > 0) {
// const msg = bufferedMessages.shift();
// try {
// listener.call(thisArgs, msg);
// } catch (e) {
// onUnexpectedError(e);
// }
// }
// });
// return result;
// }
// }
......@@ -21,14 +21,13 @@ import { IExtensionEnablementService, IExtensionIdentifier, EnablementState, IEx
import { areSameExtensions, BetterMergeId, BetterMergeDisabledNowKey, getGalleryExtensionIdFromLocal } from 'vs/platform/extensionManagement/common/extensionManagementUtil';
import { ExtensionsRegistry, ExtensionPoint, IExtensionPointUser, ExtensionMessageCollector, IExtensionPoint, schema } from 'vs/workbench/services/extensions/common/extensionsRegistry';
import { ExtensionScanner, ILog, ExtensionScannerInput, IExtensionResolver, IExtensionReference, Translations, IRelaxedExtensionDescription } from 'vs/workbench/services/extensions/node/extensionPoints';
import { ProxyIdentifier } from 'vs/workbench/services/extensions/node/proxyIdentifier';
import { ProxyIdentifier, IRPCProtocol } from 'vs/workbench/services/extensions/node/proxyIdentifier';
import { ExtHostContext, ExtHostExtensionServiceShape, IExtHostContext, MainContext } from 'vs/workbench/api/node/extHost.protocol';
import { ITelemetryService } from 'vs/platform/telemetry/common/telemetry';
import { IEnvironmentService } from 'vs/platform/environment/common/environment';
import { IStorageService } from 'vs/platform/storage/common/storage';
import { IInstantiationService } from 'vs/platform/instantiation/common/instantiation';
import { ExtensionHostProcessWorker, IExtensionHostStarter } from 'vs/workbench/services/extensions/electron-browser/extensionHost';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/node/ipc';
import { ExtHostCustomersRegistry } from 'vs/workbench/api/electron-browser/extHostCustomers';
import { IWindowService } from 'vs/platform/windows/common/windows';
import { IDisposable, Disposable } from 'vs/base/common/lifecycle';
......@@ -38,8 +37,8 @@ import { Barrier } from 'vs/base/common/async';
import { Event, Emitter } from 'vs/base/common/event';
import { ExtensionHostProfiler } from 'vs/workbench/services/extensions/electron-browser/extensionHostProfiler';
import product from 'vs/platform/node/product';
import * as strings from 'vs/base/common/strings';
import { RPCProtocol, IRPCProtocolLogger, RequestInitiator, ResponsiveState } from 'vs/workbench/services/extensions/node/rpcProtocol';
// import * as strings from 'vs/base/common/strings';
import { ResponsiveState } from 'vs/workbench/services/extensions/node/rpcProtocol';
import { INotificationService, Severity, INotificationHandle } from 'vs/platform/notification/common/notification';
import { isFalsyOrEmpty } from 'vs/base/common/arrays';
import { Schemas } from 'vs/base/common/network';
......@@ -49,8 +48,8 @@ import { IEditorService } from 'vs/workbench/services/editor/common/editorServic
import { RuntimeExtensionsInput } from 'vs/workbench/services/extensions/electron-browser/runtimeExtensionsInput';
// Enable to see detailed message communication between window and extension host
const LOG_EXTENSION_HOST_COMMUNICATION = false;
const LOG_USE_COLORS = true;
// const LOG_EXTENSION_HOST_COMMUNICATION = false;
// const LOG_USE_COLORS = true;
let _SystemExtensionsRoot: string = null;
function getSystemExtensionsRoot(): string {
......@@ -124,7 +123,6 @@ export class ExtensionHostProcessManager extends Disposable {
* A map of already activated events to speed things up if the same activation event is triggered multiple times.
*/
private readonly _extensionHostProcessFinishedActivateEvents: { [activationEvent: string]: boolean; };
private _extensionHostProcessRPCProtocol: RPCProtocol;
private readonly _extensionHostProcessCustomers: IDisposable[];
private readonly _extensionHostProcessWorker: IExtensionHostStarter;
/**
......@@ -136,18 +134,17 @@ export class ExtensionHostProcessManager extends Disposable {
extensionHostProcessWorker: IExtensionHostStarter,
initialActivationEvents: string[],
@IInstantiationService private readonly _instantiationService: IInstantiationService,
@IEnvironmentService private readonly _environmentService: IEnvironmentService,
// @IEnvironmentService private readonly _environmentService: IEnvironmentService,
) {
super();
this._extensionHostProcessFinishedActivateEvents = Object.create(null);
this._extensionHostProcessRPCProtocol = null;
this._extensionHostProcessCustomers = [];
this._extensionHostProcessWorker = extensionHostProcessWorker;
this.onDidCrash = this._extensionHostProcessWorker.onCrashed;
this._extensionHostProcessProxy = this._extensionHostProcessWorker.start().then(
(protocol) => {
return { value: this._createExtensionHostCustomers(protocol) };
(rpcProtocol) => {
return { value: this._createExtensionHostCustomers(rpcProtocol) };
},
(err) => {
console.error('Error received from starting extension host');
......@@ -164,9 +161,6 @@ export class ExtensionHostProcessManager extends Disposable {
if (this._extensionHostProcessWorker) {
this._extensionHostProcessWorker.dispose();
}
if (this._extensionHostProcessRPCProtocol) {
this._extensionHostProcessRPCProtocol.dispose();
}
for (let i = 0, len = this._extensionHostProcessCustomers.length; i < len; i++) {
const customer = this._extensionHostProcessCustomers[i];
try {
......@@ -184,19 +178,20 @@ export class ExtensionHostProcessManager extends Disposable {
return this._extensionHostProcessWorker && Boolean(this._extensionHostProcessWorker.getInspectPort());
}
private _createExtensionHostCustomers(protocol: IMessagePassingProtocol): ExtHostExtensionServiceShape {
private _createExtensionHostCustomers(rpcProtocol: IRPCProtocol): ExtHostExtensionServiceShape {
let logger: IRPCProtocolLogger = null;
if (LOG_EXTENSION_HOST_COMMUNICATION || this._environmentService.logExtensionHostCommunication) {
logger = new RPCLogger();
}
// TODO@joao
// let logger: IRPCProtocolLogger = null;
// if (LOG_EXTENSION_HOST_COMMUNICATION || this._environmentService.logExtensionHostCommunication) {
// logger = new RPCLogger();
// }
this._extensionHostProcessRPCProtocol = new RPCProtocol(protocol, logger);
this._register(this._extensionHostProcessRPCProtocol.onDidChangeResponsiveState((responsiveState: ResponsiveState) => this._onDidChangeResponsiveState.fire(responsiveState)));
// TODO@joao
// this._register(rpcProtocol.onDidChangeResponsiveState((responsiveState: ResponsiveState) => this._onDidChangeResponsiveState.fire(responsiveState)));
const extHostContext: IExtHostContext = {
getProxy: <T>(identifier: ProxyIdentifier<T>): T => this._extensionHostProcessRPCProtocol.getProxy(identifier),
set: <T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R => this._extensionHostProcessRPCProtocol.set(identifier, instance),
assertRegistered: (identifiers: ProxyIdentifier<any>[]): void => this._extensionHostProcessRPCProtocol.assertRegistered(identifiers),
getProxy: <T>(identifier: ProxyIdentifier<T>): T => rpcProtocol.getProxy(identifier),
set: <T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R => rpcProtocol.set(identifier, instance),
assertRegistered: (identifiers: ProxyIdentifier<any>[]): void => rpcProtocol.assertRegistered(identifiers),
};
// Named customers
......@@ -205,7 +200,7 @@ export class ExtensionHostProcessManager extends Disposable {
const [id, ctor] = namedCustomers[i];
const instance = this._instantiationService.createInstance(ctor, extHostContext);
this._extensionHostProcessCustomers.push(instance);
this._extensionHostProcessRPCProtocol.set(id, instance);
rpcProtocol.set(id, instance);
}
// Customers
......@@ -218,9 +213,9 @@ export class ExtensionHostProcessManager extends Disposable {
// Check that no named customers are missing
const expected: ProxyIdentifier<any>[] = Object.keys(MainContext).map((key) => (<any>MainContext)[key]);
this._extensionHostProcessRPCProtocol.assertRegistered(expected);
rpcProtocol.assertRegistered(expected);
return this._extensionHostProcessRPCProtocol.getProxy(ExtHostContext.ExtHostExtensionService);
return rpcProtocol.getProxy(ExtHostContext.ExtHostExtensionService);
}
public activateByEvent(activationEvent: string): TPromise<void> {
......@@ -1027,61 +1022,61 @@ export class ExtensionService extends Disposable implements IExtensionService {
}
}
const colorTables = [
['#2977B1', '#FC802D', '#34A13A', '#D3282F', '#9366BA'],
['#8B564C', '#E177C0', '#7F7F7F', '#BBBE3D', '#2EBECD']
];
function prettyWithoutArrays(data: any): any {
if (Array.isArray(data)) {
return data;
}
if (data && typeof data === 'object' && typeof data.toString === 'function') {
let result = data.toString();
if (result !== '[object Object]') {
return result;
}
}
return data;
}
function pretty(data: any): any {
if (Array.isArray(data)) {
return data.map(prettyWithoutArrays);
}
return prettyWithoutArrays(data);
}
class RPCLogger implements IRPCProtocolLogger {
private _totalIncoming = 0;
private _totalOutgoing = 0;
private _log(direction: string, totalLength, msgLength: number, req: number, initiator: RequestInitiator, str: string, data: any): void {
data = pretty(data);
const colorTable = colorTables[initiator];
const color = LOG_USE_COLORS ? colorTable[req % colorTable.length] : '#000000';
let args = [`%c[${direction}]%c[${strings.pad(totalLength, 7, ' ')}]%c[len: ${strings.pad(msgLength, 5, ' ')}]%c${strings.pad(req, 5, ' ')} - ${str}`, 'color: darkgreen', 'color: grey', 'color: grey', `color: ${color}`];
if (/\($/.test(str)) {
args = args.concat(data);
args.push(')');
} else {
args.push(data);
}
console.log.apply(console, args);
}
logIncoming(msgLength: number, req: number, initiator: RequestInitiator, str: string, data?: any): void {
this._totalIncoming += msgLength;
this._log('Ext \u2192 Win', this._totalIncoming, msgLength, req, initiator, str, data);
}
logOutgoing(msgLength: number, req: number, initiator: RequestInitiator, str: string, data?: any): void {
this._totalOutgoing += msgLength;
this._log('Win \u2192 Ext', this._totalOutgoing, msgLength, req, initiator, str, data);
}
}
// const colorTables = [
// ['#2977B1', '#FC802D', '#34A13A', '#D3282F', '#9366BA'],
// ['#8B564C', '#E177C0', '#7F7F7F', '#BBBE3D', '#2EBECD']
// ];
// function prettyWithoutArrays(data: any): any {
// if (Array.isArray(data)) {
// return data;
// }
// if (data && typeof data === 'object' && typeof data.toString === 'function') {
// let result = data.toString();
// if (result !== '[object Object]') {
// return result;
// }
// }
// return data;
// }
// function pretty(data: any): any {
// if (Array.isArray(data)) {
// return data.map(prettyWithoutArrays);
// }
// return prettyWithoutArrays(data);
// }
// class RPCLogger implements IRPCProtocolLogger {
// private _totalIncoming = 0;
// private _totalOutgoing = 0;
// private _log(direction: string, totalLength, msgLength: number, req: number, initiator: RequestInitiator, str: string, data: any): void {
// data = pretty(data);
// const colorTable = colorTables[initiator];
// const color = LOG_USE_COLORS ? colorTable[req % colorTable.length] : '#000000';
// let args = [`%c[${direction}]%c[${strings.pad(totalLength, 7, ' ')}]%c[len: ${strings.pad(msgLength, 5, ' ')}]%c${strings.pad(req, 5, ' ')} - ${str}`, 'color: darkgreen', 'color: grey', 'color: grey', `color: ${color}`];
// if (/\($/.test(str)) {
// args = args.concat(data);
// args.push(')');
// } else {
// args.push(data);
// }
// console.log.apply(console, args);
// }
// logIncoming(msgLength: number, req: number, initiator: RequestInitiator, str: string, data?: any): void {
// this._totalIncoming += msgLength;
// this._log('Ext \u2192 Win', this._totalIncoming, msgLength, req, initiator, str, data);
// }
// logOutgoing(msgLength: number, req: number, initiator: RequestInitiator, str: string, data?: any): void {
// this._totalOutgoing += msgLength;
// this._log('Win \u2192 Ext', this._totalOutgoing, msgLength, req, initiator, str, data);
// }
// }
interface IExtensionCacheData {
input: ExtensionScannerInput;
......
/*---------------------------------------------------------------------------------------------
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*--------------------------------------------------------------------------------------------*/
'use strict';
import { IRPCProtocol, ProxyIdentifier } from 'vs/workbench/services/extensions/node/proxyIdentifier';
import { IChannelServer, IChannelClient, IChannel } from 'vs/base/parts/ipc/node/ipc';
import { CharCode } from 'vs/base/common/charCode';
import { Event } from 'vs/base/common/event';
declare var Proxy: any; // TODO@TypeScript
export class RPCProtocol implements IRPCProtocol {
private identifiers = new Set<number>();
private proxies = new Map<number, any>();
constructor(private client: IChannelClient, private server: IChannelServer) { }
getProxy<T>(identifier: ProxyIdentifier<T>): T {
let result = this.proxies.get(identifier.nid);
if (!result) {
const channel = this.client.getChannel(`rpc${identifier.nid}`);
result = new Proxy(Object.create(null), {
get: (target: any, name: string) => {
if (!target[name] && name.charCodeAt(0) === CharCode.DollarSign) {
target[name] = (...myArgs: any[]) => {
return channel.call(name, myArgs);
};
}
return target[name];
}
});
this.proxies.set(identifier.nid, result);
}
return result;
}
set<T, R extends T>(identifier: ProxyIdentifier<T>, instance: R): R {
const channel = new class implements IChannel {
call<T>(command: string, arg?: any): Thenable<T> {
return Promise.resolve(instance[command].apply(instance, arg));
}
listen<T>(): Event<T> {
throw new Error('Method not implemented.');
}
};
this.server.registerChannel(`rpc${identifier.nid}`, channel);
this.identifiers.add(identifier.nid);
return instance;
}
assertRegistered(identifiers: ProxyIdentifier<any>[]): void {
for (let i = 0, len = identifiers.length; i < len; i++) {
const identifier = identifiers[i];
if (!this.identifiers.has(identifier.nid)) {
throw new Error(`Missing actor ${identifier.sid} (isMain: ${identifier.isMain})`);
}
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册