未验证 提交 45c70c29 编写于 作者: A Alex Dima

Fixes #83672

上级 6f223f28
......@@ -533,6 +533,53 @@ class Queue<T> {
}
}
class LoadEstimator {
private static _HISTORY_LENGTH = 10;
private static _INSTANCE: LoadEstimator | null = null;
public static getInstance(): LoadEstimator {
if (!LoadEstimator._INSTANCE) {
LoadEstimator._INSTANCE = new LoadEstimator();
}
return LoadEstimator._INSTANCE;
}
private lastRuns: number[];
constructor() {
this.lastRuns = [];
const now = Date.now();
for (let i = 0; i < LoadEstimator._HISTORY_LENGTH; i++) {
this.lastRuns[i] = now - 1000 * i;
}
setInterval(() => {
for (let i = LoadEstimator._HISTORY_LENGTH; i >= 1; i--) {
this.lastRuns[i] = this.lastRuns[i - 1];
}
this.lastRuns[0] = Date.now();
}, 1000);
}
/**
* returns an estimative number, from 0 (low load) to 1 (high load)
*/
public load(): number {
const now = Date.now();
const historyLimit = (1 + LoadEstimator._HISTORY_LENGTH) * 1000;
let score = 0;
for (let i = 0; i < LoadEstimator._HISTORY_LENGTH; i++) {
if (now - this.lastRuns[i] <= historyLimit) {
score++;
}
}
return 1 - score / LoadEstimator._HISTORY_LENGTH;
}
public hasHighLoad(): boolean {
return this.load() >= 0.5;
}
}
/**
* Same as Protocol, but will actually track messages and acks.
* Moreover, it will ensure no messages are lost if there are no event listeners.
......@@ -559,6 +606,8 @@ export class PersistentProtocol implements IMessagePassingProtocol {
private _socketReader: ProtocolReader;
private _socketDisposables: IDisposable[];
private readonly _loadEstimator = LoadEstimator.getInstance();
private readonly _onControlMessage = new BufferedEmitter<VSBuffer>();
readonly onControlMessage: Event<VSBuffer> = this._onControlMessage.event;
......@@ -670,15 +719,19 @@ export class PersistentProtocol implements IMessagePassingProtocol {
const timeSinceLastIncomingMsg = Date.now() - this._socketReader.lastReadTime;
if (timeSinceLastIncomingMsg >= ProtocolConstants.KeepAliveTimeoutTime) {
// Trash the socket
this._onSocketTimeout.fire(undefined);
return;
// It's been a long time since we received a server message
// But this might be caused by the event loop being busy and failing to read messages
if (!this._loadEstimator.hasHighLoad()) {
// Trash the socket
this._onSocketTimeout.fire(undefined);
return;
}
}
this._incomingKeepAliveTimeout = setTimeout(() => {
this._incomingKeepAliveTimeout = null;
this._recvKeepAliveCheck();
}, ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg + 5);
}, Math.max(ProtocolConstants.KeepAliveTimeoutTime - timeSinceLastIncomingMsg, 0) + 5);
}
public getSocket(): ISocket {
......@@ -821,15 +874,19 @@ export class PersistentProtocol implements IMessagePassingProtocol {
const oldestUnacknowledgedMsg = this._outgoingUnackMsg.peek()!;
const timeSinceOldestUnacknowledgedMsg = Date.now() - oldestUnacknowledgedMsg.writtenTime;
if (timeSinceOldestUnacknowledgedMsg >= ProtocolConstants.AcknowledgeTimeoutTime) {
// Trash the socket
this._onSocketTimeout.fire(undefined);
return;
// It's been a long time since our sent message was acknowledged
// But this might be caused by the event loop being busy and failing to read messages
if (!this._loadEstimator.hasHighLoad()) {
// Trash the socket
this._onSocketTimeout.fire(undefined);
return;
}
}
this._outgoingAckTimeout = setTimeout(() => {
this._outgoingAckTimeout = null;
this._recvAckCheck();
}, ProtocolConstants.AcknowledgeTimeoutTime - timeSinceOldestUnacknowledgedMsg + 5);
}, Math.max(ProtocolConstants.AcknowledgeTimeoutTime - timeSinceOldestUnacknowledgedMsg, 0) + 5);
}
private _sendAck(): void {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册