提交 f7de095e 编写于 作者: J Joao Moreno

cleanup ipc

上级 a5ff3ce6
......@@ -6,8 +6,9 @@
'use strict';
import { Promise, TPromise } from 'vs/base/common/winjs.base';
import { IDisposable, toDisposable, dispose } from 'vs/base/common/lifecycle';
import { IDisposable, toDisposable } from 'vs/base/common/lifecycle';
import { Event, Emitter, once, filterEvent, toPromise, Relay } from 'vs/base/common/event';
import { always } from 'vs/base/common/async';
export enum RequestType {
Promise = 100,
......@@ -22,11 +23,6 @@ type IRawEventListenRequest = { type: RequestType.EventListen; id: number; chann
type IRawEventDisposeRequest = { type: RequestType.EventDispose, id: number };
type IRawRequest = IRawPromiseRequest | IRawPromiseCancelRequest | IRawEventListenRequest | IRawEventDisposeRequest;
interface IRequest {
raw: IRawRequest;
flush?: () => void;
}
export enum ResponseType {
Initialize = 200,
PromiseSuccess = 201,
......@@ -168,6 +164,43 @@ export class ChannelServer implements IChannelServer, IDisposable {
this.channels[channelName] = channel;
}
private sendResponse(response: IRawResponse): void {
switch (response.type) {
case ResponseType.Initialize:
return this.sendBuffer(serialize([response.type]));
case ResponseType.PromiseSuccess:
case ResponseType.PromiseError:
case ResponseType.EventFire:
case ResponseType.PromiseErrorObj:
return this.sendBuffer(serialize([response.type, response.id], response.data));
}
}
private sendBuffer(message: Buffer): void {
try {
this.protocol.send(message);
} catch (err) {
// noop
}
}
private onRawMessage(message: Buffer): void {
const { header, body } = deserialize(message);
const type = header[0] as RequestType;
switch (type) {
case RequestType.Promise:
return this.onPromise({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
case RequestType.EventListen:
return this.onEventListen({ type, id: header[1], channelName: header[2], name: header[3], arg: body });
case RequestType.PromiseCancel:
return this.disposeActiveRequest({ type, id: header[1] });
case RequestType.EventDispose:
return this.disposeActiveRequest({ type, id: header[1] });
}
}
private onPromise(request: IRawPromiseRequest): void {
const channel = this.channels[request.channelName];
let promise: Promise;
......@@ -221,70 +254,6 @@ export class ChannelServer implements IChannelServer, IDisposable {
}
}
private onRawMessage(message: Buffer): void {
const { header, body } = deserialize(message);
const type: RequestType = header[0];
let request: IRawRequest;
switch (type) {
case RequestType.Promise:
case RequestType.EventListen:
request = { type: header[0], id: header[1], channelName: header[2], name: header[3], arg: body };
break;
case RequestType.PromiseCancel:
case RequestType.EventDispose:
request = { type: header[0], id: header[1] };
break;
default:
return;
}
this.onRequest(request);
}
private onRequest(request: IRawRequest): void {
switch (request.type) {
case RequestType.Promise:
this.onPromise(request);
break;
case RequestType.EventListen:
this.onEventListen(request);
break;
case RequestType.PromiseCancel:
case RequestType.EventDispose:
this.disposeActiveRequest(request);
break;
}
}
private sendResponse(response: IRawResponse) {
let buffer: Buffer;
switch (response.type) {
case ResponseType.Initialize:
buffer = serialize([response.type]);
break;
case ResponseType.PromiseSuccess:
case ResponseType.PromiseError:
case ResponseType.EventFire:
case ResponseType.PromiseErrorObj:
buffer = serialize([response.type, response.id], response.data);
break;
}
this.sendRawMessage(buffer);
}
private sendRawMessage(message: Buffer) {
try {
this.protocol.send(message);
} catch (err) {
// noop
}
}
public dispose(): void {
this.protocolListener.dispose();
this.protocolListener = null;
......@@ -300,8 +269,7 @@ export class ChannelServer implements IChannelServer, IDisposable {
export class ChannelClient implements IChannelClient, IDisposable {
private state: State = State.Uninitialized;
private activeRequests: IDisposable[] = [];
private bufferedRequests: IRequest[] = [];
private activeRequests = new Set<IDisposable>();
private handlers: { [id: number]: IHandler; } = Object.create(null);
private lastRequestId: number = 0;
private protocolListener: IDisposable;
......@@ -310,7 +278,7 @@ export class ChannelClient implements IChannelClient, IDisposable {
readonly onDidInitialize = this._onDidInitialize.event;
constructor(private protocol: IMessagePassingProtocol) {
this.protocolListener = this.protocol.onMessage(msg => this.onRawMessage(msg));
this.protocolListener = this.protocol.onMessage(msg => this.onBuffer(msg));
}
getChannel<T extends IChannel>(channelName: string): T {
......@@ -323,18 +291,41 @@ export class ChannelClient implements IChannelClient, IDisposable {
private requestPromise(channelName: string, name: string, arg: any): TPromise<any> {
const id = this.lastRequestId++;
const type = RequestType.Promise;
const request: IRequest = { raw: { id, type, channelName, name, arg } };
const activeRequest = this.state === State.Uninitialized
? this.bufferRequest(request)
: this.doRequest(request);
const request: IRawRequest = { id, type, channelName, name, arg };
const activeRequest = this.whenInitialized().then(() => {
const id = request.id;
return new TPromise((c, e) => {
this.handlers[id] = response => {
switch (response.type) {
case ResponseType.PromiseSuccess:
delete this.handlers[id];
c(response.data);
break;
case ResponseType.PromiseError:
delete this.handlers[id];
const error = new Error(response.data.message);
(<any>error).stack = response.data.stack;
error.name = response.data.name;
e(error);
break;
case ResponseType.PromiseErrorObj:
delete this.handlers[id];
e(response.data);
break;
}
};
this.sendRequest(request);
}, () => this.sendRequest({ id, type: RequestType.PromiseCancel }));
});
const disposable = toDisposable(() => activeRequest.cancel());
this.activeRequests.push(disposable);
activeRequest
.then(null, _ => null)
.then(() => this.activeRequests = this.activeRequests.filter(el => el !== disposable));
this.activeRequests.add(disposable);
always(activeRequest, () => this.activeRequests.delete(disposable));
return activeRequest;
}
......@@ -342,16 +333,17 @@ export class ChannelClient implements IChannelClient, IDisposable {
private requestEvent(channelName: string, name: string, arg: any): Event<any> {
const id = this.lastRequestId++;
const type = RequestType.EventListen;
const raw: IRawRequest = { id, type, channelName, name, arg };
const request: IRequest = { raw };
const request: IRawRequest = { id, type, channelName, name, arg };
let uninitializedPromise: TPromise<any> | null = null;
const emitter = new Emitter<any>({
onFirstListenerAdd: () => {
uninitializedPromise = this.whenInitialized();
uninitializedPromise.then(() => {
uninitializedPromise = null;
this.sendRequest(request.raw);
this.activeRequests.add(emitter);
this.sendRequest(request);
});
},
onLastListenerRemove: () => {
......@@ -359,6 +351,7 @@ export class ChannelClient implements IChannelClient, IDisposable {
uninitializedPromise.cancel();
uninitializedPromise = null;
} else {
this.activeRequests.delete(emitter);
this.sendRequest({ id, type: RequestType.EventDispose });
}
}
......@@ -368,131 +361,56 @@ export class ChannelClient implements IChannelClient, IDisposable {
return emitter.event;
}
private doRequest(request: IRequest): Promise {
const id = request.raw.id;
return new TPromise((c, e) => {
this.handlers[id] = response => {
switch (response.type) {
case ResponseType.PromiseSuccess:
delete this.handlers[id];
c(response.data);
break;
case ResponseType.PromiseError:
delete this.handlers[id];
const error = new Error(response.data.message);
(<any>error).stack = response.data.stack;
error.name = response.data.name;
e(error);
break;
case ResponseType.PromiseErrorObj:
delete this.handlers[id];
e(response.data);
break;
}
};
private sendRequest(request: IRawRequest): void {
switch (request.type) {
case RequestType.Promise:
case RequestType.EventListen:
return this.sendBuffer(serialize([request.type, request.id, request.channelName, request.name], request.arg));
this.sendRequest(request.raw);
},
() => this.sendRequest({ id, type: RequestType.PromiseCancel }));
case RequestType.PromiseCancel:
case RequestType.EventDispose:
return this.sendBuffer(serialize([request.type, request.id]));
}
}
private bufferRequest(request: IRequest): Promise {
let flushedRequest: Promise = null;
return new TPromise((c, e) => {
this.bufferedRequests.push(request);
request.flush = () => {
request.flush = null;
flushedRequest = this.doRequest(request).then(c, e);
};
}, () => {
request.flush = null;
if (this.state !== State.Uninitialized) {
if (flushedRequest) {
flushedRequest.cancel();
flushedRequest = null;
}
return;
}
const idx = this.bufferedRequests.indexOf(request);
if (idx === -1) {
return;
}
this.bufferedRequests.splice(idx, 1);
});
private sendBuffer(message: Buffer): void {
try {
this.protocol.send(message);
} catch (err) {
// noop
}
}
private onRawMessage(message: Buffer): void {
private onBuffer(message: Buffer): void {
const { header, body } = deserialize(message);
const type: ResponseType = header[0];
let response: IRawResponse;
switch (type) {
case ResponseType.Initialize:
response = { type: header[0] };
break;
return this.onResponse({ type: header[0] });
case ResponseType.PromiseSuccess:
case ResponseType.PromiseError:
case ResponseType.EventFire:
case ResponseType.PromiseErrorObj:
response = { type: header[0], id: header[1], data: body };
break;
default:
return;
return this.onResponse({ type: header[0], id: header[1], data: body });
}
this.onResponse(response);
}
private onResponse(response: IRawResponse): void {
if (response.type === ResponseType.Initialize) {
this.state = State.Idle;
this._onDidInitialize.fire();
this.bufferedRequests.forEach(r => r.flush && r.flush());
this.bufferedRequests = null;
return;
}
const handler = this.handlers[response.id];
if (handler) {
handler(response);
}
}
private sendRequest(request: IRawRequest) {
let buffer: Buffer;
switch (request.type) {
case RequestType.Promise:
case RequestType.EventListen:
buffer = serialize([request.type, request.id, request.channelName, request.name], request.arg);
break;
case RequestType.PromiseCancel:
case RequestType.EventDispose:
buffer = serialize([request.type, request.id]);
break;
}
this.sendRawMessage(buffer);
}
private sendRawMessage(message: Buffer) {
try {
this.protocol.send(message);
} catch (err) {
// noop
}
}
private whenInitialized(): TPromise<void> {
if (this.state === State.Idle) {
return TPromise.as(null);
......@@ -504,8 +422,8 @@ export class ChannelClient implements IChannelClient, IDisposable {
dispose(): void {
this.protocolListener.dispose();
this.protocolListener = null;
this.activeRequests = dispose(this.activeRequests);
this.activeRequests.forEach(p => p.dispose());
this.activeRequests.clear();
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册