proxy.ts 5.7 KB
Newer Older
A
Asher 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
import { EventEmitter } from "events";
import { isPromise } from "./util";

// tslint:disable no-any

/**
 * Allow using a proxy like it's returned synchronously. This only works because
 * all proxy methods return promises.
 */
const unpromisify = <T extends ServerProxy>(proxyPromise: Promise<T>): T => {
	return new Proxy({}, {
		get: (target: any, name: string): any => {
			if (typeof target[name] === "undefined") {
				target[name] = async (...args: any[]): Promise<any> => {
					const proxy = await proxyPromise;

					return proxy ? (proxy as any)[name](...args) : undefined;
				};
			}

			return target[name];
		},
	});
};

/**
 * Client-side emitter that just forwards proxy events to its own emitter.
 * It also turns a promisified proxy into a non-promisified proxy so we don't
 * need a bunch of `then` calls everywhere.
 */
export abstract class ClientProxy<T extends ServerProxy> extends EventEmitter {
A
Asher 已提交
32
	private _proxy: T | undefined;
A
Asher 已提交
33 34 35 36 37

	/**
	 * You can specify not to bind events in order to avoid emitting twice for
	 * duplex streams.
	 */
A
Asher 已提交
38 39 40 41
	public constructor(
		proxyPromise: Promise<T> | T,
		private readonly bindEvents: boolean = true,
	) {
A
Asher 已提交
42
		super();
A
Asher 已提交
43 44 45 46 47 48 49 50 51 52 53 54 55
		this.initialize(proxyPromise);
		if (this.bindEvents) {
			this.on("disconnected", (error) => {
				try {
					this.emit("error", error);
				} catch (error) {
					// If nothing is listening, EventEmitter will throw an error.
				}
				this.handleDisconnect();
			});
		}
	}

A
Asher 已提交
56 57 58 59 60 61 62 63 64 65 66
	/**
	 * Remove an event listener.
	 */
	public off(event: string, cb: (...args: any[]) => void): this {
		// Fill it here because the fill we're using to provide EventEmitter for the
		// browser doesn't appear to include `off`.
		this.removeListener(event, cb);

		return this;
	}

A
Asher 已提交
67 68 69 70 71 72 73 74
	protected get proxy(): T {
		if (!this._proxy) {
			throw new Error("not initialized");
		}

		return this._proxy;
	}

75 76 77 78
	/**
	 * Initialize the proxy by unpromisifying if necessary and binding to its
	 * events.
	 */
A
Asher 已提交
79 80 81
	protected initialize(proxyPromise: Promise<T> | T): void {
		this._proxy = isPromise(proxyPromise) ? unpromisify(proxyPromise) : proxyPromise;
		if (this.bindEvents) {
82
			this.catch(this.proxy.onEvent((event, ...args): void => {
A
Asher 已提交
83
				this.emit(event, ...args);
84
			}));
A
Asher 已提交
85 86
		}
	}
A
Asher 已提交
87

88 89 90
	/**
	 * Perform necessary cleanup on disconnect (or reconnect).
	 */
A
Asher 已提交
91
	protected abstract handleDisconnect(): void;
92 93 94 95 96 97 98 99 100 101 102

	/**
	 * Emit an error event if the promise errors.
	 */
	protected catch(promise?: Promise<any>): this {
		if (promise) {
			promise.catch((e) => this.emit("error", e));
		}

		return this;
	}
A
Asher 已提交
103 104 105 106 107 108 109 110 111 112
}

/**
 * Proxy to the actual instance on the server. Every method must only accept
 * serializable arguments and must return promises with serializable values. If
 * a proxy itself has proxies on creation (like how ChildProcess has stdin),
 * then it should return all of those at once, otherwise you will miss events
 * from those child proxies and fail to dispose them properly.
 */
export interface ServerProxy {
113 114 115
	/**
	 * Dispose the proxy.
	 */
A
Asher 已提交
116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
	dispose(): Promise<void>;

	/**
	 * This is used instead of an event to force it to be implemented since there
	 * would be no guarantee the implementation would remember to emit the event.
	 */
	onDone(cb: () => void): Promise<void>;

	/**
	 * Listen to all possible events. On the client, this is to reduce boilerplate
	 * that would just be a bunch of error-prone forwarding of each individual
	 * event from the proxy to its own emitter. It also fixes a timing issue
	 * because we just always send all events from the server, so we never miss
	 * any due to listening too late.
	 */
	// tslint:disable-next-line no-any
	onEvent(cb: (event: string, ...args: any[]) => void): Promise<void>;
}

135 136 137
/**
 * Supported top-level module proxies.
 */
A
Asher 已提交
138 139 140 141 142 143 144 145
export enum Module {
	Fs = "fs",
	ChildProcess = "child_process",
	Net = "net",
	Spdlog = "spdlog",
	NodePty = "node-pty",
	Trash = "trash",
}
146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164

interface BatchItem<T, A> {
	args: A;
	resolve: (t: T) => void;
	reject: (e: Error) => void;
}

/**
 * Batch remote calls.
 */
export abstract class Batch<T, A> {
	private idleTimeout: number | NodeJS.Timer | undefined;
	private maxTimeout: number | NodeJS.Timer | undefined;
	private batch = <BatchItem<T, A>[]>[];

	public constructor(
		/**
		 * Flush after reaching this amount of time.
		 */
165
		private readonly maxTime: number = 1000,
166 167 168
		/**
		 * Flush after reaching this count.
		 */
169
		private readonly maxCount: number = 100,
170 171
		/**
		 * Flush after not receiving more requests for this amount of time.
A
Asher 已提交
172 173
		 * This is pretty low by default so essentially we just end up batching
		 * requests that are all made at the same time.
174
		 */
A
Asher 已提交
175
		private readonly idleTime: number = 1,
176 177 178
	) {}

	public add = (args: A): Promise<T> => {
179
		return new Promise((resolve, reject): void => {
180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
			this.batch.push({
				args,
				resolve,
				reject,
			});
			if (this.batch.length >= this.maxCount) {
				this.flush();
			} else {
				clearTimeout(this.idleTimeout as any);
				this.idleTimeout = setTimeout(this.flush, this.idleTime);
				if (typeof this.maxTimeout === "undefined") {
					this.maxTimeout = setTimeout(this.flush, this.maxTime);
				}
			}
		});
	}

197 198 199
	/**
	 * Perform remote call for a batch.
	 */
200 201
	protected abstract remoteCall(batch: A[]): Promise<(T | Error)[]>;

202 203 204 205
	/**
	 * Flush out the current batch.
	 */
	private readonly flush = (): void => {
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
		clearTimeout(this.idleTimeout as any);
		clearTimeout(this.maxTimeout as any);
		this.maxTimeout = undefined;

		const batch = this.batch;
		this.batch = [];

		this.remoteCall(batch.map((q) => q.args)).then((results) => {
			batch.forEach((item, i) => {
				const result = results[i];
				if (result && result instanceof Error) {
					item.reject(result);
				} else {
					item.resolve(result);
				}
			});
		}).catch((error) => batch.forEach((item) => item.reject(error)));
	}
}