提交 c0227bb7 编写于 作者: J Joao Moreno

debt: push down protocol agnostic IPC code

上级 2cc1a70b
......@@ -8,7 +8,7 @@
import Event, { Emitter } from 'vs/base/common/event';
import { EventEmitter } from 'events';
export function fromEventEmitter<T>(emitter: EventEmitter, eventName: string, map: (...args: any[]) => T): Event<T> {
export function fromEventEmitter<T>(emitter: EventEmitter, eventName: string, map: (...args: any[]) => T = id => id): Event<T> {
const fn = (...args) => result.fire(map(...args));
const onFirstListenerAdd = () => emitter.on(eventName, fn);
const onLastListenerRemove = () => emitter.removeListener(eventName, fn);
......
......@@ -7,7 +7,7 @@
import { Promise, TPromise } from 'vs/base/common/winjs.base';
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import Event, { Emitter } from 'vs/base/common/event';
import Event, { Emitter, once, filterEvent } from 'vs/base/common/event';
enum MessageType {
RequestCommon,
......@@ -313,6 +313,105 @@ export class ChannelClient implements IChannelClient, IDisposable {
}
}
export interface ClientConnectionEvent {
protocol: IMessagePassingProtocol;
onDidClientDisconnect: Event<void>;
}
export class IPCServer implements IChannelServer, IRoutingChannelClient, IDisposable {
private channels: { [name: string]: IChannel } = Object.create(null);
private channelClients: { [id: string]: ChannelClient; } = Object.create(null);
private onClientAdded = new Emitter<string>();
constructor(onDidClientConnect: Event<ClientConnectionEvent>) {
onDidClientConnect(({ protocol, onDidClientDisconnect }) => {
const onFirstMessage = once(protocol.onMessage);
onFirstMessage(id => {
const channelServer = new ChannelServer(protocol);
const channelClient = new ChannelClient(protocol);
Object.keys(this.channels)
.forEach(name => channelServer.registerChannel(name, this.channels[name]));
this.channelClients[id] = channelClient;
this.onClientAdded.fire(id);
onDidClientDisconnect(() => {
channelServer.dispose();
channelClient.dispose();
delete this.channelClients[id];
});
});
});
}
getChannel<T extends IChannel>(channelName: string, router: IClientRouter): T {
const call = (command: string, arg: any) => {
const id = router.routeCall(command, arg);
if (!id) {
return TPromise.wrapError('Client id should be provided');
}
return this.getClient(id).then(client => client.getChannel(channelName).call(command, arg));
};
return { call } as T;
}
registerChannel(channelName: string, channel: IChannel): void {
this.channels[channelName] = channel;
}
private getClient(clientId: string): TPromise<IChannelClient> {
const client = this.channelClients[clientId];
if (client) {
return TPromise.as(client);
}
return new TPromise<IChannelClient>(c => {
const onClient = once(filterEvent(this.onClientAdded.event, id => id === clientId));
onClient(() => c(this.channelClients[clientId]));
});
}
dispose(): void {
this.channels = null;
this.channelClients = null;
this.onClientAdded.dispose();
}
}
export class IPCClient implements IChannelClient, IChannelServer, IDisposable {
private channelClient: ChannelClient;
private channelServer: ChannelServer;
constructor(protocol: IMessagePassingProtocol, id: string) {
protocol.send(id);
this.channelClient = new ChannelClient(protocol);
this.channelServer = new ChannelServer(protocol);
}
getChannel<T extends IChannel>(channelName: string): T {
return this.channelClient.getChannel(channelName) as T;
}
registerChannel(channelName: string, channel: IChannel): void {
this.channelServer.registerChannel(channelName, channel);
}
dispose(): void {
this.channelClient.dispose();
this.channelClient = null;
this.channelServer.dispose();
this.channelServer = null;
}
}
export function getDelayedChannel<T extends IChannel>(promise: TPromise<T>): T {
const call = (command, arg) => promise.then(c => c.call(command, arg));
return { call } as T;
......
......@@ -7,10 +7,9 @@
import { Socket, Server as NetServer, createConnection, createServer } from 'net';
import { TPromise } from 'vs/base/common/winjs.base';
import { IDisposable } from 'vs/base/common/lifecycle';
import Event, { Emitter, once } from 'vs/base/common/event';
import Event, { Emitter, once, mapEvent } from 'vs/base/common/event';
import { fromEventEmitter } from 'vs/base/node/event';
import { ChannelServer, ChannelClient, IMessagePassingProtocol, IChannelServer, IChannelClient, IRoutingChannelClient, IClientRouter, IChannel } from 'vs/base/parts/ipc/common/ipc';
import { IMessagePassingProtocol, ClientConnectionEvent, IPCServer, IPCClient } from 'vs/base/parts/ipc/common/ipc';
function bufferIndexOf(buffer: Buffer, value: number, start = 0) {
while (start < buffer.length && buffer[start] !== value) {
......@@ -73,141 +72,42 @@ class Protocol implements IMessagePassingProtocol {
}
}
class RoutingChannelClient implements IRoutingChannelClient, IDisposable {
export class Server extends IPCServer {
private ipcClients: { [id: string]: ChannelClient; };
private onClientAdded = new Emitter();
private static toClientConnectionEvent(server: NetServer): Event<ClientConnectionEvent> {
const onConnection = fromEventEmitter<Socket>(server, 'connection');
constructor() {
this.ipcClients = Object.create(null);
return mapEvent(onConnection, socket => ({
protocol: new Protocol(socket),
onDidClientDisconnect: once(fromEventEmitter<void>(socket, 'close'))
}));
}
add(id: string, client: ChannelClient): void {
this.ipcClients[id] = client;
this.onClientAdded.fire();
}
remove(id: string): void {
delete this.ipcClients[id];
}
private getClient(clientId: string): TPromise<IChannelClient> {
const getClientFn = (clientId: string, c: (client: IChannelClient) => void): boolean => {
let client = this.ipcClients[clientId];
if (client) {
c(client);
return true;
}
return false;
};
return new TPromise<IChannelClient>((c, e) => {
if (!getClientFn(clientId, c)) {
let disposable = this.onClientAdded.event(() => {
if (getClientFn(clientId, c)) {
disposable.dispose();
}
});
}
});
}
getChannel<T extends IChannel>(channelName: string, router: IClientRouter): T {
const call = (command: string, arg: any) => {
const id = router.routeCall(command, arg);
if (!id) {
return TPromise.wrapError('Client id should be provided');
}
return this.getClient(id).then(client => client.getChannel(channelName).call(command, arg));
};
return { call } as T;
}
dispose() {
this.ipcClients = null;
this.onClientAdded.dispose();
}
}
// TODO@joao: move multi channel implementation down to ipc
export class Server implements IChannelServer, IRoutingChannelClient, IDisposable {
private channels: { [name: string]: IChannel };
private router: RoutingChannelClient;
constructor(private server: NetServer) {
this.channels = Object.create(null);
this.router = new RoutingChannelClient();
this.server.on('connection', (socket: Socket) => {
const protocol = new Protocol(socket);
const onFirstMessage = once(protocol.onMessage);
onFirstMessage(id => {
const channelServer = new ChannelServer(protocol);
Object.keys(this.channels)
.forEach(name => channelServer.registerChannel(name, this.channels[name]));
const channelClient = new ChannelClient(protocol);
this.router.add(id, channelClient);
socket.once('close', () => {
channelClient.dispose();
this.router.remove(id);
channelServer.dispose();
});
});
});
}
getChannel<T extends IChannel>(channelName: string, router: IClientRouter): T {
return this.router.getChannel<T>(channelName, router);
}
registerChannel(channelName: string, channel: IChannel): void {
this.channels[channelName] = channel;
super(Server.toClientConnectionEvent(server));
}
dispose(): void {
this.router.dispose();
this.router = null;
this.channels = null;
super.dispose();
this.server.close();
this.server = null;
}
}
export class Client implements IChannelClient, IChannelServer, IDisposable {
private channelClient: ChannelClient;
private channelServer: ChannelServer;
export class Client extends IPCClient {
private _onClose = new Emitter<void>();
get onClose(): Event<void> { return this._onClose.event; }
constructor(private socket: Socket, id: string) {
const protocol = new Protocol(socket);
protocol.send(id);
this.channelClient = new ChannelClient(protocol);
this.channelServer = new ChannelServer(protocol);
super(new Protocol(socket), id);
socket.once('close', () => this._onClose.fire());
}
getChannel<T extends IChannel>(channelName: string): T {
return this.channelClient.getChannel(channelName) as T;
}
registerChannel(channelName: string, channel: IChannel): void {
this.channelServer.registerChannel(channelName, channel);
}
dispose(): void {
super.dispose();
this.socket.end();
this.socket = null;
this.channelClient = null;
this.channelServer.dispose();
this.channelServer = null;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册