From 260b72e2ab1acd5edf83bfd9908929a88043e8b1 Mon Sep 17 00:00:00 2001 From: Joao Moreno Date: Mon, 3 Feb 2020 14:58:27 +0100 Subject: [PATCH] ipc: multicast events --- src/vs/base/common/arrays.ts | 4 + src/vs/base/parts/ipc/common/ipc.ts | 114 ++++++++++---------- src/vs/base/parts/ipc/test/node/ipc.test.ts | 6 +- 3 files changed, 64 insertions(+), 60 deletions(-) diff --git a/src/vs/base/common/arrays.ts b/src/vs/base/common/arrays.ts index 8ddd98a5a82..0f300db10c6 100644 --- a/src/vs/base/common/arrays.ts +++ b/src/vs/base/common/arrays.ts @@ -584,3 +584,7 @@ export function mapArrayOrNot(items: T | T[], fn: (_: T) => U): U | U[] { export function asArray(x: T | T[]): T[] { return Array.isArray(x) ? x : [x]; } + +export function getRandomElement(arr: T[]): T { + return arr[Math.floor(Math.random() * arr.length)]; +} diff --git a/src/vs/base/parts/ipc/common/ipc.ts b/src/vs/base/parts/ipc/common/ipc.ts index e22fb97289e..397a44cb283 100644 --- a/src/vs/base/parts/ipc/common/ipc.ts +++ b/src/vs/base/parts/ipc/common/ipc.ts @@ -9,6 +9,7 @@ import { CancelablePromise, createCancelablePromise, timeout } from 'vs/base/com import { CancellationToken, CancellationTokenSource } from 'vs/base/common/cancellation'; import * as errors from 'vs/base/common/errors'; import { VSBuffer } from 'vs/base/common/buffer'; +import { getRandomElement } from 'vs/base/common/arrays'; /** * An `IChannel` is an abstraction over a collection of commands. @@ -117,8 +118,7 @@ export interface IClientRouter { * order to pick the right one. */ export interface IRoutingChannelClient { - getChannel(channelName: string, router: IClientRouter): T; - getBroadcastChannel(channelName: string): T; + getChannel(channelName: string, router?: IClientRouter): T; } interface IReader { @@ -700,18 +700,26 @@ export class IPCServer implements IChannelServer, I }); } - getChannel(channelName: string, router: IClientRouter): T { + getChannel(channelName: string, router?: IClientRouter): T { const that = this; return { call(command: string, arg?: any, cancellationToken?: CancellationToken): Promise { - const channelPromise = router.routeCall(that, command, arg) + const connectionPromise = router + ? router.routeCall(that, command, arg) + : Promise.resolve(getRandomElement(that.connections)); + + const channelPromise = connectionPromise .then(connection => (connection as Connection).channelClient.getChannel(channelName)); return getDelayedChannel(channelPromise) .call(command, arg, cancellationToken); }, listen(event: string, arg: any): Event { + if (!router) { + return that.getMulticastEvent(channelName, event, arg); + } + const channelPromise = router.routeEvent(that, event, arg) .then(connection => (connection as Connection).channelClient.getChannel(channelName)); @@ -721,64 +729,56 @@ export class IPCServer implements IChannelServer, I } as T; } - getBroadcastChannel(channelName: string): T { + private getMulticastEvent(channelName: string, eventName: string, arg: any): Event { const that = this; + let disposables = new DisposableStore(); - return { - call(_): Promise { - throw new Error('IPC broadcast channels are not supported for calls'); - }, - listen(eventName: string, arg: any): Event { - let disposables = new DisposableStore(); - - // Create an emitter which hooks up to all clients - // as soon as first listener is added. It also - // disconnects from all clients as soon as the last listener - // is removed. - const emitter = new Emitter({ - onFirstListenerAdd: () => { - disposables = new DisposableStore(); - - // The event multiplexer is useful since the active - // client list is dynamic. We need to hook up and disconnection - // to/from clients as they come and go. - const eventMultiplexer = new EventMultiplexer(); - const map = new Map, IDisposable>(); - - const onDidAddConnection = (connection: Connection) => { - const channel = connection.channelClient.getChannel(channelName); - const event = channel.listen(eventName, arg); - const disposable = eventMultiplexer.add(event); - - map.set(connection, disposable); - }; - - const onDidRemoveConnection = (connection: Connection) => { - const disposable = map.get(connection); - - if (!disposable) { - return; - } - - disposable.dispose(); - map.delete(connection); - }; - - that.connections.forEach(onDidAddConnection); - that.onDidAddConnection(onDidAddConnection, undefined, disposables); - that.onDidRemoveConnection(onDidRemoveConnection, undefined, disposables); - eventMultiplexer.event(emitter.fire, emitter, disposables); - - disposables.add(eventMultiplexer); - }, - onLastListenerRemove: () => { - disposables.dispose(); + // Create an emitter which hooks up to all clients + // as soon as first listener is added. It also + // disconnects from all clients as soon as the last listener + // is removed. + const emitter = new Emitter({ + onFirstListenerAdd: () => { + disposables = new DisposableStore(); + + // The event multiplexer is useful since the active + // client list is dynamic. We need to hook up and disconnection + // to/from clients as they come and go. + const eventMultiplexer = new EventMultiplexer(); + const map = new Map, IDisposable>(); + + const onDidAddConnection = (connection: Connection) => { + const channel = connection.channelClient.getChannel(channelName); + const event = channel.listen(eventName, arg); + const disposable = eventMultiplexer.add(event); + + map.set(connection, disposable); + }; + + const onDidRemoveConnection = (connection: Connection) => { + const disposable = map.get(connection); + + if (!disposable) { + return; } - }); - return emitter.event; + disposable.dispose(); + map.delete(connection); + }; + + that.connections.forEach(onDidAddConnection); + that.onDidAddConnection(onDidAddConnection, undefined, disposables); + that.onDidRemoveConnection(onDidRemoveConnection, undefined, disposables); + eventMultiplexer.event(emitter.fire, emitter, disposables); + + disposables.add(eventMultiplexer); + }, + onLastListenerRemove: () => { + disposables.dispose(); } - } as T; + }); + + return emitter.event; } registerChannel(channelName: string, channel: IServerChannel): void { diff --git a/src/vs/base/parts/ipc/test/node/ipc.test.ts b/src/vs/base/parts/ipc/test/node/ipc.test.ts index d0aeee23d47..5839c45dab7 100644 --- a/src/vs/base/parts/ipc/test/node/ipc.test.ts +++ b/src/vs/base/parts/ipc/test/node/ipc.test.ts @@ -454,9 +454,9 @@ suite('Base IPC', function () { client1.registerChannel('channel', clientChannel1); const pings: string[] = []; - const broadcastChannel = server.getBroadcastChannel('channel'); - const broadcastService = new TestChannelClient(broadcastChannel); - broadcastService.onPong(msg => pings.push(msg)); + const channel = server.getChannel('channel'); + const service = new TestChannelClient(channel); + service.onPong(msg => pings.push(msg)); await timeout(1); clientService1.ping('hello 1'); -- GitLab