提交 5e6580aa 编写于 作者: J Joao Moreno

💄 ipc

上级 787b9c64
......@@ -7,7 +7,7 @@ import { IDisposable, dispose } from 'vs/base/common/lifecycle';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc';
import Event, { chain } from 'vs/base/common/event';
import { fromEventEmitter } from 'vs/base/node/event';
import { Server as IPCServer, Client as IPCClient, IServer, IClient, IChannel } from 'vs/base/parts/ipc/common/ipc';
import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelServer, IChannelClient, IChannel } from 'vs/base/parts/ipc/common/ipc';
const Hello = 'ipc:hello';
const Goodbye = 'ipc:goodbye';
......@@ -43,7 +43,7 @@ interface IIPCEvent {
message: string;
}
export class Server implements IServer, IDisposable {
export class Server implements IChannelServer, IDisposable {
private channels: { [name: string]: IChannel } = Object.create(null);
......@@ -84,7 +84,7 @@ export class Server implements IServer, IDisposable {
}
}
export class Client implements IClient, IDisposable {
export class Client implements IChannelClient, IDisposable {
private protocol: Protocol;
private ipcClient: IPCClient;
......
......@@ -62,15 +62,23 @@ export interface IChannel {
call(command: string, arg: any): TPromise<any>;
}
export interface IServer {
export interface IChannelServer {
registerChannel(channelName: string, channel: IChannel): void;
}
export interface IClient {
export interface IChannelClient {
getChannel<T extends IChannel>(channelName: string): T;
}
export class Server {
export interface IClientRouter {
routeCall(command: string, arg: any): string;
}
export interface IMultiChannelClient {
getChannel<T extends IChannel>(channelName: string, router: IClientRouter): T;
}
export class ChannelServer {
private channels: { [name: string]: IChannel };
private activeRequests: { [id: number]: IDisposable; };
......@@ -150,7 +158,7 @@ export class Server {
}
}
export class Client implements IClient, IDisposable {
export class ChannelClient implements IChannelClient, IDisposable {
private state: State;
private activeRequests: Promise[];
......
......@@ -8,7 +8,7 @@ import { IDisposable } from 'vs/base/common/lifecycle';
import { Promise} from 'vs/base/common/winjs.base';
import { Delayer } from 'vs/base/common/async';
import { clone, assign } from 'vs/base/common/objects';
import { Server as IPCServer, Client as IPCClient, IClient, IChannel } from 'vs/base/parts/ipc/common/ipc';
import { ChannelServer as IPCServer, ChannelClient as IPCClient, IChannelClient, IChannel } from 'vs/base/parts/ipc/common/ipc';
export class Server extends IPCServer {
constructor() {
......@@ -54,7 +54,7 @@ export interface IIPCOptions {
debugBrk?:number;
}
export class Client implements IClient, IDisposable {
export class Client implements IChannelClient, IDisposable {
private disposeDelayer: Delayer<void>;
private activeRequests: Promise[];
......
......@@ -10,7 +10,7 @@ import { TPromise } from 'vs/base/common/winjs.base';
import { IDisposable } from 'vs/base/common/lifecycle';
import Event, { Emitter } from 'vs/base/common/event';
import { fromEventEmitter } from 'vs/base/node/event';
import { Server as IPCServer, Client as IPCClient, IMessagePassingProtocol, IServer, IClient, IChannel } from 'vs/base/parts/ipc/common/ipc';
import { ChannelServer, ChannelClient, IMessagePassingProtocol, IChannelServer, IChannelClient, IMultiChannelClient, IClientRouter, IChannel } from 'vs/base/parts/ipc/common/ipc';
function bufferIndexOf(buffer: Buffer, value: number, start = 0) {
while (start < buffer.length && buffer[start] !== value) {
......@@ -75,21 +75,15 @@ class Protocol implements IMessagePassingProtocol {
}
}
// TODO: NAME IT BETTER
export interface IClientPicker {
getClientId(): string;
}
// TODO: NAME IT BETTER
class ClientMultiplexer implements IClient {
class RoutingChannelClient implements IMultiChannelClient {
private ipcClients: { [id:string]: IPCClient; };
private ipcClients: { [id:string]: ChannelClient; };
constructor(private clientPicker: IClientPicker) {
constructor() {
this.ipcClients = Object.create(null);
}
add(id: string, client: IPCClient): void {
add(id: string, client: ChannelClient): void {
this.ipcClients[id] = client;
}
......@@ -97,31 +91,32 @@ class ClientMultiplexer implements IClient {
delete this.ipcClients[id];
}
getChannel<T extends IChannel>(channelName: string): T {
return {
call: (command: string, arg: any) => {
const id = this.clientPicker.getClientId();
const client = this.ipcClients[id];
if (!client) {
return TPromise.wrapError('Client unknown');
}
getChannel<T extends IChannel>(channelName: string, router: IClientRouter): T {
const call = (command: string, arg: any) => {
const id = router.routeCall(command, arg);
const client = this.ipcClients[id];
return client.getChannel(channelName).call(command, arg);
if (!client) {
return TPromise.wrapError('Unknown client');
}
} as T;
return client.getChannel(channelName)
.call(command, arg);
};
return { call } as T;
}
}
// TODO: name it better!
export class Server implements IServer, IClient, IDisposable {
// TODO@joao: move multi channel implementation down to ipc
export class Server implements IChannelServer, IMultiChannelClient, IDisposable {
private channels: { [name: string]: IChannel };
private clientMultiplexer: ClientMultiplexer;
private router: RoutingChannelClient;
constructor(private server: NetServer, private clientPicker: IClientPicker) {
constructor(private server: NetServer) {
this.channels = Object.create(null);
this.clientMultiplexer = new ClientMultiplexer(clientPicker);
this.router = new RoutingChannelClient();
this.server.on('connection', (socket: Socket) => {
const protocol = new Protocol(socket);
......@@ -134,25 +129,25 @@ export class Server implements IServer, IClient, IDisposable {
didGetId = true;
const ipcServer = new IPCServer(protocol);
const channelServer = new ChannelServer(protocol);
Object.keys(this.channels)
.forEach(name => ipcServer.registerChannel(name, this.channels[name]));
.forEach(name => channelServer.registerChannel(name, this.channels[name]));
const ipcClient = new IPCClient(protocol);
this.clientMultiplexer.add(id, ipcClient);
const channelClient = new ChannelClient(protocol);
this.router.add(id, channelClient);
socket.once('close', () => {
ipcClient.dispose();
this.clientMultiplexer.remove(id);
ipcServer.dispose();
channelClient.dispose();
this.router.remove(id);
channelServer.dispose();
});
});
});
}
getChannel<T extends IChannel>(channelName: string): T {
return this.clientMultiplexer.getChannel<T>(channelName);
getChannel<T extends IChannel>(channelName: string, router: IClientRouter): T {
return this.router.getChannel<T>(channelName, router);
}
registerChannel(channelName: string, channel: IChannel): void {
......@@ -166,11 +161,10 @@ export class Server implements IServer, IClient, IDisposable {
}
}
// TODO: name it!
export class Client implements IClient, IServer, IDisposable {
export class Client implements IChannelClient, IChannelServer, IDisposable {
private ipcClient: IPCClient;
private ipcServer: IPCServer;
private channelClient: ChannelClient;
private channelServer: ChannelServer;
private _onClose = new Emitter<void>();
get onClose(): Event<void> { return this._onClose.event; }
......@@ -179,25 +173,25 @@ export class Client implements IClient, IServer, IDisposable {
const protocol = new Protocol(socket);
protocol.send(id);
this.ipcClient = new IPCClient(protocol);
this.ipcServer = new IPCServer(protocol);
this.channelClient = new ChannelClient(protocol);
this.channelServer = new ChannelServer(protocol);
socket.once('close', () => this._onClose.fire());
}
getChannel<T extends IChannel>(channelName: string): T {
return this.ipcClient.getChannel(channelName) as T;
return this.channelClient.getChannel(channelName) as T;
}
registerChannel(channelName: string, channel: IChannel): void {
this.ipcServer.registerChannel(channelName, channel);
this.channelServer.registerChannel(channelName, channel);
}
dispose(): void {
this.socket.end();
this.socket = null;
this.ipcClient = null;
this.ipcServer.dispose();
this.ipcServer = null;
this.channelClient = null;
this.channelServer.dispose();
this.channelServer = null;
}
}
......@@ -210,20 +204,18 @@ export function serve(hook: any): TPromise<Server> {
server.on('error', e);
server.listen(hook, () => {
server.removeListener('error', e);
c(new Server(server, {
getClientId: () => ''
}));
c(new Server(server));
});
});
}
export function connect(port: number): TPromise<Client>;
export function connect(namedPipe: string): TPromise<Client>;
export function connect(hook: any): TPromise<Client> {
export function connect(port: number, clientId?: string): TPromise<Client>;
export function connect(namedPipe: string, clientId?: string): TPromise<Client>;
export function connect(hook: any, clientId: string = ''): TPromise<Client> {
return new TPromise<Client>((c, e) => {
const socket = createConnection(hook, () => {
socket.removeListener('error', e);
c(new Client(socket, ''));
c(new Client(socket, clientId));
});
socket.once('error', e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册