提交 e6c5863e 编写于 作者: J Joao Moreno

ipc: use Event for onMessage

上级 95f24e58
...@@ -5,7 +5,7 @@ ...@@ -5,7 +5,7 @@
import { IDisposable, dispose } from 'vs/base/common/lifecycle'; import { IDisposable, dispose } from 'vs/base/common/lifecycle';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc'; import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc';
import Event, { chain } from 'vs/base/common/event'; import Event, { Emitter, chain } from 'vs/base/common/event';
import { fromEventEmitter } from 'vs/base/node/event'; import { fromEventEmitter } from 'vs/base/node/event';
import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelServer, IChannelClient, IChannel } from 'vs/base/parts/ipc/common/ipc'; import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelServer, IChannelClient, IChannel } from 'vs/base/parts/ipc/common/ipc';
...@@ -23,14 +23,17 @@ class Protocol implements IMessagePassingProtocol { ...@@ -23,14 +23,17 @@ class Protocol implements IMessagePassingProtocol {
private listener: IDisposable; private listener: IDisposable;
constructor(private sender: Sender, private onMessageEvent: Event<any>) {} private _onMessage: Event<any>;
get onMessage(): Event<any> { return this._onMessage; }
send(message: any): void { constructor(private sender: Sender, private onMessageEvent: Event<any>) {
this.sender.send(Message, message); const emitter = new Emitter<any>();
onMessageEvent(msg => emitter.fire(msg));
this._onMessage = emitter.event;
} }
onMessage(callback: (message: any) => void): void { send(message: any): void {
this.listener = this.onMessageEvent(callback); this.sender.send(Message, message);
} }
dispose(): void { dispose(): void {
......
...@@ -50,7 +50,7 @@ interface IHandler { ...@@ -50,7 +50,7 @@ interface IHandler {
export interface IMessagePassingProtocol { export interface IMessagePassingProtocol {
send(request: any): void; send(request: any): void;
onMessage(callback: (response: any) => void): void; onMessage: Event<any>;
} }
enum State { enum State {
...@@ -82,11 +82,12 @@ export class ChannelServer { ...@@ -82,11 +82,12 @@ export class ChannelServer {
private channels: { [name: string]: IChannel }; private channels: { [name: string]: IChannel };
private activeRequests: { [id: number]: IDisposable; }; private activeRequests: { [id: number]: IDisposable; };
private protocolListener: IDisposable;
constructor(private protocol: IMessagePassingProtocol) { constructor(private protocol: IMessagePassingProtocol) {
this.channels = Object.create(null); this.channels = Object.create(null);
this.activeRequests = Object.create(null); this.activeRequests = Object.create(null);
this.protocol.onMessage(r => this.onMessage(r)); this.protocolListener = this.protocol.onMessage(r => this.onMessage(r));
this.protocol.send(<IRawResponse> { type: MessageType.ResponseInitialize }); this.protocol.send(<IRawResponse> { type: MessageType.ResponseInitialize });
} }
...@@ -150,6 +151,9 @@ export class ChannelServer { ...@@ -150,6 +151,9 @@ export class ChannelServer {
} }
public dispose(): void { public dispose(): void {
this.protocolListener.dispose();
this.protocolListener = null;
Object.keys(this.activeRequests).forEach(id => { Object.keys(this.activeRequests).forEach(id => {
this.activeRequests[<any>id].dispose(); this.activeRequests[<any>id].dispose();
}); });
...@@ -165,6 +169,7 @@ export class ChannelClient implements IChannelClient, IDisposable { ...@@ -165,6 +169,7 @@ export class ChannelClient implements IChannelClient, IDisposable {
private bufferedRequests: IRequest[]; private bufferedRequests: IRequest[];
private handlers: { [id: number]: IHandler; }; private handlers: { [id: number]: IHandler; };
private lastRequestId: number; private lastRequestId: number;
private protocolListener: IDisposable;
constructor(private protocol: IMessagePassingProtocol) { constructor(private protocol: IMessagePassingProtocol) {
this.state = State.Uninitialized; this.state = State.Uninitialized;
...@@ -172,7 +177,7 @@ export class ChannelClient implements IChannelClient, IDisposable { ...@@ -172,7 +177,7 @@ export class ChannelClient implements IChannelClient, IDisposable {
this.bufferedRequests = []; this.bufferedRequests = [];
this.handlers = Object.create(null); this.handlers = Object.create(null);
this.lastRequestId = 0; this.lastRequestId = 0;
this.protocol.onMessage(r => this.onMessage(r)); this.protocolListener = this.protocol.onMessage(r => this.onMessage(r));
} }
getChannel<T extends IChannel>(channelName: string): T { getChannel<T extends IChannel>(channelName: string): T {
...@@ -298,6 +303,9 @@ export class ChannelClient implements IChannelClient, IDisposable { ...@@ -298,6 +303,9 @@ export class ChannelClient implements IChannelClient, IDisposable {
} }
dispose(): void { dispose(): void {
this.protocolListener.dispose();
this.protocolListener = null;
this.activeRequests.forEach(r => r.cancel()); this.activeRequests.forEach(r => r.cancel());
this.activeRequests = []; this.activeRequests = [];
} }
......
...@@ -8,13 +8,15 @@ import { IDisposable } from 'vs/base/common/lifecycle'; ...@@ -8,13 +8,15 @@ import { IDisposable } from 'vs/base/common/lifecycle';
import { Promise} from 'vs/base/common/winjs.base'; import { Promise} from 'vs/base/common/winjs.base';
import { Delayer } from 'vs/base/common/async'; import { Delayer } from 'vs/base/common/async';
import { clone, assign } from 'vs/base/common/objects'; import { clone, assign } from 'vs/base/common/objects';
import { Emitter } from 'vs/base/common/event';
import { fromEventEmitter } from 'vs/base/node/event';
import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelClient, IChannel } from 'vs/base/parts/ipc/common/ipc'; import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelClient, IChannel } from 'vs/base/parts/ipc/common/ipc';
export class Server extends IPCServer { export class Server extends IPCServer {
constructor() { constructor() {
super({ super({
send: r => { try { process.send(r); } catch (e) { /* not much to do */ } }, send: r => { try { process.send(r); } catch (e) { /* not much to do */ } },
onMessage: cb => process.on('message', cb) onMessage: fromEventEmitter(process, 'message', msg => msg)
}); });
process.once('disconnect', () => this.dispose()); process.once('disconnect', () => this.dispose());
...@@ -121,32 +123,37 @@ export class Client implements IChannelClient, IDisposable { ...@@ -121,32 +123,37 @@ export class Client implements IChannelClient, IDisposable {
} }
this.child = fork(this.modulePath, args, forkOpts); this.child = fork(this.modulePath, args, forkOpts);
this._client = new IPCClient({
send: r => this.child && this.child.connected && this.child.send(r), const onMessageEmitter = new Emitter<any>();
onMessage: cb => { const onRawMessage = fromEventEmitter(this.child, 'message', msg => msg);
this.child.on('message', (msg) => {
onRawMessage(msg => {
// Handle console logs specially // Handle console logs specially
if (msg && msg.type === '__$console') { if (msg && msg.type === '__$console') {
let args = ['%c[IPC Library: ' + this.options.serverName + ']', 'color: darkgreen']; let args = ['%c[IPC Library: ' + this.options.serverName + ']', 'color: darkgreen'];
try { try {
const parsed = JSON.parse(msg.arguments); const parsed = JSON.parse(msg.arguments);
args = args.concat(Object.getOwnPropertyNames(parsed).map(o => parsed[o])); args = args.concat(Object.getOwnPropertyNames(parsed).map(o => parsed[o]));
} catch (error) { } catch (error) {
args.push(msg.arguments); args.push(msg.arguments);
} }
console[msg.severity].apply(console, args); console[msg.severity].apply(console, args);
} return null;
}
// Anything else goes to the outside
else { // Anything else goes to the outside
cb(msg); else {
} onMessageEmitter.fire(msg);
});
} }
}); });
const send = r => this.child && this.child.connected && this.child.send(r);
const onMessage = onMessageEmitter.event;
const protocol = { send, onMessage };
this._client = new IPCClient(protocol);
const onExit = () => this.disposeClient(); const onExit = () => this.disposeClient();
process.once('exit', onExit); process.once('exit', onExit);
......
...@@ -23,7 +23,9 @@ function bufferIndexOf(buffer: Buffer, value: number, start = 0) { ...@@ -23,7 +23,9 @@ function bufferIndexOf(buffer: Buffer, value: number, start = 0) {
class Protocol implements IMessagePassingProtocol { class Protocol implements IMessagePassingProtocol {
private static Boundary = new Buffer([0]); private static Boundary = new Buffer([0]);
private onMessageEvent: Event<any>;
private _onMessage: Event<any>;
get onMessage(): Event<any> { return this._onMessage; }
constructor(private socket: Socket) { constructor(private socket: Socket) {
let buffer = null; let buffer = null;
...@@ -58,7 +60,7 @@ class Protocol implements IMessagePassingProtocol { ...@@ -58,7 +60,7 @@ class Protocol implements IMessagePassingProtocol {
} }
}); });
this.onMessageEvent = emitter.event; this._onMessage = emitter.event;
} }
public send(message: any): void { public send(message: any): void {
...@@ -69,10 +71,6 @@ class Protocol implements IMessagePassingProtocol { ...@@ -69,10 +71,6 @@ class Protocol implements IMessagePassingProtocol {
// noop // noop
} }
} }
public onMessage(callback: (message: any) => void): void {
this.onMessageEvent(callback);
}
} }
class RoutingChannelClient implements IMultiChannelClient { class RoutingChannelClient implements IMultiChannelClient {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册