diff --git a/azure-pipelines-integration.yml b/azure-pipelines-integration.yml index 705eac2f5c3b1b58700b0febf450c2d0f48736c0..1abc12b0f8bed44a1fe2d9c99794bc66c5125571 100644 --- a/azure-pipelines-integration.yml +++ b/azure-pipelines-integration.yml @@ -20,21 +20,20 @@ jobs: name: NetCorePublic-Pool queue: buildpool.windows.10.amd64.vs2019.pre.open strategy: - maxParallel: 2 + maxParallel: 4 matrix: debug_32: _configuration: Debug _oop64bit: false - # 64-bit disabled for https://github.com/dotnet/roslyn/issues/40476 - # debug_64: - # _configuration: Debug - # _oop64bit: true + debug_64: + _configuration: Debug + _oop64bit: true release_32: _configuration: Release _oop64bit: false - # release_64: - # _configuration: Release - # _oop64bit: true + release_64: + _configuration: Release + _oop64bit: true timeoutInMinutes: 135 steps: diff --git a/src/Workspaces/Remote/Core/BrokeredServiceConnection.cs b/src/Workspaces/Remote/Core/BrokeredServiceConnection.cs index d6989fcea9a822b4ac9300dfadd969ff5d6c8f92..37894041cc360d98362dc58f71fd054c75988103 100644 --- a/src/Workspaces/Remote/Core/BrokeredServiceConnection.cs +++ b/src/Workspaces/Remote/Core/BrokeredServiceConnection.cs @@ -148,6 +148,11 @@ public override async ValueTask> TryInvokeAsync(Solut Func> reader, CancellationToken cancellationToken) { + // We can cancel at entry, but once the pipe operations are scheduled we rely on both operations running to + // avoid deadlocks (the exception handler in 'writerTask' ensures progress is made in 'readerTask'). + cancellationToken.ThrowIfCancellationRequested(); + var mustNotCancelToken = CancellationToken.None; + var pipe = new Pipe(); // Create new tasks that both start executing, rather than invoking the delegates directly @@ -168,7 +173,7 @@ public override async ValueTask> TryInvokeAsync(Solut throw; } - }, cancellationToken); + }, mustNotCancelToken); var readerTask = Task.Run( async () => @@ -187,7 +192,7 @@ public override async ValueTask> TryInvokeAsync(Solut { await pipe.Reader.CompleteAsync(exception).ConfigureAwait(false); } - }, cancellationToken); + }, mustNotCancelToken); await Task.WhenAll(writerTask, readerTask).ConfigureAwait(false); diff --git a/src/Workspaces/Remote/Core/RemoteCallback.cs b/src/Workspaces/Remote/Core/RemoteCallback.cs index 29bc9efcf8b8e9a9cec653fd26e95a9b21ab0ba1..128d1ede9d128bfd78e11b4a3a291579c1918e35 100644 --- a/src/Workspaces/Remote/Core/RemoteCallback.cs +++ b/src/Workspaces/Remote/Core/RemoteCallback.cs @@ -7,10 +7,7 @@ using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; -using MessagePack; using Microsoft.CodeAnalysis.ErrorReporting; -using Nerdbank.Streams; -using Newtonsoft.Json; using Roslyn.Utilities; using StreamJsonRpc; @@ -28,12 +25,9 @@ namespace Microsoft.CodeAnalysis.Remote { private readonly T _callback; - public readonly CancellationTokenSource ClientDisconnectedSource; - - public RemoteCallback(T callback, CancellationTokenSource clientDisconnectedSource) + public RemoteCallback(T callback) { _callback = callback; - ClientDisconnectedSource = clientDisconnectedSource; } public async ValueTask InvokeAsync(Func invocation, CancellationToken cancellationToken) @@ -44,7 +38,7 @@ public async ValueTask InvokeAsync(Func invocat } catch (Exception exception) when (ReportUnexpectedException(exception, cancellationToken)) { - throw OnUnexpectedException(cancellationToken); + throw OnUnexpectedException(exception, cancellationToken); } } @@ -56,7 +50,7 @@ public async ValueTask InvokeAsync(Func InvokeAsync(Func InvokeAsync(Func> ReadDataAsync(PipeReader pipeReader, int scopeId, ISet checksums, ISerializerService serializerService, CancellationToken cancellationToken) + public static async ValueTask> ReadDataAsync(PipeReader pipeReader, int scopeId, ISet checksums, ISerializerService serializerService, CancellationToken cancellationToken) { + // We can cancel at entry, but once the pipe operations are scheduled we rely on both operations running to + // avoid deadlocks (the exception handler in 'copyTask' ensures progress is made in the blocking read). + cancellationToken.ThrowIfCancellationRequested(); + var mustNotCancelToken = CancellationToken.None; + // Workaround for ObjectReader not supporting async reading. // Unless we read from the RPC stream asynchronously and with cancallation support we might hang when the server cancels. // https://github.com/dotnet/roslyn/issues/47861 @@ -96,7 +101,7 @@ public static ValueTask> ReadDataAsync(PipeRe Exception? exception = null; // start a task on a thread pool thread copying from the RPC pipe to a local pipe: - Task.Run(async () => + var copyTask = Task.Run(async () => { try { @@ -111,13 +116,13 @@ public static ValueTask> ReadDataAsync(PipeRe await localPipe.Writer.CompleteAsync(exception).ConfigureAwait(false); await pipeReader.CompleteAsync(exception).ConfigureAwait(false); } - }, cancellationToken).Forget(); + }, mustNotCancelToken); // blocking read from the local pipe on the current thread: try { using var stream = localPipe.Reader.AsStream(leaveOpen: false); - return new(ReadData(stream, scopeId, checksums, serializerService, cancellationToken)); + return ReadData(stream, scopeId, checksums, serializerService, cancellationToken); } catch (EndOfStreamException) { @@ -125,6 +130,12 @@ public static ValueTask> ReadDataAsync(PipeRe throw exception ?? ExceptionUtilities.Unreachable; } + finally + { + // Make sure to complete the copy and pipes before returning, otherwise the caller could complete the + // reader and/or writer while they are still in use. + await copyTask.ConfigureAwait(false); + } } public static ImmutableArray<(Checksum, object)> ReadData(Stream stream, int scopeId, ISet checksums, ISerializerService serializerService, CancellationToken cancellationToken) diff --git a/src/Workspaces/Remote/Core/ServiceDescriptor.cs b/src/Workspaces/Remote/Core/ServiceDescriptor.cs index fab36a6c1851ab41ab222e80b1f0bf1f4e430e53..eca5d77864e6c2186d04f926fc96d5cef7ad3653 100644 --- a/src/Workspaces/Remote/Core/ServiceDescriptor.cs +++ b/src/Workspaces/Remote/Core/ServiceDescriptor.cs @@ -4,6 +4,7 @@ using System; using System.IO.Pipelines; +using System.Reflection; using MessagePack; using MessagePack.Resolvers; using Microsoft.ServiceHub.Framework; @@ -76,6 +77,26 @@ protected override JsonRpcConnection CreateConnection(JsonRpc jsonRpc) return connection; } + public override ServiceRpcDescriptor WithMultiplexingStream(MultiplexingStream? multiplexingStream) + { + var baseResult = base.WithMultiplexingStream(multiplexingStream); + if (baseResult is ServiceDescriptor) + return baseResult; + + // work around incorrect implementation in 16.8 Preview 2 + if (MultiplexingStream == multiplexingStream) + return this; + + var result = (ServiceDescriptor)Clone(); + typeof(ServiceRpcDescriptor).GetProperty(nameof(MultiplexingStream))!.SetValue(result, multiplexingStream); + if (result.MultiplexingStreamOptions is null) + return result; + + result = (ServiceDescriptor)result.Clone(); + typeof(ServiceJsonRpcDescriptor).GetProperty(nameof(MultiplexingStreamOptions))!.SetValue(result, value: null); + return result; + } + internal static class TestAccessor { public static MessagePackSerializerOptions Options => s_options; diff --git a/src/Workspaces/Remote/Core/SolutionAssetProvider.cs b/src/Workspaces/Remote/Core/SolutionAssetProvider.cs index 12377dee276e589bee830b5bcd4e1fe816f3dc2c..92d6ebd7f82a2c386275c3ede4150e67139a9bc9 100644 --- a/src/Workspaces/Remote/Core/SolutionAssetProvider.cs +++ b/src/Workspaces/Remote/Core/SolutionAssetProvider.cs @@ -50,6 +50,11 @@ public async ValueTask GetAssetsAsync(PipeWriter pipeWriter, int scopeId, Checks assetMap = await assetStorage.GetAssetsAsync(scopeId, checksums, cancellationToken).ConfigureAwait(false); } + // We can cancel early, but once the pipe operations are scheduled we rely on both operations running to + // avoid deadlocks (the exception handler in 'task1' ensures progress is made in 'task2'). + cancellationToken.ThrowIfCancellationRequested(); + var mustNotCancelToken = CancellationToken.None; + // Work around the lack of async stream writing in ObjectWriter, which is required when writing to the RPC pipe. // Run two tasks - the first synchronously writes to a local pipe and the second asynchronosly transfers the data to the RPC pipe. // @@ -57,7 +62,7 @@ public async ValueTask GetAssetsAsync(PipeWriter pipeWriter, int scopeId, Checks // (non-contiguous) memory allocated for the underlying buffers. The amount of memory is bounded by the total size of the serialized assets. var localPipe = new Pipe(RemoteHostAssetSerialization.PipeOptionsWithUnlimitedWriterBuffer); - Task.Run(() => + var task1 = Task.Run(() => { try { @@ -69,12 +74,14 @@ public async ValueTask GetAssetsAsync(PipeWriter pipeWriter, int scopeId, Checks { // no-op } - }, cancellationToken).Forget(); + }, mustNotCancelToken); // Complete RPC once we send the initial piece of data and start waiting for the writer to send more, // so the client can start reading from the stream. Once CopyPipeDataAsync completes the pipeWriter // the corresponding client-side pipeReader will complete and the data transfer will be finished. - CopyPipeDataAsync().Forget(); + var task2 = CopyPipeDataAsync(); + + await Task.WhenAll(task1, task2).ConfigureAwait(false); async Task CopyPipeDataAsync() { diff --git a/src/Workspaces/Remote/ServiceHub/Host/SolutionAssetSource.cs b/src/Workspaces/Remote/ServiceHub/Host/SolutionAssetSource.cs index 925f2b58f805c79e3e4b000080afc680c2415d61..fa7c4c00b7cf8373fe2422098929005270fea4a2 100644 --- a/src/Workspaces/Remote/ServiceHub/Host/SolutionAssetSource.cs +++ b/src/Workspaces/Remote/ServiceHub/Host/SolutionAssetSource.cs @@ -17,12 +17,10 @@ namespace Microsoft.CodeAnalysis.Remote internal sealed class SolutionAssetSource : IAssetSource { private readonly ServiceBrokerClient _client; - private readonly CancellationTokenSource _clientDisconnectedSource; - public SolutionAssetSource(ServiceBrokerClient client, CancellationTokenSource clientDisconnectedSource) + public SolutionAssetSource(ServiceBrokerClient client) { _client = client; - _clientDisconnectedSource = clientDisconnectedSource; } public async ValueTask> GetAssetsAsync(int scopeId, ISet checksums, ISerializerService serializerService, CancellationToken cancellationToken) @@ -33,7 +31,7 @@ public async ValueTask> GetAssetsAsync(int sc using var provider = await _client.GetProxyAsync(SolutionAssetProvider.ServiceDescriptor, cancellationToken).ConfigureAwait(false); Contract.ThrowIfNull(provider.Proxy); - return await new RemoteCallback(provider.Proxy, _clientDisconnectedSource).InvokeAsync( + return await new RemoteCallback(provider.Proxy).InvokeAsync( (proxy, pipeWriter, cancellationToken) => proxy.GetAssetsAsync(pipeWriter, scopeId, checksums.ToArray(), cancellationToken), (pipeReader, cancellationToken) => RemoteHostAssetSerialization.ReadDataAsync(pipeReader, scopeId, checksums, serializerService, cancellationToken), cancellationToken).ConfigureAwait(false); @@ -47,7 +45,7 @@ public async ValueTask IsExperimentEnabledAsync(string experimentName, Can using var provider = await _client.GetProxyAsync(SolutionAssetProvider.ServiceDescriptor, cancellationToken).ConfigureAwait(false); Contract.ThrowIfNull(provider.Proxy); - return await new RemoteCallback(provider.Proxy, _clientDisconnectedSource).InvokeAsync( + return await new RemoteCallback(provider.Proxy).InvokeAsync( (self, cancellationToken) => provider.Proxy.IsExperimentEnabledAsync(experimentName, cancellationToken), cancellationToken).ConfigureAwait(false); } diff --git a/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.FactoryBase.cs b/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.FactoryBase.cs index 9ec83fa52469cc5a8c43b62ac47425d028fdeb6c..b21fe46c57be11c26d3404283562686bc8cce00e 100644 --- a/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.FactoryBase.cs +++ b/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.FactoryBase.cs @@ -6,7 +6,6 @@ using System.Diagnostics; using System.IO; using System.IO.Pipelines; -using System.Threading; using System.Threading.Tasks; using Microsoft.ServiceHub.Framework; using Microsoft.ServiceHub.Framework.Services; @@ -72,7 +71,7 @@ object IFactory.Create(IDuplexPipe pipe, IServiceProvider hostProvidedServices, var serviceHubTraceSource = (TraceSource)hostProvidedServices.GetService(typeof(TraceSource)); var serverConnection = descriptor.WithTraceSource(serviceHubTraceSource).ConstructRpcConnection(pipe); - var args = new ServiceConstructionArguments(hostProvidedServices, serviceBroker, new CancellationTokenSource()); + var args = new ServiceConstructionArguments(hostProvidedServices, serviceBroker); var service = CreateService(args, descriptor, serverConnection, serviceActivationOptions.ClientRpcTarget); serverConnection.AddLocalRpcTarget(service); @@ -104,7 +103,7 @@ protected sealed override TService CreateService(in ServiceConstructionArguments { Contract.ThrowIfNull(descriptor.ClientInterface); var callback = (TCallback)(clientRpcTarget ?? serverConnection.ConstructRpcClient(descriptor.ClientInterface)); - return CreateService(arguments, new RemoteCallback(callback, arguments.ClientDisconnectedSource)); + return CreateService(arguments, new RemoteCallback(callback)); } } } diff --git a/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.ServiceConstructionArguments.cs b/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.ServiceConstructionArguments.cs index 11d7cc08fbccae4b69a83bddf658064373483da7..4c0b1f6d83761086afc01e89bc8ee075080d728f 100644 --- a/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.ServiceConstructionArguments.cs +++ b/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.ServiceConstructionArguments.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information. using System; -using System.Threading; using Microsoft.ServiceHub.Framework; namespace Microsoft.CodeAnalysis.Remote @@ -14,13 +13,11 @@ internal abstract partial class BrokeredServiceBase { public readonly IServiceProvider ServiceProvider; public readonly IServiceBroker ServiceBroker; - public readonly CancellationTokenSource ClientDisconnectedSource; - public ServiceConstructionArguments(IServiceProvider serviceProvider, IServiceBroker serviceBroker, CancellationTokenSource clientDisconnectedSource) + public ServiceConstructionArguments(IServiceProvider serviceProvider, IServiceBroker serviceBroker) { ServiceProvider = serviceProvider; ServiceBroker = serviceBroker; - ClientDisconnectedSource = clientDisconnectedSource; } } } diff --git a/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.cs b/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.cs index c98c177a67c06c969889f5b35b30e5ab99eb4431..3b178d952a2d5857812e89f3ed6424772deefade 100644 --- a/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.cs +++ b/src/Workspaces/Remote/ServiceHub/Services/BrokeredServiceBase.cs @@ -4,14 +4,10 @@ using System; using System.Diagnostics; -using System.IO; -using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; using Microsoft.CodeAnalysis.ErrorReporting; using Microsoft.ServiceHub.Framework; -using Microsoft.ServiceHub.Framework.Services; -using Nerdbank.Streams; using Roslyn.Utilities; namespace Microsoft.CodeAnalysis.Remote @@ -25,7 +21,6 @@ internal abstract partial class BrokeredServiceBase : IDisposable protected readonly RemoteWorkspaceManager WorkspaceManager; protected readonly SolutionAssetSource SolutionAssetSource; - protected readonly CancellationTokenSource ClientDisconnectedSource; protected readonly ServiceBrokerClient ServiceBrokerClient; // test data are only available when running tests: @@ -46,8 +41,7 @@ protected BrokeredServiceBase(in ServiceConstructionArguments arguments) ServiceBrokerClient = new ServiceBrokerClient(arguments.ServiceBroker); #pragma warning restore - SolutionAssetSource = new SolutionAssetSource(ServiceBrokerClient, arguments.ClientDisconnectedSource); - ClientDisconnectedSource = arguments.ClientDisconnectedSource; + SolutionAssetSource = new SolutionAssetSource(ServiceBrokerClient); } public void Dispose() @@ -69,7 +63,6 @@ protected Task GetSolutionAsync(PinnedSolutionInfo solutionInfo, Cance protected async ValueTask RunServiceAsync(Func> implementation, CancellationToken cancellationToken) { WorkspaceManager.SolutionAssetCache.UpdateLastActivityTime(); - using var _ = LinkToken(ref cancellationToken); try { @@ -84,7 +77,6 @@ protected async ValueTask RunServiceAsync(Func implementation, CancellationToken cancellationToken) { WorkspaceManager.SolutionAssetCache.UpdateLastActivityTime(); - using var _ = LinkToken(ref cancellationToken); try { @@ -95,12 +87,5 @@ protected async ValueTask RunServiceAsync(Func imp throw ExceptionUtilities.Unreachable; } } - - private CancellationTokenSource? LinkToken(ref CancellationToken cancellationToken) - { - var source = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, ClientDisconnectedSource.Token); - cancellationToken = source.Token; - return source; - } } } diff --git a/src/Workspaces/Remote/ServiceHub/Services/DesignerAttributeDiscovery/RemoteDesignerAttributeIncrementalAnalyzer.cs b/src/Workspaces/Remote/ServiceHub/Services/DesignerAttributeDiscovery/RemoteDesignerAttributeIncrementalAnalyzer.cs index 6ba8d8dce90ddfc0eb2222b4c22934419bcc6164..01eec2bf2acc0c79052d76711a458f99bfdc3316 100644 --- a/src/Workspaces/Remote/ServiceHub/Services/DesignerAttributeDiscovery/RemoteDesignerAttributeIncrementalAnalyzer.cs +++ b/src/Workspaces/Remote/ServiceHub/Services/DesignerAttributeDiscovery/RemoteDesignerAttributeIncrementalAnalyzer.cs @@ -25,22 +25,16 @@ public RemoteDesignerAttributeIncrementalAnalyzer(Workspace workspace, RemoteCal protected override async ValueTask ReportProjectRemovedAsync(ProjectId projectId, CancellationToken cancellationToken) { - // cancel whenever the analyzer runner cancels or the client disconnects and the request is canceled: - using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _callback.ClientDisconnectedSource.Token); - await _callback.InvokeAsync( (callback, cancellationToken) => callback.OnProjectRemovedAsync(projectId, cancellationToken), - linkedSource.Token).ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); } protected override async ValueTask ReportDesignerAttributeDataAsync(List data, CancellationToken cancellationToken) { - // cancel whenever the analyzer runner cancels or the client disconnects and the request is canceled: - using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _callback.ClientDisconnectedSource.Token); - await _callback.InvokeAsync( (callback, cancellationToken) => callback.ReportDesignerAttributeDataAsync(data.ToImmutableArray(), cancellationToken), - linkedSource.Token).ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); } } } diff --git a/src/Workspaces/Remote/ServiceHub/Services/ProjectTelemetry/RemoteProjectTelemetryIncrementalAnalyzer.cs b/src/Workspaces/Remote/ServiceHub/Services/ProjectTelemetry/RemoteProjectTelemetryIncrementalAnalyzer.cs index 3ab993fca1342125b6960c706ebef44390a8ce0a..6cd1108b8b756a5eb8ed4479ed22e516a2df95c2 100644 --- a/src/Workspaces/Remote/ServiceHub/Services/ProjectTelemetry/RemoteProjectTelemetryIncrementalAnalyzer.cs +++ b/src/Workspaces/Remote/ServiceHub/Services/ProjectTelemetry/RemoteProjectTelemetryIncrementalAnalyzer.cs @@ -7,7 +7,6 @@ using System.Threading.Tasks; using Microsoft.CodeAnalysis.ProjectTelemetry; using Microsoft.CodeAnalysis.SolutionCrawler; -using StreamJsonRpc; namespace Microsoft.CodeAnalysis.Remote { @@ -65,12 +64,9 @@ public override async Task AnalyzeProjectAsync(Project project, bool semanticsCh _projectToData[projectId] = info; } - // cancel whenever the analyzer runner cancels or the client disconnects and the request is canceled: - using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _callback.ClientDisconnectedSource.Token); - await _callback.InvokeAsync( (callback, cancellationToken) => callback.ReportProjectTelemetryDataAsync(info, cancellationToken), - linkedSource.Token).ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); } public override Task RemoveProjectAsync(ProjectId projectId, CancellationToken cancellationToken) diff --git a/src/Workspaces/Remote/ServiceHub/Services/TodoCommentsDiscovery/RemoteTodoCommentsIncrementalAnalyzer.cs b/src/Workspaces/Remote/ServiceHub/Services/TodoCommentsDiscovery/RemoteTodoCommentsIncrementalAnalyzer.cs index a8018eaa4204347e262ef23f582fec28d82ab086..a7149bde5e24601b52cb038244e1fe19eea6c887 100644 --- a/src/Workspaces/Remote/ServiceHub/Services/TodoCommentsDiscovery/RemoteTodoCommentsIncrementalAnalyzer.cs +++ b/src/Workspaces/Remote/ServiceHub/Services/TodoCommentsDiscovery/RemoteTodoCommentsIncrementalAnalyzer.cs @@ -21,12 +21,9 @@ public RemoteTodoCommentsIncrementalAnalyzer(RemoteCallback data, CancellationToken cancellationToken) { - // cancel whenever the analyzer runner cancels or the client disconnects and the request is canceled: - using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _callback.ClientDisconnectedSource.Token); - await _callback.InvokeAsync( (callback, cancellationToken) => callback.ReportTodoCommentDataAsync(documentId, data, cancellationToken), - linkedSource.Token).ConfigureAwait(false); + cancellationToken).ConfigureAwait(false); } } }