提交 2af4677f 编写于 作者: J Joao Moreno

implement timeout mechanism for missing ipc channels

fixes #72531
上级 3fe6a332
......@@ -246,19 +246,29 @@ function deserialize(reader: IReader): any {
}
}
interface PendingRequest {
request: IRawPromiseRequest | IRawEventListenRequest;
timeoutTimer: NodeJS.Timer;
}
export class ChannelServer<TContext = string> implements IChannelServer<TContext>, IDisposable {
private channels = new Map<string, IServerChannel<TContext>>();
private activeRequests = new Map<number, IDisposable>();
private protocolListener: IDisposable | null;
constructor(private protocol: IMessagePassingProtocol, private ctx: TContext) {
// Requests might come in for channels which are not yet registered.
// They will timeout after `timeoutDelay`.
private pendingRequests = new Map<string, PendingRequest[]>();
constructor(private protocol: IMessagePassingProtocol, private ctx: TContext, private timeoutDelay: number = 1000) {
this.protocolListener = this.protocol.onMessage(msg => this.onRawMessage(msg));
this.sendResponse({ type: ResponseType.Initialize });
}
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
this.channels.set(channelName, channel);
this.flushPendingRequests(channelName);
}
private sendResponse(response: IRawResponse): void {
......@@ -311,11 +321,7 @@ export class ChannelServer<TContext = string> implements IChannelServer<TContext
const channel = this.channels.get(request.channelName);
if (!channel) {
this.sendResponse(<IRawResponse>{
id: request.id,
data: { message: request.channelName, name: 'Unknown channel', stack: undefined },
type: ResponseType.PromiseError
});
this.collectPendingRequest(request);
return;
}
......@@ -357,7 +363,7 @@ export class ChannelServer<TContext = string> implements IChannelServer<TContext
const channel = this.channels.get(request.channelName);
if (!channel) {
console.error(`Unknown channel: ${request.channelName}`);
this.collectPendingRequest(request);
return;
}
......@@ -377,6 +383,46 @@ export class ChannelServer<TContext = string> implements IChannelServer<TContext
}
}
private collectPendingRequest(request: IRawPromiseRequest | IRawEventListenRequest): void {
let pendingRequests = this.pendingRequests.get(request.channelName);
if (!pendingRequests) {
pendingRequests = [];
this.pendingRequests.set(request.channelName, pendingRequests);
}
const timer = setTimeout(() => {
console.error(`Unknown channel: ${request.channelName}`);
if (request.type === RequestType.Promise) {
this.sendResponse(<IRawResponse>{
id: request.id,
data: { name: 'Unknown channel', message: `Channel name '${request.channelName}' timed out after ${this.timeoutDelay}ms`, stack: undefined },
type: ResponseType.PromiseError
});
}
}, this.timeoutDelay);
pendingRequests.push({ request, timeoutTimer: timer });
}
private flushPendingRequests(channelName: string): void {
const requests = this.pendingRequests.get(channelName);
if (requests) {
for (const request of requests) {
clearTimeout(request.timeoutTimer);
switch (request.request.type) {
case RequestType.Promise: this.onPromise(request.request); break;
case RequestType.EventListen: this.onEventListen(request.request); break;
}
}
this.pendingRequests.delete(channelName);
}
}
public dispose(): void {
if (this.protocolListener) {
this.protocolListener.dispose();
......@@ -596,6 +642,7 @@ export interface ClientConnectionEvent {
}
interface Connection<TContext> extends Client<TContext> {
readonly channelServer: ChannelServer<TContext>;
readonly channelClient: ChannelClient;
}
......@@ -634,7 +681,7 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
this.channels.forEach((channel, name) => channelServer.registerChannel(name, channel));
const connection: Connection<TContext> = { channelClient, ctx };
const connection: Connection<TContext> = { channelServer, channelClient, ctx };
this._connections.add(connection);
this._onDidChangeConnections.fire(connection);
......@@ -670,6 +717,10 @@ export class IPCServer<TContext = string> implements IChannelServer<TContext>, I
registerChannel(channelName: string, channel: IServerChannel<TContext>): void {
this.channels.set(channelName, channel);
for (const connection of this._connections) {
connection.channelServer.registerChannel(channelName, channel);
}
}
dispose(): void {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册