mainThreadService.ts 8.4 KB
Newer Older
E
Erich Gamma 已提交
1 2 3 4 5 6
/*---------------------------------------------------------------------------------------------
 *  Copyright (c) Microsoft Corporation. All rights reserved.
 *  Licensed under the MIT License. See License.txt in the project root for license information.
 *--------------------------------------------------------------------------------------------*/
'use strict';

A
Alex Dima 已提交
7
import {TPromise} from 'vs/base/common/winjs.base';
E
Erich Gamma 已提交
8 9 10 11 12 13
import Worker = require('vs/base/common/worker/workerClient');
import abstractThreadService = require('vs/platform/thread/common/abstractThreadService');
import Env = require('vs/base/common/flags');
import Platform = require('vs/base/common/platform');
import remote = require('vs/base/common/remote');
import {SyncDescriptor0} from 'vs/platform/instantiation/common/descriptors';
A
Alex Dima 已提交
14
import {IThreadService, IThreadSynchronizableObject, ThreadAffinity} from 'vs/platform/thread/common/thread';
E
Erich Gamma 已提交
15 16 17 18
import {IWorkspaceContextService} from 'vs/platform/workspace/common/workspace';
import {DefaultWorkerFactory} from 'vs/base/worker/defaultWorkerFactory';

/**
 * String-keyed map of numeric offsets.
 *
 * Within this file it backs `MainThreadService._affinityScrambler`, where it is
 * indexed by the value of `obj.getId()` and stores the random offset that was
 * picked for that object the first time a worker index was computed for it.
 */
interface IAffinityMap {
	[qualifiedMethodName: string]: number;
}

/**
 * Minimal view of a worker: something that can hand out its remote
 * communication channel.
 */
export interface IWorker {
	/** Returns the `remote.IRemoteCom` associated with this worker. */
	getRemoteCom(): remote.IRemoteCom;
}

/**
 * Callback signature that receives an `IWorker`.
 *
 * NOTE(review): the name is misspelled ("Listenr"); it is exported, so renaming
 * it would have to be coordinated with every importer.
 */
export interface IWorkerListenr {
	(worker: IWorker): void;
}

/**
 * Thread service for the main thread.
 *
 * Owns a pool of `Worker.WorkerClient` instances and routes calls to them.
 * The pool is created lazily: either when `ensureWorkers()` is called, or
 * automatically `MAXIMUM_WORKER_CREATION_DELAY` milliseconds after construction.
 * Every request that targets workers waits for the pool to exist first.
 */
export class MainThreadService extends abstractThreadService.AbstractThreadService implements IThreadService {
	public serviceId = IThreadService;

	/** Milliseconds to wait after construction before the worker pool is created automatically. */
	static MAXIMUM_WORKER_CREATION_DELAY = 500; // 500ms

	/** Live workers; a worker is removed again if its 'initialize' request fails. */
	private _workerPool: Worker.WorkerClient[];
	private _contextService: IWorkspaceContextService;
	/** Per-object random offset (keyed by `obj.getId()`) used when computing a worker index. */
	private _affinityScrambler: IAffinityMap;

	/** Completes once `ensureWorkers()` has created the pool. */
	private _workersCreatedPromise: TPromise<void>;
	/** Completion callback of `_workersCreatedPromise`; set to `null` once the pool has been created. */
	private _triggerWorkersCreatedPromise: (value: void) => void;

	private _workerFactory: Worker.IWorkerFactory;
	private _workerModuleId: string;
	private _defaultWorkerCount: number;

	/**
	 * @param contextService Source of the workspace, configuration and options sent to each worker on initialization.
	 * @param workerModuleId Module id handed to every `Worker.WorkerClient`.
	 * @param defaultWorkerCount Value passed to `Env.workersCount` when deciding how many workers to create.
	 * @throws Error when `isInMainThread` is falsy.
	 */
	constructor(contextService: IWorkspaceContextService, workerModuleId: string, defaultWorkerCount: number) {
		super(true);
		this._contextService = contextService;
		this._workerModuleId = workerModuleId;
		this._defaultWorkerCount = defaultWorkerCount;
		this._workerFactory = new DefaultWorkerFactory(true);

		if (!this.isInMainThread) {
			throw new Error('Incorrect Service usage: this service must be used only in the main thread');
		}

		this._workerPool = [];
		this._affinityScrambler = {};

		this._workersCreatedPromise = new TPromise<void>((c, e, p) => {
			// Keep the completion callback around; ensureWorkers() invokes it.
			this._triggerWorkersCreatedPromise = c;
		}, () => {
			// Not cancelable
		});

		// If nobody asks for workers to be created within MAXIMUM_WORKER_CREATION_DELAY
		// milliseconds (500ms), the workers are created automatically
		TPromise.timeout(MainThreadService.MAXIMUM_WORKER_CREATION_DELAY).then(() => this.ensureWorkers());
	}

	/**
	 * Creates the worker pool if it has not been created yet and completes
	 * `_workersCreatedPromise`. Subsequent calls are no-ops.
	 */
	ensureWorkers(): void {
		if (this._triggerWorkersCreatedPromise) {
			// Workers not created yet

			let createCount = Env.workersCount(this._defaultWorkerCount);
			if (!Platform.hasWebWorkerSupport()) {
				// Create at most 1 compatibility worker
				createCount = Math.min(createCount, 1);
			}

			for (let i = 0; i < createCount; i++) {
				this._createWorker();
			}

			// Clear the trigger before invoking it so that a re-entrant call is a no-op.
			let complete = this._triggerWorkersCreatedPromise;
			this._triggerWorkersCreatedPromise = null;
			complete(null);
		}
	}

	/**
	 * Returns a fresh promise that completes after the worker pool has been created.
	 *
	 * Canceling the returned promise only sets a local flag that suppresses its
	 * completion; the shared `_workersCreatedPromise` itself is never canceled.
	 */
	private _afterWorkers(): TPromise<void> {
		let shouldCancelPromise = false;

		return new TPromise<void>((c, e, p) => {

			// hide the initialize promise inside this
			// promise so that it won't be canceled by accident
			this._workersCreatedPromise.then(() => {
				if (!shouldCancelPromise) {
					c(null);
				}
			}, e, p);

		}, () => {
			// mark that this promise is canceled
			shouldCancelPromise = true;
		});
	}

	/**
	 * Builds a short label: the last 14 characters of `major`, a dot, and the
	 * first 14 characters of `minor`.
	 */
	private _shortName(major: string, minor: string): string {
		return major.substring(major.length - 14) + '.' + minor.substring(0, 14);
	}

	/**
	 * Creates one worker client, sends it the 'initialize' request and adds it to the pool.
	 *
	 * If the 'initialize' request fails, the worker is removed from the pool and
	 * disposed, and creation is retried exactly once.
	 *
	 * @param isRetry `true` when this call is the retry after a failed attempt.
	 */
	private _createWorker(isRetry:boolean = false): void {
		let worker = new Worker.WorkerClient(
			this._workerFactory,
			this._workerModuleId,
			(msg) => {
				// Maps a message to a short label.
				// NOTE(review): how WorkerClient uses this label is not visible here — confirm.
				if (msg.type === 'threadService') {
					return this._shortName(msg.payload[0], msg.payload[1]);
				}
				return msg.type;
			}
		);
		worker.getRemoteCom().setManyHandler(this);
		worker.onModuleLoaded = worker.request('initialize', {
			contextService: {
				workspace: this._contextService.getWorkspace(),
				configuration: this._contextService.getConfiguration(),
				options: this._contextService.getOptions()
			}
		}).then(null, (err) => {
			// Initialization failed: drop this worker from the pool before disposing it.
			let failedIdx = this._workerPool.indexOf(worker);
			if (failedIdx >= 0) {
				this._workerPool.splice(failedIdx, 1);
			}
			worker.dispose();
			if (isRetry) {
				console.warn('Creating the web worker already failed twice. Giving up!');
			} else {
				this._createWorker(true);
			}
		});

		this._workerPool.push(worker);
	}

	/**
	 * Picks the index of the worker that should serve a request.
	 *
	 * - `ThreadAffinity.None`: a random pick among the workers with the smallest queue size.
	 * - otherwise: `(offset + affinity) % poolSize`, where `offset` is a random
	 *   number chosen once per `obj.getId()` and remembered in `_affinityScrambler`.
	 *
	 * Reads `_workerPool[0]`, so the pool must not be empty when this is called.
	 */
	private _getWorkerIndex(obj: IThreadSynchronizableObject, affinity: ThreadAffinity): number {
		if (affinity === ThreadAffinity.None) {
			let winners: number[] = [0],
				winnersQueueSize = this._workerPool[0].getQueueSize();

			for (let i = 1; i < this._workerPool.length; i++) {
				let queueSize = this._workerPool[i].getQueueSize();
				if (queueSize < winnersQueueSize) {
					winnersQueueSize = queueSize;
					winners = [i];
				} else if (queueSize === winnersQueueSize) {
					winners.push(i);
				}
			}

			return winners[Math.floor(Math.random() * winners.length)];
		}

		let id = obj.getId();
		let scramble = 0;
		// Go through Object.prototype so that an id such as "hasOwnProperty",
		// once stored in the map, cannot shadow the method being called.
		if (Object.prototype.hasOwnProperty.call(this._affinityScrambler, id)) {
			scramble = this._affinityScrambler[id];
		} else {
			scramble = Math.floor(Math.random() * this._workerPool.length);
			this._affinityScrambler[id] = scramble;
		}

		return (scramble + affinity) % this._workerPool.length;
	}

	/**
	 * Runs `methodName` of `obj` on a single worker chosen via `_getWorkerIndex`.
	 * The `target` parameter is not used by this implementation.
	 *
	 * The returned promise fails with 'Cannot fulfill request...' when the pool is empty.
	 */
	OneWorker(obj: IThreadSynchronizableObject, methodName: string, target: Function, params: any[], affinity: ThreadAffinity): TPromise<any> {
		return this._afterWorkers().then(() => {
			if (this._workerPool.length === 0) {
				throw new Error('Cannot fulfill request...');
			}

			let workerIdx = this._getWorkerIndex(obj, affinity);

			return this._remoteCall(this._workerPool[workerIdx], obj, methodName, params);
		});
	}

	/**
	 * Runs `methodName` of `obj` on every worker in the pool and joins the results.
	 * The `target` parameter is not used by this implementation.
	 */
	AllWorkers(obj: IThreadSynchronizableObject, methodName: string, target: Function, params: any[]): TPromise<any> {
		return this._afterWorkers().then(() => {
			return TPromise.join(this._workerPool.map((w) => {
				return this._remoteCall(w, obj, methodName, params);
			}));
		});
	}

	/**
	 * Sends a 'threadService' request carrying `[id, methodName, params]` to `worker`.
	 *
	 * @throws Error when `obj.getId()` is falsy.
	 */
	private _remoteCall(worker: Worker.WorkerClient, obj: IThreadSynchronizableObject, methodName: string, params: any[]): TPromise<any> {
		let id = obj.getId();
		if (!id) {
			throw new Error('Synchronizable Objects must have an identifier');
		}
		return worker.request('threadService', [id, methodName, params]);
	}

	/** Main-process actors live in this thread: delegates to `_getOrCreateLocalInstance`. */
	protected _registerAndInstantiateMainProcessActor<T>(id: string, descriptor: SyncDescriptor0<T>): T {
		return this._getOrCreateLocalInstance(id, descriptor);
	}

	/** Main-process actors live in this thread: delegates to `_registerLocalInstance`. */
	protected _registerMainProcessActor<T>(id: string, actor: T): void {
		this._registerLocalInstance(id, actor);
	}

	/** Always throws: extension host actors are not supported by this service. */
	protected _registerAndInstantiateExtHostActor<T>(id: string, descriptor: SyncDescriptor0<T>): T {
		throw new Error('Not supported in this runtime context: Cannot communicate to non-existant Extension Host!');
	}

	/** Always throws: extension host actors are not supported by this service. */
	protected _registerExtHostActor<T>(id: string, actor: T): void {
		throw new Error('Not supported in this runtime context!');
	}

	/**
	 * Worker actors are represented by a proxy whose calls are forwarded to the
	 * worker(s) selected by `whichWorker`.
	 */
	protected _registerAndInstantiateWorkerActor<T>(id: string, descriptor: SyncDescriptor0<T>, whichWorker: ThreadAffinity): T {
		let helper = this._createWorkerProxyHelper(whichWorker);
		return this._getOrCreateProxyInstance(helper, id, descriptor);
	}

	/** Always throws: worker actors cannot be registered from this service. */
	protected _registerWorkerActor<T>(id: string, actor: T): void {
		throw new Error('Not supported in this runtime context!');
	}

	/** Builds a proxy helper whose `callOnRemote` forwards to `_callOnWorker` for `whichWorker`. */
	private _createWorkerProxyHelper(whichWorker: ThreadAffinity): remote.IProxyHelper {
		return {
			callOnRemote: (proxyId: string, path: string, args: any[]): TPromise<any> => {
				return this._callOnWorker(whichWorker, proxyId, path, args);
			}
		};
	}

	/**
	 * Forwards a proxy call to the worker(s) selected by `whichWorker`.
	 *
	 * - `ThreadAffinity.None`: nothing is called; resolves with `null`.
	 * - `ThreadAffinity.All`: calls every worker in the pool and joins the results.
	 * - otherwise: calls the worker at `whichWorker % poolSize`.
	 *
	 * For the single-worker case the returned promise fails with
	 * 'Cannot fulfill request...' when the pool is empty, matching `OneWorker`.
	 */
	private _callOnWorker(whichWorker: ThreadAffinity, proxyId: string, path: string, args: any[]): TPromise<any> {
		if (whichWorker === ThreadAffinity.None) {
			return TPromise.as(null);
		}

		return this._afterWorkers().then(() => {
			if (whichWorker === ThreadAffinity.All) {
				let promises = this._workerPool.map(w => w.getRemoteCom()).map(rCom => rCom.callOnRemote(proxyId, path, args));
				return TPromise.join(promises);
			}

			// With an empty pool the modulo below would be NaN and the lookup
			// undefined; fail with the same explicit error OneWorker uses.
			if (this._workerPool.length === 0) {
				throw new Error('Cannot fulfill request...');
			}

			let workerIdx = whichWorker % this._workerPool.length;
			let worker = this._workerPool[workerIdx];
			return worker.getRemoteCom().callOnRemote(proxyId, path, args);
		});
	}
}