未验证 提交 260b72e2 编写于 作者: J Joao Moreno

ipc: multicast events

上级 ec284758
...@@ -584,3 +584,7 @@ export function mapArrayOrNot<T, U>(items: T | T[], fn: (_: T) => U): U | U[] { ...@@ -584,3 +584,7 @@ export function mapArrayOrNot<T, U>(items: T | T[], fn: (_: T) => U): U | U[] {
export function asArray<T>(x: T | T[]): T[] { export function asArray<T>(x: T | T[]): T[] {
return Array.isArray(x) ? x : [x]; return Array.isArray(x) ? x : [x];
} }
export function getRandomElement<T>(arr: T[]): T {
return arr[Math.floor(Math.random() * arr.length)];
}
...@@ -9,6 +9,7 @@ import { CancelablePromise, createCancelablePromise, timeout } from 'vs/base/com ...@@ -9,6 +9,7 @@ import { CancelablePromise, createCancelablePromise, timeout } from 'vs/base/com
import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation'; import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation';
import * as errors from 'vs/base/common/errors'; import * as errors from 'vs/base/common/errors';
import { VSBuffer } from 'vs/base/common/buffer'; import { VSBuffer } from 'vs/base/common/buffer';
import { getRandomElement } from 'vs/base/common/arrays';
/** /**
* An `IChannel` is an abstraction over a collection of commands. * An `IChannel` is an abstraction over a collection of commands.
...@@ -117,8 +118,7 @@ export interface IClientRouter<TContext = string> { ...@@ -117,8 +118,7 @@ export interface IClientRouter<TContext = string> {
* order to pick the right one. * order to pick the right one.
*/ */
export interface IRoutingChannelClient<TContext = string> { export interface IRoutingChannelClient<TContext = string> {
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T; getChannel<T extends IChannel>(channelName: string, router?: IClientRouter<TContext>): T;
getBroadcastChannel<T extends IChannel>(channelName: string): T;
} }
interface IReader { interface IReader {
...@@ -700,18 +700,26 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I ...@@ -700,18 +700,26 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
}); });
} }
getChannel<T extends IChannel>(channelName: string, router: IClientRouter<TContext>): T { getChannel<T extends IChannel>(channelName: string, router?: IClientRouter<TContext>): T {
const that = this; const that = this;
return { return {
call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> { call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise<T> {
const channelPromise = router.routeCall(that, command, arg) const connectionPromise = router
? router.routeCall(that, command, arg)
: Promise.resolve(getRandomElement(that.connections));
const channelPromise = connectionPromise
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName)); .then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
return getDelayedChannel(channelPromise) return getDelayedChannel(channelPromise)
.call(command, arg, cancellationToken); .call(command, arg, cancellationToken);
}, },
listen(event: string, arg: any): Event<T> { listen(event: string, arg: any): Event<T> {
if (!router) {
return that.getMulticastEvent(channelName, event, arg);
}
const channelPromise = router.routeEvent(that, event, arg) const channelPromise = router.routeEvent(that, event, arg)
.then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName)); .then(connection => (connection as Connection<TContext>).channelClient.getChannel(channelName));
...@@ -721,64 +729,56 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I ...@@ -721,64 +729,56 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
} as T; } as T;
} }
getBroadcastChannel<T extends IChannel>(channelName: string): T { private getMulticastEvent<T extends IChannel>(channelName: string, eventName: string, arg: any): Event<T> {
const that = this; const that = this;
let disposables = new DisposableStore();
return { // Create an emitter which hooks up to all clients
call(_): Promise<T> { // as soon as first listener is added. It also
throw new Error('IPC broadcast channels are not supported for calls'); // disconnects from all clients as soon as the last listener
}, // is removed.
listen(eventName: string, arg: any): Event<T> { const emitter = new Emitter<T>({
let disposables = new DisposableStore(); onFirstListenerAdd: () => {
disposables = new DisposableStore();
// Create an emitter which hooks up to all clients
// as soon as first listener is added. It also // The event multiplexer is useful since the active
// disconnects from all clients as soon as the last listener // client list is dynamic. We need to hook up and disconnection
// is removed. // to/from clients as they come and go.
const emitter = new Emitter<T>({ const eventMultiplexer = new EventMultiplexer<T>();
onFirstListenerAdd: () => { const map = new Map<Connection<TContext>, IDisposable>();
disposables = new DisposableStore();
const onDidAddConnection = (connection: Connection<TContext>) => {
// The event multiplexer is useful since the active const channel = connection.channelClient.getChannel(channelName);
// client list is dynamic. We need to hook up and disconnection const event = channel.listen<T>(eventName, arg);
// to/from clients as they come and go. const disposable = eventMultiplexer.add(event);
const eventMultiplexer = new EventMultiplexer<T>();
const map = new Map<Connection<TContext>, IDisposable>(); map.set(connection, disposable);
};
const onDidAddConnection = (connection: Connection<TContext>) => {
const channel = connection.channelClient.getChannel(channelName); const onDidRemoveConnection = (connection: Connection<TContext>) => {
const event = channel.listen<T>(eventName, arg); const disposable = map.get(connection);
const disposable = eventMultiplexer.add(event);
if (!disposable) {
map.set(connection, disposable); return;
};
const onDidRemoveConnection = (connection: Connection<TContext>) => {
const disposable = map.get(connection);
if (!disposable) {
return;
}
disposable.dispose();
map.delete(connection);
};
that.connections.forEach(onDidAddConnection);
that.onDidAddConnection(onDidAddConnection, undefined, disposables);
that.onDidRemoveConnection(onDidRemoveConnection, undefined, disposables);
eventMultiplexer.event(emitter.fire, emitter, disposables);
disposables.add(eventMultiplexer);
},
onLastListenerRemove: () => {
disposables.dispose();
} }
});
return emitter.event; disposable.dispose();
map.delete(connection);
};
that.connections.forEach(onDidAddConnection);
that.onDidAddConnection(onDidAddConnection, undefined, disposables);
that.onDidRemoveConnection(onDidRemoveConnection, undefined, disposables);
eventMultiplexer.event(emitter.fire, emitter, disposables);
disposables.add(eventMultiplexer);
},
onLastListenerRemove: () => {
disposables.dispose();
} }
} as T; });
return emitter.event;
} }
registerChannel(channelName: string, channel: IServerChannel<TContext>): void { registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
......
...@@ -454,9 +454,9 @@ suite('Base IPC', function () { ...@@ -454,9 +454,9 @@ suite('Base IPC', function () {
client1.registerChannel('channel', clientChannel1); client1.registerChannel('channel', clientChannel1);
const pings: string[] = []; const pings: string[] = [];
const broadcastChannel = server.getBroadcastChannel('channel'); const channel = server.getChannel('channel');
const broadcastService = new TestChannelClient(broadcastChannel); const service = new TestChannelClient(channel);
broadcastService.onPong(msg => pings.push(msg)); service.onPong(msg => pings.push(msg));
await timeout(1); await timeout(1);
clientService1.ping('hello 1'); clientService1.ping('hello 1');
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册