提交 715e9d57 编写于 作者: H Heejae Chang

previously, when consumer want to connect to remote host, the engine...

previously, when consumer want to connect to remote host, the engine internally created 2 channels to the remote host. one for the consumer itself, and the other to move remotable data required for the consumer in background.

this was done so that each consumer can have its own isolated connections from others. but later discovered that it can cause too much allocations since something like code lens can cause remote host to have 10+ more concurrent requests and syncing all those remotable data at once can cause too high allocation rate.

so, in RTM we throttled on how many can sync remotable data at the same time, but left 2 channels design as it was.

this change makes connection to have only 1 channel, and remotable data channel to be shared by all connections. since there is time delay to make connection to remote host, this should reduce delay required per each connection.
上级 f267342c
......@@ -18,6 +18,7 @@ namespace Roslyn.Test.Utilities.Remote
internal sealed class InProcRemoteHostClient : RemoteHostClient
{
private readonly InProcRemoteServices _inprocServices;
private readonly RemotableDataJsonRpcEx _remotableDataRpc;
private readonly JsonRpc _rpc;
public static async Task<RemoteHostClient> CreateAsync(Workspace workspace, bool runCacheCleanup, CancellationToken cancellationToken)
......@@ -25,8 +26,9 @@ public static async Task<RemoteHostClient> CreateAsync(Workspace workspace, bool
var inprocServices = new InProcRemoteServices(runCacheCleanup);
var remoteHostStream = await inprocServices.RequestServiceAsync(WellKnownRemoteHostServices.RemoteHostService, cancellationToken).ConfigureAwait(false);
var remotableDataRpc = new RemotableDataJsonRpcEx(workspace, await inprocServices.RequestServiceAsync(WellKnownServiceHubServices.SnapshotService, cancellationToken).ConfigureAwait(false));
var instance = new InProcRemoteHostClient(workspace, inprocServices, remoteHostStream);
var instance = new InProcRemoteHostClient(workspace, inprocServices, remotableDataRpc, remoteHostStream);
// make sure connection is done right
var current = $"VS ({Process.GetCurrentProcess().Id})";
......@@ -42,10 +44,15 @@ public static async Task<RemoteHostClient> CreateAsync(Workspace workspace, bool
return instance;
}
private InProcRemoteHostClient(Workspace workspace, InProcRemoteServices inprocServices, Stream stream) :
private InProcRemoteHostClient(
Workspace workspace,
InProcRemoteServices inprocServices,
RemotableDataJsonRpcEx remotableDataRpc,
Stream stream) :
base(workspace)
{
_inprocServices = inprocServices;
_remotableDataRpc = remotableDataRpc;
_rpc = new JsonRpc(new JsonRpcMessageHandler(stream, stream), target: this);
_rpc.JsonSerializer.Converters.Add(AggregateJsonConverter.Instance);
......@@ -61,16 +68,11 @@ public static async Task<RemoteHostClient> CreateAsync(Workspace workspace, bool
public override async Task<Connection> TryCreateConnectionAsync(
string serviceName, object callbackTarget, CancellationToken cancellationToken)
{
// get stream from service hub to communicate snapshot/asset related information
// this is the back channel the system uses to move data between VS and remote host
var snapshotStream = await _inprocServices.RequestServiceAsync(
WellKnownServiceHubServices.SnapshotService, cancellationToken).ConfigureAwait(false);
// get stream from service hub to communicate service specific information
// this is what consumer actually use to communicate information
var serviceStream = await _inprocServices.RequestServiceAsync(serviceName, cancellationToken).ConfigureAwait(false);
return new ServiceHubJsonRpcConnection(callbackTarget, serviceStream, snapshotStream, cancellationToken);
return new ServiceHubJsonRpcConnection(callbackTarget, serviceStream, _remotableDataRpc.Share(), cancellationToken);
}
protected override void OnStarted()
......@@ -82,6 +84,7 @@ protected override void OnStopped()
// we are asked to disconnect. unsubscribe and dispose to disconnect
_rpc.Disconnected -= OnRpcDisconnected;
_rpc.Dispose();
_remotableDataRpc.Dispose();
}
private void OnRpcDisconnected(object sender, JsonRpcDisconnectedEventArgs e)
......
......@@ -176,6 +176,9 @@
<Compile Include="..\..\VisualStudio\Core\Next\Remote\JsonRpcConnection.cs">
<Link>Remote\JsonRpcConnection.cs</Link>
</Compile>
<Compile Include="..\..\VisualStudio\Core\Next\Remote\RemotableDataJsonRpcEx.cs">
<Link>Remote\RemotableDataJsonRpcEx.cs</Link>
</Compile>
<Compile Include="AbstractCommandHandlerTestState.cs" />
<Compile Include="Async\AsynchronousOperationBlocker.cs" />
<Compile Include="Async\Checkpoint.cs" />
......
......@@ -15,7 +15,7 @@ namespace Microsoft.VisualStudio.LanguageServices.Remote
/// Helper type that abstract out JsonRpc communication with extra capability of
/// using raw stream to move over big chunk of data
/// </summary>
internal class JsonRpcEx : IDisposable
internal abstract class JsonRpcEx : IDisposable
{
private readonly JsonRpc _rpc;
private readonly CancellationToken _cancellationToken;
......@@ -34,6 +34,8 @@ internal class JsonRpcEx : IDisposable
_rpc.Disconnected += OnDisconnected;
}
public abstract void Dispose();
public async Task InvokeAsync(string targetName, params object[] arguments)
{
_cancellationToken.ThrowIfCancellationRequested();
......@@ -42,7 +44,7 @@ public async Task InvokeAsync(string targetName, params object[] arguments)
{
await _rpc.InvokeAsync(targetName, arguments).ConfigureAwait(false);
}
catch
catch
{
// any exception can be thrown from StreamJsonRpc if JsonRpc is disposed in the middle of read/write.
// until we move to newly added cancellation support in JsonRpc, we will catch exception and translate to
......@@ -82,10 +84,8 @@ public Task<T> InvokeAsync<T>(string targetName, IEnumerable<object> arguments,
return Extensions.InvokeAsync(_rpc, targetName, arguments, funcWithDirectStreamAsync, _cancellationToken);
}
public void Dispose()
protected void Disconnect()
{
OnDisposed();
_rpc.Dispose();
}
......@@ -96,11 +96,6 @@ protected void StartListening()
_rpc.StartListening();
}
protected virtual void OnDisposed()
{
// do nothing
}
protected virtual void OnDisconnected(object sender, JsonRpcDisconnectedEventArgs e)
{
// do nothing
......
......@@ -5,12 +5,8 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.Execution;
using Microsoft.CodeAnalysis.Remote;
using Roslyn.Utilities;
using StreamJsonRpc;
using Microsoft.CodeAnalysis.Internal.Log;
namespace Microsoft.VisualStudio.LanguageServices.Remote
{
......@@ -20,7 +16,7 @@ internal class ServiceHubJsonRpcConnection : RemoteHostClient.Connection
private readonly ServiceJsonRpcClient _serviceClient;
// communication channel related to snapshot information
private readonly SnapshotJsonRpcClient _snapshotClient;
private readonly RemotableDataJsonRpcEx _remoteDataRpc;
// close connection when cancellation has raised
private readonly CancellationTokenRegistration _cancellationRegistration;
......@@ -28,12 +24,12 @@ internal class ServiceHubJsonRpcConnection : RemoteHostClient.Connection
public ServiceHubJsonRpcConnection(
object callbackTarget,
Stream serviceStream,
Stream snapshotStream,
RemotableDataJsonRpcEx dataRpc,
CancellationToken cancellationToken) :
base(cancellationToken)
{
_serviceClient = new ServiceJsonRpcClient(serviceStream, callbackTarget, cancellationToken);
_snapshotClient = new SnapshotJsonRpcClient(this, snapshotStream, cancellationToken);
_remoteDataRpc = dataRpc;
// dispose session when cancellation has raised
_cancellationRegistration = CancellationToken.Register(Dispose);
......@@ -41,7 +37,6 @@ internal class ServiceHubJsonRpcConnection : RemoteHostClient.Connection
protected override async Task OnRegisterPinnedRemotableDataScopeAsync(PinnedRemotableDataScope scope)
{
await _snapshotClient.InvokeAsync(WellKnownServiceHubServices.ServiceHubServiceBase_Initialize, scope.SolutionInfo).ConfigureAwait(false);
await _serviceClient.InvokeAsync(WellKnownServiceHubServices.ServiceHubServiceBase_Initialize, scope.SolutionInfo).ConfigureAwait(false);
}
......@@ -74,7 +69,7 @@ protected override void OnDisposed()
// dispose service and snapshot channels
_serviceClient.Dispose();
_snapshotClient.Dispose();
_remoteDataRpc.Dispose();
}
/// <summary>
......@@ -94,120 +89,10 @@ public ServiceJsonRpcClient(Stream stream, object callbackTarget, CancellationTo
StartListening();
}
}
/// <summary>
/// Communication channel between remote host client and remote host.
///
/// this is framework's back channel to talk to remote host
///
/// for example, this will be used to deliver missing assets in remote host.
///
/// each remote host client will have its own back channel so that it can work isolated
/// with other clients.
/// </summary>
private class SnapshotJsonRpcClient : JsonRpcEx
{
private readonly ServiceHubJsonRpcConnection _owner;
private readonly CancellationTokenSource _source;
public SnapshotJsonRpcClient(ServiceHubJsonRpcConnection owner, Stream stream, CancellationToken cancellationToken)
: base(stream, callbackTarget: null, useThisAsCallback: true, cancellationToken: cancellationToken)
{
_owner = owner;
_source = new CancellationTokenSource();
StartListening();
}
/// <summary>
/// this is callback from remote host side to get asset associated with checksum from VS.
/// </summary>
public async Task RequestAssetAsync(int scopeId, Checksum[] checksums, string streamName)
{
try
{
using (Logger.LogBlock(FunctionId.JsonRpcSession_RequestAssetAsync, streamName, _source.Token))
using (var stream = await DirectStream.GetAsync(streamName, _source.Token).ConfigureAwait(false))
{
var scope = _owner.PinnedRemotableDataScope;
using (var writer = new ObjectWriter(stream))
{
writer.WriteInt32(scopeId);
await WriteAssetAsync(writer, scope, checksums).ConfigureAwait(false);
}
await stream.FlushAsync(_source.Token).ConfigureAwait(false);
}
}
catch (IOException)
{
// remote host side is cancelled (client stream connection is closed)
// can happen if pinned solution scope is disposed
}
catch (OperationCanceledException)
{
// rpc connection is closed.
// can happen if pinned solution scope is disposed
}
}
private async Task WriteAssetAsync(ObjectWriter writer, PinnedRemotableDataScope scope, Checksum[] checksums)
{
// special case
if (checksums.Length == 0)
{
await WriteNoAssetAsync(writer).ConfigureAwait(false);
return;
}
if (checksums.Length == 1)
{
await WriteOneAssetAsync(writer, scope, checksums[0]).ConfigureAwait(false);
return;
}
await WriteMultipleAssetsAsync(writer, scope, checksums).ConfigureAwait(false);
}
private Task WriteNoAssetAsync(ObjectWriter writer)
{
writer.WriteInt32(0);
return SpecializedTasks.EmptyTask;
}
private async Task WriteOneAssetAsync(ObjectWriter writer, PinnedRemotableDataScope scope, Checksum checksum)
{
var remotableData = scope.GetRemotableData(checksum, _source.Token) ?? RemotableData.Null;
writer.WriteInt32(1);
checksum.WriteTo(writer);
writer.WriteInt32((int)remotableData.Kind);
await remotableData.WriteObjectToAsync(writer, _source.Token).ConfigureAwait(false);
}
private async Task WriteMultipleAssetsAsync(ObjectWriter writer, PinnedRemotableDataScope scope, Checksum[] checksums)
{
var remotableDataMap = scope.GetRemotableData(checksums, _source.Token);
writer.WriteInt32(remotableDataMap.Count);
foreach (var kv in remotableDataMap)
{
var checksum = kv.Key;
var remotableData = kv.Value;
checksum.WriteTo(writer);
writer.WriteInt32((int)remotableData.Kind);
await remotableData.WriteObjectToAsync(writer, _source.Token).ConfigureAwait(false);
}
}
protected override void OnDisconnected(object sender, JsonRpcDisconnectedEventArgs e)
public override void Dispose()
{
_source.Cancel();
Disconnect();
}
}
}
......
// Copyright (c) Microsoft. All Rights Reserved. Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.Execution;
using Microsoft.CodeAnalysis.Remote;
using Roslyn.Utilities;
using StreamJsonRpc;
using Microsoft.CodeAnalysis.Internal.Log;
namespace Microsoft.VisualStudio.LanguageServices.Remote
{
/// <summary>
/// Communication channel between remote host client and remote host.
///
/// this is framework's back channel to talk to remote host
///
/// for example, this will be used to deliver missing remotable data to remote host.
///
/// all connection will share one remotable data channel
/// </summary>
internal class RemotableDataJsonRpcEx : JsonRpcEx
{
private readonly ISolutionSynchronizationService _remotableDataService;
private readonly CancellationTokenSource _source;
/// <summary>
/// number of shared count
/// </summary>
private readonly object _gate;
private int _count;
public RemotableDataJsonRpcEx(Microsoft.CodeAnalysis.Workspace workspace, Stream stream)
: base(stream, callbackTarget: null, useThisAsCallback: true, cancellationToken: CancellationToken.None)
{
_gate = new object();
_count = 1;
_remotableDataService = workspace.Services.GetService<ISolutionSynchronizationService>();
// cancellation will be removed once I get to cancellation refactoring
_source = new CancellationTokenSource();
StartListening();
}
/// <summary>
/// this is callback from remote host side to get asset associated with checksum from VS.
/// </summary>
public async Task RequestAssetAsync(int scopeId, Checksum[] checksums, string streamName)
{
try
{
using (Logger.LogBlock(FunctionId.JsonRpcSession_RequestAssetAsync, streamName, _source.Token))
using (var stream = await DirectStream.GetAsync(streamName, _source.Token).ConfigureAwait(false))
{
using (var writer = new ObjectWriter(stream))
{
writer.WriteInt32(scopeId);
await WriteAssetAsync(writer, scopeId, checksums).ConfigureAwait(false);
}
await stream.FlushAsync(_source.Token).ConfigureAwait(false);
}
}
catch (IOException)
{
// remote host side is cancelled (client stream connection is closed)
// can happen if pinned solution scope is disposed
}
catch (OperationCanceledException)
{
// rpc connection is closed.
// can happen if pinned solution scope is disposed
}
}
private async Task WriteAssetAsync(ObjectWriter writer, int scopeId, Checksum[] checksums)
{
// special case
if (checksums.Length == 0)
{
await WriteNoAssetAsync(writer).ConfigureAwait(false);
return;
}
if (checksums.Length == 1)
{
await WriteOneAssetAsync(writer, scopeId, checksums[0]).ConfigureAwait(false);
return;
}
await WriteMultipleAssetsAsync(writer, scopeId, checksums).ConfigureAwait(false);
}
private Task WriteNoAssetAsync(ObjectWriter writer)
{
writer.WriteInt32(0);
return SpecializedTasks.EmptyTask;
}
private async Task WriteOneAssetAsync(ObjectWriter writer, int scopeId, Checksum checksum)
{
var remotableData = _remotableDataService.GetRemotableData(scopeId, checksum, _source.Token) ?? RemotableData.Null;
writer.WriteInt32(1);
checksum.WriteTo(writer);
writer.WriteInt32((int)remotableData.Kind);
await remotableData.WriteObjectToAsync(writer, _source.Token).ConfigureAwait(false);
}
private async Task WriteMultipleAssetsAsync(ObjectWriter writer, int scopeId, Checksum[] checksums)
{
var remotableDataMap = _remotableDataService.GetRemotableData(scopeId, checksums, _source.Token);
writer.WriteInt32(remotableDataMap.Count);
foreach (var kv in remotableDataMap)
{
var checksum = kv.Key;
var remotableData = kv.Value;
checksum.WriteTo(writer);
writer.WriteInt32((int)remotableData.Kind);
await remotableData.WriteObjectToAsync(writer, _source.Token).ConfigureAwait(false);
}
}
public RemotableDataJsonRpcEx Share()
{
lock (_gate)
{
Contract.ThrowIfFalse(_count > 0);
_count++;
return this;
}
}
public override void Dispose()
{
lock (_gate)
{
Contract.ThrowIfFalse(_count > 0);
_count--;
if (_count == 0)
{
Disconnect();
}
}
}
protected override void OnDisconnected(object sender, JsonRpcDisconnectedEventArgs e)
{
_source.Cancel();
}
}
}
\ No newline at end of file
......@@ -5,10 +5,8 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.Editor.Shared.Utilities;
using Microsoft.CodeAnalysis.ErrorReporting;
using Microsoft.CodeAnalysis.Execution;
using Microsoft.CodeAnalysis.Internal.Log;
using Microsoft.CodeAnalysis.Remote;
using Microsoft.ServiceHub.Client;
......@@ -27,10 +25,12 @@ internal sealed partial class ServiceHubRemoteHostClient : RemoteHostClient
private static int s_instanceId = 0;
private readonly HubClient _hubClient;
private readonly JsonRpc _rpc;
private readonly HostGroup _hostGroup;
private readonly TimeSpan _timeout;
private readonly JsonRpc _rpc;
private readonly RemotableDataJsonRpcEx _remotableDataRpc;
public static async Task<RemoteHostClient> CreateAsync(
Workspace workspace, CancellationToken cancellationToken)
{
......@@ -46,7 +46,8 @@ internal sealed partial class ServiceHubRemoteHostClient : RemoteHostClient
var timeout = TimeSpan.FromMilliseconds(workspace.Options.GetOption(RemoteHostOptions.RequestServiceTimeoutInMS));
var remoteHostStream = await RequestServiceAsync(primary, WellKnownRemoteHostServices.RemoteHostService, hostGroup, timeout, cancellationToken).ConfigureAwait(false);
var instance = new ServiceHubRemoteHostClient(workspace, primary, hostGroup, remoteHostStream);
var remotableDataRpc = new RemotableDataJsonRpcEx(workspace, await RequestServiceAsync(primary, WellKnownServiceHubServices.SnapshotService, hostGroup, timeout, cancellationToken).ConfigureAwait(false));
var instance = new ServiceHubRemoteHostClient(workspace, primary, hostGroup, remotableDataRpc, remoteHostStream);
// make sure connection is done right
var host = await instance._rpc.InvokeAsync<string>(nameof(IRemoteHostService.Connect), current, TelemetryService.DefaultSession.SerializeSettings()).ConfigureAwait(false);
......@@ -99,12 +100,17 @@ private static async Task RegisterWorkspaceHostAsync(Workspace workspace, Remote
}
private ServiceHubRemoteHostClient(
Workspace workspace, HubClient hubClient, HostGroup hostGroup, Stream stream) :
Workspace workspace,
HubClient hubClient,
HostGroup hostGroup,
RemotableDataJsonRpcEx remotableDataRpc,
Stream stream) :
base(workspace)
{
_hubClient = hubClient;
_hostGroup = hostGroup;
_timeout = TimeSpan.FromMilliseconds(workspace.Options.GetOption(RemoteHostOptions.RequestServiceTimeoutInMS));
_remotableDataRpc = remotableDataRpc;
_rpc = new JsonRpc(new JsonRpcMessageHandler(stream, stream), target: this);
_rpc.JsonSerializer.Converters.Add(AggregateJsonConverter.Instance);
......@@ -117,15 +123,11 @@ private static async Task RegisterWorkspaceHostAsync(Workspace workspace, Remote
public override async Task<Connection> TryCreateConnectionAsync(string serviceName, object callbackTarget, CancellationToken cancellationToken)
{
// get stream from service hub to communicate snapshot/asset related information
// this is the back channel the system uses to move data between VS and remote host for solution related information
var snapshotStream = await RequestServiceAsync(_hubClient, WellKnownServiceHubServices.SnapshotService, _hostGroup, _timeout, cancellationToken).ConfigureAwait(false);
// get stream from service hub to communicate service specific information
// this is what consumer actually use to communicate information
var serviceStream = await RequestServiceAsync(_hubClient, serviceName, _hostGroup, _timeout, cancellationToken).ConfigureAwait(false);
return new ServiceHubJsonRpcConnection(callbackTarget, serviceStream, snapshotStream, cancellationToken);
return new ServiceHubJsonRpcConnection(callbackTarget, serviceStream, _remotableDataRpc.Share(), cancellationToken);
}
protected override void OnStarted()
......@@ -141,6 +143,7 @@ protected override void OnStopped()
// we don't need the event, otherwise, Disconnected event will be called twice.
_rpc.Disconnected -= OnRpcDisconnected;
_rpc.Dispose();
_remotableDataRpc.Dispose();
}
private void OnRpcDisconnected(object sender, JsonRpcDisconnectedEventArgs e)
......
......@@ -172,6 +172,7 @@
<Compile Include="Remote\JsonRpcClient.cs" />
<Compile Include="Remote\JsonRpcMessageHandler.cs" />
<Compile Include="Remote\JsonRpcConnection.cs" />
<Compile Include="Remote\RemotableDataJsonRpcEx.cs" />
<Compile Include="Remote\RemoteHostClientFactory.cs" />
<Compile Include="Remote\ServiceHubRemoteHostClient.WorkspaceHost.cs" />
<Compile Include="Remote\ServiceHubRemoteHostClient.cs" />
......
......@@ -14,18 +14,18 @@ internal class TestAssetSource : AssetSource
{
private readonly Dictionary<Checksum, object> _map;
public TestAssetSource(AssetStorage assetStorage, int sessionId) :
this(assetStorage, sessionId, new Dictionary<Checksum, object>())
public TestAssetSource(AssetStorage assetStorage) :
this(assetStorage, new Dictionary<Checksum, object>())
{
}
public TestAssetSource(AssetStorage assetStorage, int sessionId, Checksum checksum, object data) :
this(assetStorage, sessionId, new Dictionary<Checksum, object>() { { checksum, data } })
public TestAssetSource(AssetStorage assetStorage, Checksum checksum, object data) :
this(assetStorage, new Dictionary<Checksum, object>() { { checksum, data } })
{
}
public TestAssetSource(AssetStorage assetStorage, int sessionId, Dictionary<Checksum, object> map) :
base(assetStorage, sessionId)
public TestAssetSource(AssetStorage assetStorage, Dictionary<Checksum, object> map) :
base(assetStorage)
{
_map = map;
}
......
......@@ -26,7 +26,7 @@ public async Task TestAssets()
var data = new object();
var storage = new AssetStorage();
var source = new TestAssetSource(storage, sessionId, checksum, data);
var source = new TestAssetSource(storage, checksum, data);
var service = new AssetService(sessionId, storage);
var stored = await service.GetAssetAsync<object>(checksum, CancellationToken.None);
......@@ -55,7 +55,7 @@ public async Task TestAssetSynchronization()
var sessionId = 0;
var storage = new AssetStorage();
var source = new TestAssetSource(storage, sessionId, map);
var source = new TestAssetSource(storage, map);
var service = new AssetService(sessionId, storage);
await service.SynchronizeAssetsAsync(new HashSet<Checksum>(map.Keys), CancellationToken.None);
......@@ -84,7 +84,7 @@ public async Task TestSolutionSynchronization()
var sessionId = 0;
var storage = new AssetStorage();
var source = new TestAssetSource(storage, sessionId, map);
var source = new TestAssetSource(storage, map);
var service = new AssetService(sessionId, storage);
await service.SynchronizeSolutionAssetsAsync(await solution.State.GetChecksumAsync(CancellationToken.None), CancellationToken.None);
......@@ -113,7 +113,7 @@ public async Task TestProjectSynchronization()
var sessionId = 0;
var storage = new AssetStorage();
var source = new TestAssetSource(storage, sessionId, map);
var source = new TestAssetSource(storage, map);
var service = new AssetService(sessionId, storage);
await service.SynchronizeProjectAssetsAsync(SpecializedCollections.SingletonEnumerable(await project.State.GetChecksumAsync(CancellationToken.None)), CancellationToken.None);
......
......@@ -15,18 +15,11 @@ public class AssetStorageTests
[Fact, Trait(Traits.Feature, Traits.Features.RemoteHost)]
public void TestCreation()
{
var sessionId = 0;
var storage = new AssetStorage();
var source = new TestAssetSource(storage, sessionId);
var source = new TestAssetSource(storage);
var stored = storage.TryGetAssetSource(sessionId);
var stored = storage.AssetSource;
Assert.Equal(source, stored);
storage.UnregisterAssetSource(sessionId);
var none = storage.TryGetAssetSource(sessionId);
Assert.Null(none);
}
[Fact, Trait(Traits.Feature, Traits.Features.RemoteHost)]
......
......@@ -361,7 +361,7 @@ private static async Task<SolutionService> GetSolutionServiceAsync(Solution solu
var sessionId = 0;
var storage = new AssetStorage();
var source = new TestAssetSource(storage, sessionId, map);
var source = new TestAssetSource(storage, map);
var service = new SolutionService(new AssetService(sessionId, storage));
return service;
......
......@@ -21,12 +21,12 @@ internal partial class AssetStorages
/// <summary>
/// map from solution checksum scope to its associated asset storage
/// </summary>
private readonly ConcurrentDictionary<PinnedRemotableDataScope, Storage> _storages;
private readonly ConcurrentDictionary<int, Storage> _storages;
public AssetStorages()
{
_globalAssets = new ConcurrentDictionary<object, CustomAsset>(concurrencyLevel: 2, capacity: 10);
_storages = new ConcurrentDictionary<PinnedRemotableDataScope, Storage>(concurrencyLevel: 2, capacity: 10);
_storages = new ConcurrentDictionary<int, Storage>(concurrencyLevel: 2, capacity: 10);
}
public void AddGlobalAsset(object value, CustomAsset asset, CancellationToken cancellationToken)
......@@ -59,7 +59,7 @@ public Storage CreateStorage(SolutionState solutionState)
return new Storage(solutionState);
}
public RemotableData GetRemotableData(PinnedRemotableDataScope scope, Checksum checksum, CancellationToken cancellationToken)
public RemotableData GetRemotableData(int scopeId, Checksum checksum, CancellationToken cancellationToken)
{
if (checksum == Checksum.Null)
{
......@@ -68,13 +68,11 @@ public RemotableData GetRemotableData(PinnedRemotableDataScope scope, Checksum c
}
// search snapshots we have
foreach (var storage in GetStorages(scope))
var storage = _storages[scopeId];
var remotableData = storage.TryGetRemotableData(checksum, cancellationToken);
if (remotableData != null)
{
var syncObject = storage.TryGetRemotableData(checksum, cancellationToken);
if (syncObject != null)
{
return syncObject;
}
return remotableData;
}
// search global assets
......@@ -99,7 +97,7 @@ public RemotableData GetRemotableData(PinnedRemotableDataScope scope, Checksum c
return null;
}
public IReadOnlyDictionary<Checksum, RemotableData> GetRemotableData(PinnedRemotableDataScope scope, IEnumerable<Checksum> checksums, CancellationToken cancellationToken)
public IReadOnlyDictionary<Checksum, RemotableData> GetRemotableData(int scopeId, IEnumerable<Checksum> checksums, CancellationToken cancellationToken)
{
using (var searchingChecksumsLeft = Creator.CreateChecksumSet(checksums))
{
......@@ -113,15 +111,14 @@ public RemotableData GetRemotableData(PinnedRemotableDataScope scope, Checksum c
}
// search checksum trees we have
foreach (var storage in GetStorages(scope))
var storage = _storages[scopeId];
storage.AppendRemotableData(searchingChecksumsLeft.Object, result, cancellationToken);
if (result.Count == numberOfChecksumsToSearch)
{
storage.AppendRemotableData(searchingChecksumsLeft.Object, result, cancellationToken);
if (result.Count == numberOfChecksumsToSearch)
{
// no checksum left to find
Contract.Requires(searchingChecksumsLeft.Object.Count == 0);
return result;
}
// no checksum left to find
Contract.Requires(searchingChecksumsLeft.Object.Count == 0);
return result;
}
// search global assets
......@@ -154,46 +151,38 @@ public RemotableData GetRemotableData(PinnedRemotableDataScope scope, Checksum c
}
}
public void RegisterSnapshot(PinnedRemotableDataScope snapshot, AssetStorages.Storage storage)
public void RegisterSnapshot(PinnedRemotableDataScope scope, AssetStorages.Storage storage)
{
// duplicates are not allowed, there can be multiple snapshots to same solution, so no ref counting.
if (!_storages.TryAdd(snapshot, storage))
if (!_storages.TryAdd(scope.SolutionInfo.ScopeId, storage))
{
// this should make failure more explicit
FailFast.OnFatalException(new Exception("who is adding same snapshot?"));
}
}
public void UnregisterSnapshot(PinnedRemotableDataScope snapshot)
public void UnregisterSnapshot(PinnedRemotableDataScope scope)
{
// calling it multiple times for same snapshot is not allowed.
if (!_storages.TryRemove(snapshot, out var dummy))
if (!_storages.TryRemove(scope.SolutionInfo.ScopeId, out var dummy))
{
// this should make failure more explicit
FailFast.OnFatalException(new Exception("who is removing same snapshot?"));
}
}
private IEnumerable<Storage> GetStorages(PinnedRemotableDataScope scope)
public RemotableData GetRemotableData_TestOnly(Checksum checksum, CancellationToken cancellationToken)
{
if (scope != null)
foreach (var kv in _storages)
{
yield return _storages[scope];
yield break;
}
using (var solutionProcessed = Creator.CreateChecksumSet())
{
foreach (var kv in _storages)
var data = GetRemotableData(kv.Key, checksum, cancellationToken);
if (data != null)
{
if (!solutionProcessed.Object.Add(kv.Key.SolutionChecksum))
{
continue;
}
yield return kv.Value;
return data;
}
}
return null;
}
}
}
......@@ -10,7 +10,7 @@
namespace Microsoft.CodeAnalysis.Execution
{
/// <summary>
/// Asset that is not part of solution, but want to participate in ISolutionSynchronizationService
/// Asset that is not part of solution, but want to participate in <see cref="ISolutionSynchronizationService"/>
/// </summary>
internal abstract class CustomAsset : RemotableData
{
......
......@@ -9,7 +9,7 @@
namespace Microsoft.CodeAnalysis.Execution
{
/// <summary>
/// builder to create custom asset which is not part of solution but want to participate in ISolutionSynchronizationService
/// builder to create custom asset which is not part of solution but want to participate in <see cref="ISolutionSynchronizationService"/>
/// </summary>
internal class CustomAssetBuilder
{
......
......@@ -42,11 +42,11 @@ internal interface ISolutionSynchronizationService : IWorkspaceService
/// <summary>
/// Get <see cref="RemotableData"/> corresponding to given <see cref="Checksum"/>.
/// </summary>
RemotableData GetRemotableData(Checksum checksum, CancellationToken cancellationToken);
RemotableData GetRemotableData(int scopeId, Checksum checksum, CancellationToken cancellationToken);
/// <summary>
/// Get <see cref="RemotableData"/>s corresponding to given <see cref="Checksum"/>s.
/// </summary>
IReadOnlyDictionary<Checksum, RemotableData> GetRemotableData(IEnumerable<Checksum> checksums, CancellationToken cancellationToken);
IReadOnlyDictionary<Checksum, RemotableData> GetRemotableData(int scopeId, IEnumerable<Checksum> checksums, CancellationToken cancellationToken);
}
}
......@@ -87,7 +87,7 @@ public RemotableData GetRemotableData(Checksum checksum, CancellationToken cance
{
using (Logger.LogBlock(FunctionId.PinnedRemotableDataScope_GetRemotableData, Checksum.GetChecksumLogInfo, checksum, cancellationToken))
{
return _storages.GetRemotableData(this, checksum, cancellationToken);
return _storages.GetRemotableData(SolutionInfo.ScopeId, checksum, cancellationToken);
}
}
......@@ -95,7 +95,7 @@ public RemotableData GetRemotableData(Checksum checksum, CancellationToken cance
{
using (Logger.LogBlock(FunctionId.PinnedRemotableDataScope_GetRemotableData, Checksum.GetChecksumsLogInfo, checksums, cancellationToken))
{
return _storages.GetRemotableData(this, checksums, cancellationToken);
return _storages.GetRemotableData(SolutionInfo.ScopeId, checksums, cancellationToken);
}
}
......
......@@ -8,7 +8,7 @@
namespace Microsoft.CodeAnalysis.Execution
{
/// <summary>
/// Base for object that will use ISolutionSynchronizationService framework to synchronize data to remote host
/// Base for object that will use <see cref="ISolutionSynchronizationService"/> framework to synchronize data to remote host
/// </summary>
internal abstract partial class RemotableData
{
......
......@@ -28,10 +28,10 @@ internal class Service : ISolutionSynchronizationService
public Serializer Serializer_TestOnly => new Serializer(_workspaceServices);
public Service(HostWorkspaceServices workspaceServices, AssetStorages trees)
public Service(HostWorkspaceServices workspaceServices, AssetStorages storages)
{
_workspaceServices = workspaceServices;
_assetStorages = trees;
_assetStorages = storages;
}
public void AddGlobalAsset(object value, CustomAsset asset, CancellationToken cancellationToken)
......@@ -61,21 +61,26 @@ public async Task<PinnedRemotableDataScope> CreatePinnedRemotableDataScopeAsync(
}
}
public RemotableData GetRemotableData(Checksum checksum, CancellationToken cancellationToken)
public RemotableData GetRemotableData(int scopeId, Checksum checksum, CancellationToken cancellationToken)
{
using (Logger.LogBlock(FunctionId.SolutionSynchronizationService_GetRemotableData, Checksum.GetChecksumLogInfo, checksum, cancellationToken))
{
return _assetStorages.GetRemotableData(scope: null, checksum: checksum, cancellationToken: cancellationToken);
return _assetStorages.GetRemotableData(scopeId, checksum, cancellationToken);
}
}
public IReadOnlyDictionary<Checksum, RemotableData> GetRemotableData(IEnumerable<Checksum> checksums, CancellationToken cancellationToken)
public IReadOnlyDictionary<Checksum, RemotableData> GetRemotableData(int scopeId, IEnumerable<Checksum> checksums, CancellationToken cancellationToken)
{
using (Logger.LogBlock(FunctionId.SolutionSynchronizationService_GetRemotableData, Checksum.GetChecksumsLogInfo, checksums, cancellationToken))
{
return _assetStorages.GetRemotableData(scope: null, checksums: checksums, cancellationToken: cancellationToken);
return _assetStorages.GetRemotableData(scopeId, checksums, cancellationToken);
}
}
public RemotableData GetRemotableData_TestOnly(Checksum checksum, CancellationToken cancellationToken)
{
return _assetStorages.GetRemotableData_TestOnly(checksum, cancellationToken);
}
}
}
}
......@@ -18,7 +18,7 @@ internal static class Extensions
public static async Task<T> GetValueAsync<T>(this ISolutionSynchronizationService service, Checksum checksum)
{
var syncService = (SolutionSynchronizationServiceFactory.Service)service;
var syncObject = service.GetRemotableData(checksum, CancellationToken.None);
var syncObject = syncService.GetRemotableData_TestOnly(checksum, CancellationToken.None);
using (var stream = SerializableBytes.CreateWritableStream())
using (var writer = new ObjectWriter(stream))
......
......@@ -116,7 +116,7 @@ internal static async Task VerifyAssetAsync(ISolutionSynchronizationService serv
{
// re-create asset from object
var syncService = (SolutionSynchronizationServiceFactory.Service)service;
var syncObject = service.GetRemotableData(checksum, CancellationToken.None);
var syncObject = syncService.GetRemotableData_TestOnly(checksum, CancellationToken.None);
var recoveredValue = await service.GetValueAsync<T>(checksum);
var recreatedSyncObject = assetGetter(recoveredValue, kind, syncService.Serializer_TestOnly);
......@@ -256,7 +256,8 @@ internal static void VerifySnapshotInService(ISolutionSynchronizationService sna
internal static void VerifyChecksumInService(ISolutionSynchronizationService snapshotService, Checksum checksum, WellKnownSynchronizationKind kind)
{
Assert.NotNull(checksum);
var otherObject = snapshotService.GetRemotableData(checksum, CancellationToken.None);
var service = (SolutionSynchronizationServiceFactory.Service)snapshotService;
var otherObject = service.GetRemotableData_TestOnly(checksum, CancellationToken.None);
ChecksumEqual(checksum, kind, otherObject.Checksum, otherObject.Kind);
}
......
......@@ -37,14 +37,14 @@ public async Task CreateSolutionSnapshotId_Empty()
using (var snapshot = await snapshotService.CreatePinnedRemotableDataScopeAsync(solution, CancellationToken.None).ConfigureAwait(false))
{
var checksum = snapshot.SolutionChecksum;
var solutionSyncObject = snapshotService.GetRemotableData(checksum, CancellationToken.None);
var solutionSyncObject = snapshot.GetRemotableData(checksum, CancellationToken.None);
VerifySynchronizationObjectInService(snapshotService, solutionSyncObject);
var solutionObject = await snapshotService.GetValueAsync<SolutionStateChecksums>(checksum).ConfigureAwait(false);
VerifyChecksumInService(snapshotService, solutionObject.Info, WellKnownSynchronizationKind.SolutionAttributes);
var projectsSyncObject = snapshotService.GetRemotableData(solutionObject.Projects.Checksum, CancellationToken.None);
var projectsSyncObject = snapshot.GetRemotableData(solutionObject.Projects.Checksum, CancellationToken.None);
VerifySynchronizationObjectInService(snapshotService, projectsSyncObject);
Assert.Equal(solutionObject.Projects.Count, 0);
......@@ -72,7 +72,7 @@ public async Task CreateSolutionSnapshotId_Project()
using (var snapshot = await snapshotService.CreatePinnedRemotableDataScopeAsync(project.Solution, CancellationToken.None).ConfigureAwait(false))
{
var checksum = snapshot.SolutionChecksum;
var solutionSyncObject = snapshotService.GetRemotableData(checksum, CancellationToken.None);
var solutionSyncObject = snapshot.GetRemotableData(checksum, CancellationToken.None);
VerifySynchronizationObjectInService(snapshotService, solutionSyncObject);
......@@ -80,7 +80,7 @@ public async Task CreateSolutionSnapshotId_Project()
VerifyChecksumInService(snapshotService, solutionObject.Info, WellKnownSynchronizationKind.SolutionAttributes);
var projectSyncObject = snapshotService.GetRemotableData(solutionObject.Projects.Checksum, CancellationToken.None);
var projectSyncObject = snapshot.GetRemotableData(solutionObject.Projects.Checksum, CancellationToken.None);
VerifySynchronizationObjectInService(snapshotService, projectSyncObject);
Assert.Equal(solutionObject.Projects.Count, 1);
......@@ -110,7 +110,7 @@ public async Task CreateSolutionSnapshotId()
var snapshotService = (new SolutionSynchronizationServiceFactory()).CreateService(document.Project.Solution.Workspace.Services) as ISolutionSynchronizationService;
using (var snapshot = await snapshotService.CreatePinnedRemotableDataScopeAsync(document.Project.Solution, CancellationToken.None).ConfigureAwait(false))
{
var syncObject = snapshotService.GetRemotableData(snapshot.SolutionChecksum, CancellationToken.None);
var syncObject = snapshot.GetRemotableData(snapshot.SolutionChecksum, CancellationToken.None);
var solutionObject = await snapshotService.GetValueAsync<SolutionStateChecksums>(syncObject.Checksum).ConfigureAwait(false);
VerifySynchronizationObjectInService(snapshotService, syncObject);
......@@ -147,7 +147,7 @@ public async Task CreateSolutionSnapshotId_Full()
var snapshotService = (new SolutionSynchronizationServiceFactory()).CreateService(solution.Workspace.Services) as ISolutionSynchronizationService;
using (var snapshot = await snapshotService.CreatePinnedRemotableDataScopeAsync(solution, CancellationToken.None).ConfigureAwait(false))
{
var syncObject = snapshotService.GetRemotableData(snapshot.SolutionChecksum, CancellationToken.None);
var syncObject = snapshot.GetRemotableData(snapshot.SolutionChecksum, CancellationToken.None);
var solutionObject = await snapshotService.GetValueAsync<SolutionStateChecksums>(syncObject.Checksum).ConfigureAwait(false);
VerifySynchronizationObjectInService(snapshotService, syncObject);
......
......@@ -29,7 +29,7 @@ public AssetService(int scopeId, AssetStorage assetStorage)
_assetStorage = assetStorage;
}
public T Deserialize<T>(WellKnownSynchronizationKind kind, ObjectReader reader, CancellationToken cancellationToken)
public static T Deserialize<T>(WellKnownSynchronizationKind kind, ObjectReader reader, CancellationToken cancellationToken)
{
return s_serializer.Deserialize<T>(kind, reader, cancellationToken);
}
......@@ -150,7 +150,7 @@ private async Task<object> RequestAssetAsync(Checksum checksum, CancellationToke
return SpecializedCollections.EmptyList<ValueTuple<Checksum, object>>();
}
var source = _assetStorage.TryGetAssetSource(_scopeId);
var source = _assetStorage.AssetSource;
cancellationToken.ThrowIfCancellationRequested();
Contract.ThrowIfNull(source);
......
......@@ -13,21 +13,14 @@ namespace Microsoft.CodeAnalysis.Remote
internal abstract class AssetSource
{
private readonly AssetStorage _assetStorage;
private readonly int _scopeId;
protected AssetSource(AssetStorage assetStorage, int scopeId)
protected AssetSource(AssetStorage assetStorage)
{
_assetStorage = assetStorage;
_scopeId = scopeId;
_assetStorage.RegisterAssetSource(_scopeId, this);
_assetStorage.SetAssetSource(this);
}
public abstract Task<IList<ValueTuple<Checksum, object>>> RequestAssetsAsync(int scopeId, ISet<Checksum> checksums, CancellationToken cancellationToken);
public void Done()
{
_assetStorage.UnregisterAssetSource(_scopeId);
}
}
}
......@@ -23,15 +23,14 @@ internal class AssetStorage
private readonly TimeSpan _cleanupIntervalTimeSpan;
private readonly TimeSpan _purgeAfterTimeSpan;
private readonly ConcurrentDictionary<int, AssetSource> _assetSources =
new ConcurrentDictionary<int, AssetSource>(concurrencyLevel: 4, capacity: 10);
private readonly ConcurrentDictionary<Checksum, Entry> _globalAssets =
new ConcurrentDictionary<Checksum, Entry>(concurrencyLevel: 4, capacity: 10);
private readonly ConcurrentDictionary<Checksum, Entry> _assets =
new ConcurrentDictionary<Checksum, Entry>(concurrencyLevel: 4, capacity: 10);
private volatile AssetSource _assetSource;
public AssetStorage()
{
// constructor for testing
......@@ -45,26 +44,14 @@ public AssetStorage(TimeSpan cleanupInterval, TimeSpan purgeAfter)
Task.Run(CleanAssetsAsync, CancellationToken.None);
}
public AssetSource TryGetAssetSource(int scopeId)
{
AssetSource source;
if (_assetSources.TryGetValue(scopeId, out source))
{
return source;
}
return null;
}
public void RegisterAssetSource(int scopeId, AssetSource assetSource)
public AssetSource AssetSource
{
Contract.ThrowIfFalse(_assetSources.TryAdd(scopeId, assetSource));
get { return _assetSource; }
}
public void UnregisterAssetSource(int scopeId)
public void SetAssetSource(AssetSource assetSource)
{
AssetSource dummy;
_assetSources.TryRemove(scopeId, out dummy);
_assetSource = assetSource;
}
public bool TryAddGlobalAsset(Checksum checksum, object value)
......
......@@ -25,8 +25,7 @@ private class JsonRpcAssetSource : AssetSource
{
private readonly SnapshotService _owner;
public JsonRpcAssetSource(SnapshotService owner, int scopeId) :
base(owner.AssetStorage, scopeId)
public JsonRpcAssetSource(SnapshotService owner) : base(owner.AssetStorage)
{
_owner = owner;
}
......@@ -76,7 +75,7 @@ private class JsonRpcAssetSource : AssetSource
var kind = (WellKnownSynchronizationKind)reader.ReadInt32();
// in service hub, cancellation means simply closed stream
var @object = _owner.RoslynServices.AssetService.Deserialize<object>(kind, reader, cancellationToken);
var @object = AssetService.Deserialize<object>(kind, reader, cancellationToken);
results.Add(ValueTuple.Create(responseChecksum, @object));
}
......
......@@ -2,55 +2,24 @@
using System;
using System.IO;
using Microsoft.CodeAnalysis.Execution;
using StreamJsonRpc;
namespace Microsoft.CodeAnalysis.Remote
{
/// <summary>
/// Snapshot service in service hub side.
///
/// this service will be used to move over snapshot data from client to service hub
/// this service will be used to move over remotable data from client to service hub
/// </summary>
internal partial class SnapshotService : ServiceHubServiceBase
{
// use gate to make sure same value is seen by multiple threads correctly.
// initialize and disconnect can be called concurrently due to the way
// we implements cancellation
private readonly object _gate;
private AssetSource _source;
private readonly AssetSource _source;
public SnapshotService(Stream stream, IServiceProvider serviceProvider) :
base(serviceProvider, stream)
{
_gate = new object();
_source = new JsonRpcAssetSource(this);
Rpc.StartListening();
}
public override void Initialize(PinnedSolutionInfo solutionInfo)
{
base.Initialize(solutionInfo);
lock (_gate)
{
if (CancellationToken.IsCancellationRequested)
{
return;
}
_source = new JsonRpcAssetSource(this, solutionInfo.ScopeId);
}
}
protected override void OnDisconnected(JsonRpcDisconnectedEventArgs e)
{
lock (_gate)
{
// operation can be cancelled even before initialize is called.
// or in the middle of initialize is running
_source?.Done();
}
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册