提交 4c2f7756 编写于 作者: A Alex Dima

Improve connection logic (fixes microsoft/vscode-remote-release#1224)

上级 f58c8f6a
......@@ -405,42 +405,53 @@ export class Client<TContext = string> extends IPCClient<TContext> {
/**
* Will ensure no messages are lost if there are no event listeners.
*/
export function createBufferedEvent<T>(source: Event<T>): Event<T> {
let emitter: Emitter<T>;
let hasListeners = false;
let isDeliveringMessages = false;
let bufferedMessages: T[] = [];
const deliverMessages = () => {
if (isDeliveringMessages) {
export class BufferedEmitter<T> {
private _emitter: Emitter<T>;
public readonly event: Event<T>;
private _hasListeners = false;
private _isDeliveringMessages = false;
private _bufferedMessages: T[] = [];
constructor() {
this._emitter = new Emitter<T>({
onFirstListenerAdd: () => {
this._hasListeners = true;
// it is important to deliver these messages after this call, but before
// other messages have a chance to be received (to guarantee in order delivery)
// that's why we're using here nextTick and not other types of timeouts
process.nextTick(() => this._deliverMessages);
},
onLastListenerRemove: () => {
this._hasListeners = false;
}
});
this.event = this._emitter.event;
}
private _deliverMessages(): void {
if (this._isDeliveringMessages) {
return;
}
isDeliveringMessages = true;
while (hasListeners && bufferedMessages.length > 0) {
emitter.fire(bufferedMessages.shift()!);
this._isDeliveringMessages = true;
while (this._hasListeners && this._bufferedMessages.length > 0) {
this._emitter.fire(this._bufferedMessages.shift()!);
}
isDeliveringMessages = false;
};
this._isDeliveringMessages = false;
}
source((e: T) => {
bufferedMessages.push(e);
deliverMessages();
});
emitter = new Emitter<T>({
onFirstListenerAdd: () => {
hasListeners = true;
// it is important to deliver these messages after this call, but before
// other messages have a chance to be received (to guarantee in order delivery)
// that's why we're using here nextTick and not other types of timeouts
process.nextTick(deliverMessages);
},
onLastListenerRemove: () => {
hasListeners = false;
}
});
return emitter.event;
public fire(event: T): void {
if (this._hasListeners) {
this._emitter.fire(event);
} else {
this._bufferedMessages.push(event);
}
}
public flushBuffer(): void {
this._bufferedMessages = [];
}
}
class QueueElement<T> {
......@@ -530,20 +541,20 @@ export class PersistentProtocol implements IMessagePassingProtocol {
private _socketReader: ProtocolReader;
private _socketDisposables: IDisposable[];
private _onControlMessage = new Emitter<VSBuffer>();
readonly onControlMessage: Event<VSBuffer> = createBufferedEvent(this._onControlMessage.event);
private readonly _onControlMessage = new BufferedEmitter<VSBuffer>();
readonly onControlMessage: Event<VSBuffer> = this._onControlMessage.event;
private _onMessage = new Emitter<VSBuffer>();
readonly onMessage: Event<VSBuffer> = createBufferedEvent(this._onMessage.event);
private readonly _onMessage = new BufferedEmitter<VSBuffer>();
readonly onMessage: Event<VSBuffer> = this._onMessage.event;
private _onClose = new Emitter<void>();
readonly onClose: Event<void> = createBufferedEvent(this._onClose.event);
private readonly _onClose = new BufferedEmitter<void>();
readonly onClose: Event<void> = this._onClose.event;
private _onSocketClose = new Emitter<void>();
readonly onSocketClose: Event<void> = createBufferedEvent(this._onSocketClose.event);
private readonly _onSocketClose = new BufferedEmitter<void>();
readonly onSocketClose: Event<void> = this._onSocketClose.event;
private _onSocketTimeout = new Emitter<void>();
readonly onSocketTimeout: Event<void> = createBufferedEvent(this._onSocketTimeout.event);
private readonly _onSocketTimeout = new BufferedEmitter<void>();
readonly onSocketTimeout: Event<void> = this._onSocketTimeout.event;
public get unacknowledgedCount(): number {
return this._outgoingMsgId - this._outgoingAckId;
......@@ -656,6 +667,10 @@ export class PersistentProtocol implements IMessagePassingProtocol {
this._isReconnecting = true;
this._socketDisposables = dispose(this._socketDisposables);
this._onControlMessage.flushBuffer();
this._onSocketClose.flushBuffer();
this._onSocketTimeout.flushBuffer();
this._socket.dispose();
this._socket = socket;
this._socketWriter = new ProtocolWriter(this._socket);
......
......@@ -143,7 +143,7 @@ class CodeRendererMain extends Disposable {
serviceCollection.set(ISignService, signService);
// Remote Agent
const remoteAgentService = this._register(new RemoteAgentService(this.configuration.webSocketFactory, environmentService, productService, remoteAuthorityResolverService, signService));
const remoteAgentService = this._register(new RemoteAgentService(this.configuration.webSocketFactory, environmentService, productService, remoteAuthorityResolverService, signService, logService));
serviceCollection.set(IRemoteAgentService, remoteAgentService);
// Files
......
......@@ -194,7 +194,7 @@ class CodeRendererMain extends Disposable {
const signService = new SignService();
serviceCollection.set(ISignService, signService);
const remoteAgentService = this._register(new RemoteAgentService(this.environmentService.configuration, this.environmentService, remoteAuthorityResolverService, signService));
const remoteAgentService = this._register(new RemoteAgentService(this.environmentService.configuration, this.environmentService, remoteAuthorityResolverService, signService, logService));
serviceCollection.set(IRemoteAgentService, remoteAgentService);
// Files
......
......@@ -109,7 +109,7 @@ suite('WorkspaceContextService - Folder', () => {
const diskFileSystemProvider = new DiskFileSystemProvider(new NullLogService());
fileService.registerProvider(Schemas.file, diskFileSystemProvider);
fileService.registerProvider(Schemas.userData, new FileUserDataProvider(environmentService.appSettingsHome, environmentService.backupHome, new DiskFileSystemProvider(new NullLogService()), environmentService));
workspaceContextService = new WorkspaceService({ configurationCache: new ConfigurationCache(environmentService) }, environmentService, fileService, new RemoteAgentService(<IWindowConfiguration>{}, environmentService, new RemoteAuthorityResolverService(), new SignService(undefined)));
workspaceContextService = new WorkspaceService({ configurationCache: new ConfigurationCache(environmentService) }, environmentService, fileService, new RemoteAgentService(<IWindowConfiguration>{}, environmentService, new RemoteAuthorityResolverService(), new SignService(undefined), new NullLogService()));
return (<WorkspaceService>workspaceContextService).initialize(convertToWorkspacePayload(URI.file(folderDir)));
});
});
......
......@@ -79,7 +79,8 @@ export class RemoteExtensionHostClient extends Disposable implements IExtensionH
return { host: authority.host, port: authority.port };
}
},
signService: this._signService
signService: this._signService,
logService: this._logService
};
return this.remoteAuthorityResolverService.resolveAuthority(this._initDataProvider.remoteAuthority).then((resolverResult) => {
......
......@@ -7,9 +7,9 @@ import * as nativeWatchdog from 'native-watchdog';
import * as net from 'net';
import * as minimist from 'vscode-minimist';
import { onUnexpectedError } from 'vs/base/common/errors';
import { Event, Emitter } from 'vs/base/common/event';
import { Event } from 'vs/base/common/event';
import { IMessagePassingProtocol } from 'vs/base/parts/ipc/common/ipc';
import { PersistentProtocol, ProtocolConstants, createBufferedEvent } from 'vs/base/parts/ipc/common/ipc.net';
import { PersistentProtocol, ProtocolConstants, BufferedEmitter } from 'vs/base/parts/ipc/common/ipc.net';
import { NodeSocket, WebSocketNodeSocket } from 'vs/base/parts/ipc/node/ipc.net';
import product from 'vs/platform/product/node/product';
import { IInitData } from 'vs/workbench/api/common/extHost.protocol';
......@@ -164,8 +164,8 @@ async function createExtHostProtocol(): Promise<IMessagePassingProtocol> {
return new class implements IMessagePassingProtocol {
private readonly _onMessage = new Emitter<VSBuffer>();
readonly onMessage: Event<VSBuffer> = createBufferedEvent(this._onMessage.event);
private readonly _onMessage = new BufferedEmitter<VSBuffer>();
readonly onMessage: Event<VSBuffer> = this._onMessage.event;
private _terminating: boolean;
......
......@@ -11,6 +11,7 @@ import { IProductService } from 'vs/platform/product/common/product';
import { IWebSocketFactory, BrowserSocketFactory } from 'vs/platform/remote/browser/browserSocketFactory';
import { ISignService } from 'vs/platform/sign/common/sign';
import { ISocketFactory } from 'vs/platform/remote/common/remoteAgentConnection';
import { ILogService } from 'vs/platform/log/common/log';
export class RemoteAgentService extends AbstractRemoteAgentService implements IRemoteAgentService {
......@@ -23,12 +24,13 @@ export class RemoteAgentService extends AbstractRemoteAgentService implements IR
@IWorkbenchEnvironmentService environmentService: IWorkbenchEnvironmentService,
@IProductService productService: IProductService,
@IRemoteAuthorityResolverService remoteAuthorityResolverService: IRemoteAuthorityResolverService,
@ISignService signService: ISignService
@ISignService signService: ISignService,
@ILogService logService: ILogService
) {
super(environmentService);
this.socketFactory = new BrowserSocketFactory(webSocketFactory);
this._connection = this._register(new RemoteAgentConnection(environmentService.configuration.remoteAuthority!, productService.commit, this.socketFactory, remoteAuthorityResolverService, signService));
this._connection = this._register(new RemoteAgentConnection(environmentService.configuration.remoteAuthority!, productService.commit, this.socketFactory, remoteAuthorityResolverService, signService, logService));
}
getConnection(): IRemoteAgentConnection | null {
......
......@@ -20,6 +20,7 @@ import { INotificationService } from 'vs/platform/notification/common/notificati
import { IDiagnosticInfoOptions, IDiagnosticInfo } from 'vs/platform/diagnostics/common/diagnostics';
import { Emitter } from 'vs/base/common/event';
import { ISignService } from 'vs/platform/sign/common/sign';
import { ILogService } from 'vs/platform/log/common/log';
export abstract class AbstractRemoteAgentService extends Disposable {
......@@ -86,7 +87,8 @@ export class RemoteAgentConnection extends Disposable implements IRemoteAgentCon
private readonly _commit: string | undefined,
private readonly _socketFactory: ISocketFactory,
private readonly _remoteAuthorityResolverService: IRemoteAuthorityResolverService,
private readonly _signService: ISignService
private readonly _signService: ISignService,
private readonly _logService: ILogService
) {
super();
this.remoteAuthority = remoteAuthority;
......@@ -124,7 +126,8 @@ export class RemoteAgentConnection extends Disposable implements IRemoteAgentCon
return { host: authority.host, port: authority.port };
}
},
signService: this._signService
signService: this._signService,
logService: this._logService
};
const connection = this._register(await connectRemoteAgentManagement(options, this.remoteAuthority, `renderer`));
this._register(connection.onDidStateChange(e => this._onDidStateChange.fire(e)));
......
......@@ -12,6 +12,7 @@ import { nodeSocketFactory } from 'vs/platform/remote/node/nodeSocketFactory';
import { AbstractRemoteAgentService, RemoteAgentConnection } from 'vs/workbench/services/remote/common/abstractRemoteAgentService';
import { ISignService } from 'vs/platform/sign/common/sign';
import { ISocketFactory } from 'vs/platform/remote/common/remoteAgentConnection';
import { ILogService } from 'vs/platform/log/common/log';
export class RemoteAgentService extends AbstractRemoteAgentService implements IRemoteAgentService {
......@@ -22,12 +23,13 @@ export class RemoteAgentService extends AbstractRemoteAgentService implements IR
constructor({ remoteAuthority }: IWindowConfiguration,
@IEnvironmentService environmentService: IEnvironmentService,
@IRemoteAuthorityResolverService remoteAuthorityResolverService: IRemoteAuthorityResolverService,
@ISignService signService: ISignService
@ISignService signService: ISignService,
@ILogService logService: ILogService
) {
super(environmentService);
this.socketFactory = nodeSocketFactory;
if (remoteAuthority) {
this._connection = this._register(new RemoteAgentConnection(remoteAuthority, product.commit, nodeSocketFactory, remoteAuthorityResolverService, signService));
this._connection = this._register(new RemoteAgentConnection(remoteAuthority, product.commit, nodeSocketFactory, remoteAuthorityResolverService, signService, logService));
}
}
......
......@@ -15,6 +15,7 @@ import { ITunnelService, RemoteTunnel } from 'vs/platform/remote/common/tunnel';
import { nodeSocketFactory } from 'vs/platform/remote/node/nodeSocketFactory';
import { ISignService } from 'vs/platform/sign/common/sign';
import { registerSingleton } from 'vs/platform/instantiation/common/extensions';
import { ILogService } from 'vs/platform/log/common/log';
export async function createRemoteTunnel(options: IConnectionOptions, tunnelRemotePort: number): Promise<RemoteTunnel> {
const tunnel = new NodeRemoteTunnel(options, tunnelRemotePort);
......@@ -90,7 +91,8 @@ export class TunnelService implements ITunnelService {
public constructor(
@IWorkbenchEnvironmentService private readonly environmentService: IWorkbenchEnvironmentService,
@IRemoteAuthorityResolverService private readonly remoteAuthorityResolverService: IRemoteAuthorityResolverService,
@ISignService private readonly signService: ISignService
@ISignService private readonly signService: ISignService,
@ILogService private readonly logService: ILogService
) {
}
......@@ -109,7 +111,8 @@ export class TunnelService implements ITunnelService {
return { host: authority.host, port: authority.port };
}
},
signService: this.signService
signService: this.signService,
logService: this.logService
};
return createRemoteTunnel(options, remotePort);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册