提交 705c4587 编写于 作者: A Alex Dima

Simplify worker protocol

上级 a25cfe1d
......@@ -4,10 +4,8 @@
*--------------------------------------------------------------------------------------------*/
'use strict';
import {onUnexpectedError, transformErrorForSerialization} from 'vs/base/common/errors';
import {onUnexpectedError} from 'vs/base/common/errors';
import {parse, stringify} from 'vs/base/common/marshalling';
import {IRemoteCom} from 'vs/base/common/remote';
import * as timer from 'vs/base/common/timer';
import {TPromise} from 'vs/base/common/winjs.base';
import * as workerProtocol from 'vs/base/common/worker/workerProtocol';
......@@ -51,22 +49,16 @@ export class WorkerClient {
private _messagesQueue:workerProtocol.IClientMessage[];
private _processQueueTimeout:number;
private _waitingForWorkerReply:boolean;
private _lastTimerEvent:timer.ITimerEvent;
private _remoteCom: workerProtocol.RemoteCom;
private _decodeMessageName: (msg: workerProtocol.IClientMessage) => string;
public onModuleLoaded:TPromise<void>;
constructor(workerFactory:IWorkerFactory, moduleId:string, decodeMessageName:(msg:workerProtocol.IClientMessage)=>string) {
this._decodeMessageName = decodeMessageName;
constructor(workerFactory:IWorkerFactory, moduleId:string) {
this._lastMessageId = 0;
this._promises = {};
this._messagesQueue = [];
this._processQueueTimeout = -1;
this._waitingForWorkerReply = false;
this._lastTimerEvent = null;
this._worker = workerFactory.create(
'vs/base/common/worker/workerServer',
......@@ -94,15 +86,9 @@ export class WorkerClient {
moduleId: moduleId,
loaderConfiguration: loaderConfiguration
});
this._remoteCom = new workerProtocol.RemoteCom(this);
}
public getRemoteCom(): IRemoteCom {
return this._remoteCom;
}
public request(requestName:string, payload:any, forceTimestamp?:number): TPromise<any> {
public request(requestName:string, payload:any): TPromise<any> {
if (requestName.charAt(0) === '$') {
throw new Error('Illegal requestName: ' + requestName);
......@@ -117,7 +103,7 @@ export class WorkerClient {
// promise so that it won't be canceled by accident
this.onModuleLoaded.then(() => {
if (!shouldCancelPromise) {
messagePromise = this._sendMessage(requestName, payload, forceTimestamp).then(c, e, p);
messagePromise = this._sendMessage(requestName, payload).then(c, e, p);
}
}, e, p);
......@@ -131,10 +117,6 @@ export class WorkerClient {
});
}
public destroy(): void {
this.dispose();
}
public dispose(): void {
let promises = Object.keys(this._promises);
if (promises.length > 0) {
......@@ -149,12 +131,12 @@ export class WorkerClient {
this._worker.dispose();
}
private _sendMessage(type:string, payload:any, forceTimestamp:number=(new Date()).getTime()):TPromise<any> {
private _sendMessage(type:string, payload:any):TPromise<any> {
let msg = {
id: ++this._lastMessageId,
type: type,
timestamp: forceTimestamp,
timestamp: Date.now(),
payload: payload
};
......@@ -235,7 +217,6 @@ export class WorkerClient {
}
this._waitingForWorkerReply = true;
let msg = this._messagesQueue.shift();
this._lastTimerEvent = timer.start(timer.Topic.WORKER, this._decodeMessageName(msg));
this._postMessage(msg);
}, delayUntilNextMessage);
}
......@@ -269,9 +250,6 @@ export class WorkerClient {
let serverReplyMessage = <workerProtocol.IServerReplyMessage>msg;
this._waitingForWorkerReply = false;
if(this._lastTimerEvent) {
this._lastTimerEvent.stop();
}
if (!this._promises.hasOwnProperty(String(serverReplyMessage.id))) {
this._onError('Received unexpected message from Worker:', msg);
......@@ -307,59 +285,12 @@ export class WorkerClient {
break;
default:
this._dispatchRequestFromWorker(msg);
this._onError('Received unexpected message from worker:', msg);
}
this._processMessagesQueue();
}
private _dispatchRequestFromWorker(msg:workerProtocol.IServerMessage): void {
this._handleWorkerRequest(msg).then((result) => {
let reply: workerProtocol.IClientReplyMessage = {
id: 0,
type: workerProtocol.MessageType.REPLY,
timestamp: (new Date()).getTime(),
seq: msg.req,
payload: (result instanceof Error ? transformErrorForSerialization(result) : result),
err: null
};
this._postMessage(reply);
}, (err) => {
let reply: workerProtocol.IClientReplyMessage = {
id: 0,
type: workerProtocol.MessageType.REPLY,
timestamp: (new Date()).getTime(),
seq: msg.req,
payload: null,
err: (err instanceof Error ? transformErrorForSerialization(err) : err)
};
this._postMessage(reply);
});
}
private _handleWorkerRequest(msg:workerProtocol.IServerMessage): TPromise<any> {
if (msg.type === '_proxyObj') {
return this._remoteCom.handleMessage(msg.payload);
}
if (typeof this[msg.type] === 'function') {
return this._invokeHandler(this[msg.type], this, msg.payload);
}
this._onError('Received unexpected message from Worker:', msg);
return TPromise.wrapError(new Error('No handler found'));
}
private _invokeHandler(handler:Function, handlerCtx:any, payload:any): TPromise<any> {
try {
return TPromise.as(handler.call(handlerCtx, payload));
} catch (err) {
return TPromise.wrapError(err);
}
}
_consoleLog(level:string, payload:any): void {
switch (level) {
case workerProtocol.PrintType.LOG:
......
......@@ -4,9 +4,6 @@
*--------------------------------------------------------------------------------------------*/
'use strict';
import {IManyHandler, IRemoteCom} from 'vs/base/common/remote';
import {TPromise} from 'vs/base/common/winjs.base';
/**
* A message sent from the UI thread to a worker
*/
......@@ -17,14 +14,6 @@ export interface IClientMessage {
payload:any;
}
/**
* A message sent from the UI thread in reply to a worker
*/
export interface IClientReplyMessage extends IClientMessage {
seq:string;
err:any;
}
/**
* A message sent from a worker to the UI thread
*/
......@@ -70,45 +59,3 @@ export var PrintType = {
WARN: 'warn',
ERROR: 'error'
};
export interface IRequester {
request(requestName: string, payload: any): TPromise<any>;
}
export class RemoteCom implements IRemoteCom {
private _requester: IRequester;
private _bigHandler: IManyHandler;
constructor(requester:IRequester) {
this._requester = requester;
this._bigHandler = null;
}
public callOnRemote(proxyId: string, path: string, args:any[]): TPromise<any> {
return this._requester.request('_proxyObj', {
proxyId: proxyId,
path: path,
args: args
});
}
public setManyHandler(handler:IManyHandler): void {
this._bigHandler = handler;
}
public handleMessage(msg: { proxyId: string; path: string; args: any[]; }): TPromise<any> {
if (!this._bigHandler) {
throw new Error('got message before big handler attached!');
}
return this._invokeHandler(msg.proxyId, msg.path, msg.args);
}
private _invokeHandler(rpcId:string, method:string, args:any[]): TPromise<any> {
try {
return TPromise.as(this._bigHandler.handle(rpcId, method, args));
} catch (err) {
return TPromise.wrapError(err);
}
}
}
\ No newline at end of file
......@@ -6,38 +6,19 @@
import {setUnexpectedErrorHandler, transformErrorForSerialization} from 'vs/base/common/errors';
import {parse, stringify} from 'vs/base/common/marshalling';
import {IRemoteCom} from 'vs/base/common/remote';
import {TPromise} from 'vs/base/common/winjs.base';
import * as workerProtocol from 'vs/base/common/worker/workerProtocol';
interface IReplyCallbacks {
c: (value:any)=>void;
e: (err:any)=>void;
p: (progress:any)=>void;
}
export class WorkerServer {
private _postSerializedMessage:(msg:string)=>void;
private _workerId:number;
private _requestHandler:any;
private _lastReq: number;
private _awaitedReplies: { [req:string]: IReplyCallbacks; };
private _remoteCom: workerProtocol.RemoteCom;
constructor(postSerializedMessage:(msg:string)=>void) {
this._postSerializedMessage = postSerializedMessage;
this._workerId = 0;
this._requestHandler = null;
this._lastReq = 0;
this._awaitedReplies = {};
this._bindConsole();
this._remoteCom = new workerProtocol.RemoteCom(this);
}
public getRemoteCom(): IRemoteCom {
return this._remoteCom;
}
private _bindConsole(): void {
......@@ -54,8 +35,8 @@ export class WorkerServer {
}
private _sendPrintMessage(level:string, ...objects:any[]): void {
var transformedObjects = objects.map((obj) => (obj instanceof Error) ? transformErrorForSerialization(obj) : obj);
var msg:workerProtocol.IServerPrintMessage = {
let transformedObjects = objects.map((obj) => (obj instanceof Error) ? transformErrorForSerialization(obj) : obj);
let msg:workerProtocol.IServerPrintMessage = {
monacoWorker: true,
from: this._workerId,
req: '0',
......@@ -67,7 +48,7 @@ export class WorkerServer {
}
private _sendReply(msgId:number, action:string, payload:any): void {
var msg:workerProtocol.IServerReplyMessage = {
let msg:workerProtocol.IServerReplyMessage = {
monacoWorker: true,
from: this._workerId,
req: '0',
......@@ -79,40 +60,6 @@ export class WorkerServer {
this._postMessage(msg);
}
public request(requestName:string, payload:any): TPromise<any> {
if (requestName.charAt(0) === '$') {
throw new Error('Illegal requestName: ' + requestName);
}
var req = String(++this._lastReq);
var msg:workerProtocol.IServerMessage = {
monacoWorker: true,
from: this._workerId,
req: req,
type: requestName,
payload: payload
};
var reply: IReplyCallbacks = {
c: null,
e: null,
p: null
};
var r = new TPromise<any>((c, e, p) => {
reply.c = c;
reply.e = e;
reply.p = p;
});
this._awaitedReplies[req] = reply;
this._postMessage(msg);
return r;
}
public loadModule(moduleId:string, callback:Function, errorback:(err:any)=>void): void {
// Use the global require to be sure to get the global config
(<any>self).require([moduleId], (...result:any[]) => {
......@@ -130,37 +77,15 @@ export class WorkerServer {
private _onmessage(msg:workerProtocol.IClientMessage): void {
if (msg.type === workerProtocol.MessageType.REPLY) {
// this message is a reply to a request we've made to the main thread previously
var typedMsg = <workerProtocol.IClientReplyMessage>msg;
if (!typedMsg.seq || !this._awaitedReplies.hasOwnProperty(typedMsg.seq)) {
console.error('Worker received unexpected reply from main thread', msg);
return;
}
var reply = this._awaitedReplies[typedMsg.seq];
delete this._awaitedReplies[typedMsg.seq];
if (typedMsg.err) {
reply.e(typedMsg.err);
} else {
reply.c(typedMsg.payload);
}
return;
}
var c = this._sendReply.bind(this, msg.id, workerProtocol.ReplyType.COMPLETE);
var e = this._sendReply.bind(this, msg.id, workerProtocol.ReplyType.ERROR);
var p = this._sendReply.bind(this, msg.id, workerProtocol.ReplyType.PROGRESS);
let c = this._sendReply.bind(this, msg.id, workerProtocol.ReplyType.COMPLETE);
let e = this._sendReply.bind(this, msg.id, workerProtocol.ReplyType.ERROR);
let p = this._sendReply.bind(this, msg.id, workerProtocol.ReplyType.PROGRESS);
switch(msg.type) {
case workerProtocol.MessageType.INITIALIZE:
this._workerId = msg.payload.id;
var loaderConfig = msg.payload.loaderConfiguration;
let loaderConfig = msg.payload.loaderConfiguration;
// TODO@Alex: share this code with simpleWorker
if (loaderConfig) {
// Remove 'baseUrl', handling it is beyond scope for now
......@@ -198,29 +123,21 @@ export class WorkerServer {
}
private _handleMessage(msg:workerProtocol.IClientMessage, c:(value:any)=>void, e:(err:any)=>void, p:(progress:any)=>void): void {
if (msg.type === '_proxyObj') {
this._remoteCom.handleMessage(msg.payload).then(c, e, p);
if (!this._requestHandler) {
e('Request handler not loaded');
return;
}
if (!this._requestHandler) {
e('Request handler not loaded');
let handlerMethod = this._requestHandler[msg.type];
if (typeof handlerMethod !== 'function') {
e('Handler does not have method ' + msg.type);
return;
}
if ((msg.type in this._requestHandler) && (typeof this._requestHandler[msg.type] === 'function')) {
// var now = (new Date()).getTime();
try {
this._requestHandler[msg.type].call(this._requestHandler, this, c, e, p, msg.payload);
} catch (handlerError) {
e(transformErrorForSerialization(handlerError));
}
// var what = msg.type;
// console.info(what + ' took ' + ((new Date().getTime())-now));
} else {
this._requestHandler.request(this, c, e, p, msg);
try {
handlerMethod.call(this._requestHandler, this, c, e, p, msg.payload);
} catch (handlerError) {
e(transformErrorForSerialization(handlerError));
}
}
}
......
......@@ -105,10 +105,6 @@ export class MainThreadCompatWorkerService implements ICompatWorkerService {
this._call('$', 'instantiateCompatMode', [compatMode.getId()]);
}
public handle(rpcId: string, methodName: string, args: any[]): any {
throw new Error('Not supported!');
}
public CompatWorker(obj: ICompatMode, methodName: string, target: Function, param: any[]): TPromise<any> {
return this._call(obj.getId(), methodName, param);
}
......@@ -146,22 +142,11 @@ export class MainThreadCompatWorkerService implements ICompatWorkerService {
});
}
private _shortName(major: string, minor: string): string {
return major.substring(major.length - 14) + '.' + minor.substr(0, 14);
}
private _createWorker(isRetry:boolean = false): void {
this._worker = new WorkerClient(
this._workerFactory,
'vs/editor/common/worker/editorWorkerServer',
(msg) => {
if (msg.type === 'threadService') {
return this._shortName(msg.payload[0], msg.payload[1]);
}
return msg.type;
}
'vs/editor/common/worker/editorWorkerServer'
);
this._worker.getRemoteCom().setManyHandler(this);
this._worker.onModuleLoaded = this._worker.request('initialize', {
contextService: {
workspace: this._contextService.getWorkspace(),
......@@ -191,7 +176,11 @@ export class MainThreadCompatWorkerService implements ICompatWorkerService {
if (this._worker === null) {
throw new Error('Cannot fulfill request...');
}
return this._worker.getRemoteCom().callOnRemote(rpcId, methodName, args);
return this._worker.request('request', {
target: rpcId,
methodName: methodName,
args: args
});
});
}
}
......@@ -6,7 +6,6 @@
import {TPromise} from 'vs/base/common/winjs.base';
import {ICompatWorkerService, ICompatMode, IRawModelData} from 'vs/editor/common/services/compatWorkerService';
import {IRemoteCom} from 'vs/base/common/remote';
import {IResourceService} from 'vs/editor/common/services/resourceService';
import {ILanguageExtensionPoint, IModeService} from 'vs/editor/common/services/modeService';
import {IMirrorModelEvents, MirrorModel} from 'vs/editor/common/model/mirrorModel';
......@@ -22,13 +21,11 @@ export class CompatWorkerServiceWorker implements ICompatWorkerService {
constructor(
@IResourceService private resourceService: IResourceService,
@IModeService private modeService: IModeService,
remoteCom: IRemoteCom,
modesRegistryData: {
compatModes: ILegacyLanguageDefinition[];
languages: ILanguageExtensionPoint[];
}
) {
remoteCom.setManyHandler(this);
ModesRegistry.registerCompatModes(modesRegistryData.compatModes);
ModesRegistry.registerLanguages(modesRegistryData.languages);
this._compatModes = Object.create(null);
......@@ -38,7 +35,7 @@ export class CompatWorkerServiceWorker implements ICompatWorkerService {
this._compatModes[compatMode.getId()] = compatMode;
}
public handle(rpcId: string, methodName: string, args: any[]): any {
public handleMainRequest(rpcId: string, methodName: string, args: any[]): any {
if (rpcId === '$') {
switch (methodName) {
case 'acceptNewModel':
......
......@@ -105,14 +105,20 @@ export class EditorWorkerServer {
const modeService = new ModeServiceImpl(instantiationService, extensionService);
services.set(IModeService, modeService);
this.compatWorkerService = new CompatWorkerServiceWorker(resourceService, modeService, mainThread.getRemoteCom(), initData.modesRegistryData);
this.compatWorkerService = new CompatWorkerServiceWorker(resourceService, modeService, initData.modesRegistryData);
services.set(ICompatWorkerService, this.compatWorkerService);
complete(undefined);
}
public request(mainThread:WorkerServer, complete:ICallback, error:ICallback, progress:ICallback, data:any):void {
throw new Error('unexpected!');
try {
TPromise.as(
this.compatWorkerService.handleMainRequest(data.target, data.methodName, data.args)
).then(complete, error);
} catch (err) {
error(err);
}
}
}
......
......@@ -7,12 +7,11 @@
import {AbstractThreadService} from 'vs/platform/thread/common/abstractThreadService';
import {SyncDescriptor0} from 'vs/platform/instantiation/common/descriptors';
import {IThreadService} from 'vs/platform/thread/common/thread';
import {IWorkspaceContextService} from 'vs/platform/workspace/common/workspace';
export abstract class CommonMainThreadService extends AbstractThreadService implements IThreadService {
public serviceId = IThreadService;
constructor(contextService: IWorkspaceContextService, workerModuleId: string) {
constructor() {
super();
}
......
......@@ -47,7 +47,7 @@ export class MainThreadService extends CommonMainThreadService {
@IWindowService windowService: IWindowService,
@ILifecycleService lifecycleService: ILifecycleService
) {
super(contextService, 'vs/editor/common/worker/editorWorkerServer');
super();
this.extensionHostProcessManager = new ExtensionHostProcessManager(contextService, messageService, windowService, lifecycleService);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册