diff --git a/src/vs/base/parts/ipc/common/ipc.electron.ts b/src/vs/base/parts/ipc/common/ipc.electron.ts index e1a6e1c537830c74e3a6bb6771c4dd6bfff0b616..9e4de8cd357d9617ddb2e36f184558bd26ac8b3f 100644 --- a/src/vs/base/parts/ipc/common/ipc.electron.ts +++ b/src/vs/base/parts/ipc/common/ipc.electron.ts @@ -5,7 +5,7 @@ import { IDisposable, dispose } from 'vs/base/common/lifecycle'; import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc'; -import Event, { chain } from 'vs/base/common/event'; +import Event, { Emitter, chain } from 'vs/base/common/event'; import { fromEventEmitter } from 'vs/base/node/event'; import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelServer, IChannelClient, IChannel } from 'vs/base/parts/ipc/common/ipc'; @@ -23,14 +23,17 @@ class Protocol implements IMessagePassingProtocol { private listener: IDisposable; - constructor(private sender: Sender, private onMessageEvent: Event) {} + private _onMessage: Event; + get onMessage(): Event { return this._onMessage; } - send(message: any): void { - this.sender.send(Message, message); + constructor(private sender: Sender, private onMessageEvent: Event) { + const emitter = new Emitter(); + onMessageEvent(msg => emitter.fire(msg)); + this._onMessage = emitter.event; } - onMessage(callback: (message: any) => void): void { - this.listener = this.onMessageEvent(callback); + send(message: any): void { + this.sender.send(Message, message); } dispose(): void { diff --git a/src/vs/base/parts/ipc/common/ipc.ts b/src/vs/base/parts/ipc/common/ipc.ts index bad73f428be3b806efe120a9de905e34d22abcb4..0d7d322d7d96f01420c3db9f806e91b9ad306e1e 100644 --- a/src/vs/base/parts/ipc/common/ipc.ts +++ b/src/vs/base/parts/ipc/common/ipc.ts @@ -50,7 +50,7 @@ interface IHandler { export interface IMessagePassingProtocol { send(request: any): void; - onMessage(callback: (response: any) => void): void; + onMessage: Event; } enum State { @@ -82,11 +82,12 @@ export class ChannelServer { private channels: { [name: string]: IChannel }; private activeRequests: { [id: number]: IDisposable; }; + private protocolListener: IDisposable; constructor(private protocol: IMessagePassingProtocol) { this.channels = Object.create(null); this.activeRequests = Object.create(null); - this.protocol.onMessage(r => this.onMessage(r)); + this.protocolListener = this.protocol.onMessage(r => this.onMessage(r)); this.protocol.send( { type: MessageType.ResponseInitialize }); } @@ -150,6 +151,9 @@ export class ChannelServer { } public dispose(): void { + this.protocolListener.dispose(); + this.protocolListener = null; + Object.keys(this.activeRequests).forEach(id => { this.activeRequests[id].dispose(); }); @@ -165,6 +169,7 @@ export class ChannelClient implements IChannelClient, IDisposable { private bufferedRequests: IRequest[]; private handlers: { [id: number]: IHandler; }; private lastRequestId: number; + private protocolListener: IDisposable; constructor(private protocol: IMessagePassingProtocol) { this.state = State.Uninitialized; @@ -172,7 +177,7 @@ export class ChannelClient implements IChannelClient, IDisposable { this.bufferedRequests = []; this.handlers = Object.create(null); this.lastRequestId = 0; - this.protocol.onMessage(r => this.onMessage(r)); + this.protocolListener = this.protocol.onMessage(r => this.onMessage(r)); } getChannel(channelName: string): T { @@ -298,6 +303,9 @@ export class ChannelClient implements IChannelClient, IDisposable { } dispose(): void { + this.protocolListener.dispose(); + this.protocolListener = null; + this.activeRequests.forEach(r => r.cancel()); this.activeRequests = []; } diff --git a/src/vs/base/parts/ipc/node/ipc.cp.ts b/src/vs/base/parts/ipc/node/ipc.cp.ts index f64ab02a95f8b8c7bc212884fb3146eebd45f18b..8c9370dd6a0ea747986a7298411226eb2c9288ef 100644 --- a/src/vs/base/parts/ipc/node/ipc.cp.ts +++ b/src/vs/base/parts/ipc/node/ipc.cp.ts @@ -8,13 +8,15 @@ import { IDisposable } from 'vs/base/common/lifecycle'; import { Promise} from 'vs/base/common/winjs.base'; import { Delayer } from 'vs/base/common/async'; import { clone, assign } from 'vs/base/common/objects'; +import { Emitter } from 'vs/base/common/event'; +import { fromEventEmitter } from 'vs/base/node/event'; import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelClient, IChannel } from 'vs/base/parts/ipc/common/ipc'; export class Server extends IPCServer { constructor() { super({ send: r => { try { process.send(r); } catch (e) { /* not much to do */ } }, - onMessage: cb => process.on('message', cb) + onMessage: fromEventEmitter(process, 'message', msg => msg) }); process.once('disconnect', () => this.dispose()); @@ -121,32 +123,37 @@ export class Client implements IChannelClient, IDisposable { } this.child = fork(this.modulePath, args, forkOpts); - this._client = new IPCClient({ - send: r => this.child && this.child.connected && this.child.send(r), - onMessage: cb => { - this.child.on('message', (msg) => { - - // Handle console logs specially - if (msg && msg.type === '__$console') { - let args = ['%c[IPC Library: ' + this.options.serverName + ']', 'color: darkgreen']; - try { - const parsed = JSON.parse(msg.arguments); - args = args.concat(Object.getOwnPropertyNames(parsed).map(o => parsed[o])); - } catch (error) { - args.push(msg.arguments); - } - - console[msg.severity].apply(console, args); - } - - // Anything else goes to the outside - else { - cb(msg); - } - }); + + const onMessageEmitter = new Emitter(); + const onRawMessage = fromEventEmitter(this.child, 'message', msg => msg); + + onRawMessage(msg => { + // Handle console logs specially + if (msg && msg.type === '__$console') { + let args = ['%c[IPC Library: ' + this.options.serverName + ']', 'color: darkgreen']; + try { + const parsed = JSON.parse(msg.arguments); + args = args.concat(Object.getOwnPropertyNames(parsed).map(o => parsed[o])); + } catch (error) { + args.push(msg.arguments); + } + + console[msg.severity].apply(console, args); + return null; + } + + // Anything else goes to the outside + else { + onMessageEmitter.fire(msg); } }); + const send = r => this.child && this.child.connected && this.child.send(r); + const onMessage = onMessageEmitter.event; + const protocol = { send, onMessage }; + + this._client = new IPCClient(protocol); + const onExit = () => this.disposeClient(); process.once('exit', onExit); diff --git a/src/vs/base/parts/ipc/node/ipc.net.ts b/src/vs/base/parts/ipc/node/ipc.net.ts index a68ad6d1a1a8ed154b4c557108ebd4ae809c5f2c..aa6b65b3b7f045af0bc826203721fba1fe39d0b1 100644 --- a/src/vs/base/parts/ipc/node/ipc.net.ts +++ b/src/vs/base/parts/ipc/node/ipc.net.ts @@ -23,7 +23,9 @@ function bufferIndexOf(buffer: Buffer, value: number, start = 0) { class Protocol implements IMessagePassingProtocol { private static Boundary = new Buffer([0]); - private onMessageEvent: Event; + + private _onMessage: Event; + get onMessage(): Event { return this._onMessage; } constructor(private socket: Socket) { let buffer = null; @@ -58,7 +60,7 @@ class Protocol implements IMessagePassingProtocol { } }); - this.onMessageEvent = emitter.event; + this._onMessage = emitter.event; } public send(message: any): void { @@ -69,10 +71,6 @@ class Protocol implements IMessagePassingProtocol { // noop } } - - public onMessage(callback: (message: any) => void): void { - this.onMessageEvent(callback); - } } class RoutingChannelClient implements IMultiChannelClient {