// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.ErrorReporting;
using Newtonsoft.Json;
using Roslyn.Utilities;
using StreamJsonRpc;
namespace Microsoft.CodeAnalysis.Remote
{
/// <summary>
/// Helper type that abstracts out JsonRpc communication, with the extra capability of
/// using a raw stream to move big chunks of data.
/// </summary>
internal sealed class RemoteEndPoint : IDisposable
{
// Message attached to every SoftCrashException produced by CreateSoftCrashException.
private const string UnexpectedExceptionLogMessage = "Unexpected exception from JSON-RPC";
// Options used when the incoming call target is registered with the JsonRpc instance.
private static readonly JsonRpcTargetOptions s_jsonRpcTargetOptions = new JsonRpcTargetOptions()
{
// Do not allow JSON-RPC to automatically subscribe to events and remote their calls.
NotifyClientOfEvents = false,
// Only allow public methods (may be on internal types) to be invoked remotely.
AllowNonPublicInvocation = false
};
// Counter shared by all end points; incremented atomically in the constructor to produce _id.
private static int s_id;
// Identifier of this end point; passed as the event id when errors are traced.
private readonly int _id;
// Trace source used by LogError; also handed to the JsonRpc instance in the constructor.
private readonly TraceSource _logger;
// The JSON-RPC connection every invoke method delegates to.
private readonly JsonRpc _rpc;
// Set to true by StartListening; the invoke methods check it via Contract.ThrowIfFalse.
private bool _startedListening;
// Set by OnDisconnected. While it is null, failed invocations are reported;
// once set, the invoke methods skip reporting.
private JsonRpcDisconnectedEventArgs? _disconnectedReason;
/// <summary>
/// Raised from the JsonRpc disconnect handler; invoked with the disconnect event arguments.
/// </summary>
public event Action? Disconnected;
/// <summary>
/// Raised when an invocation failure is converted to a soft crash; invoked with the original exception.
/// </summary>
public event Action? UnexpectedExceptionThrown;
/// <summary>
/// Creates an end point that exchanges header-delimited JSON-RPC messages over <paramref name="stream"/>.
/// The end point does not process messages until <see cref="StartListening"/> is called.
/// </summary>
/// <param name="stream">Stream the JSON-RPC messages are sent and received on.</param>
/// <param name="logger">Trace source used by this end point and assigned to the underlying <see cref="JsonRpc"/>.</param>
/// <param name="incomingCallTarget">Optional object registered as the local RPC target.</param>
/// <param name="jsonConverters">Optional converters added to the serializer before the aggregate converter.</param>
public RemoteEndPoint(Stream stream, TraceSource logger, object? incomingCallTarget, IEnumerable? jsonConverters = null)
{
    RoslynDebug.Assert(stream != null);
    RoslynDebug.Assert(logger != null);

    _id = Interlocked.Increment(ref s_id);
    _logger = logger;

    // Caller-supplied converters go in first, the aggregate converter last.
    var formatter = new JsonMessageFormatter();
    var converters = formatter.JsonSerializer.Converters;
    if (jsonConverters != null)
    {
        converters.AddRange(jsonConverters);
    }

    converters.Add(AggregateJsonConverter.Instance);

    var messageHandler = new HeaderDelimitedMessageHandler(stream, formatter);
    _rpc = new JsonRpc(messageHandler)
    {
        CancelLocallyInvokedMethodsWhenConnectionIsClosed = true,
        TraceSource = logger
    };

    if (incomingCallTarget != null)
    {
        _rpc.AddLocalRpcTarget(incomingCallTarget, s_jsonRpcTargetOptions);
    }

    // Track disconnection as soon as it happens rather than on the next failed call.
    _rpc.Disconnected += OnDisconnected;
}
/// <summary>
/// Starts listening on the underlying JSON-RPC connection.
/// Must be called before any communication commences.
/// See https://github.com/dotnet/roslyn/issues/16900#issuecomment-277378950.
/// </summary>
public void StartListening()
{
_rpc.StartListening();
// The invoke methods check this flag before issuing any call.
_startedListening = true;
}
/// <summary>
/// Gets whether the underlying JSON-RPC connection has been disposed.
/// </summary>
public bool IsDisposed
{
    get { return _rpc.IsDisposed; }
}
/// <summary>
/// Detaches the disconnect handler and disposes the underlying JSON-RPC connection.
/// </summary>
public void Dispose()
{
// Unsubscribe first so that disposing the connection below does not invoke OnDisconnected.
_rpc.Disconnected -= OnDisconnected;
_rpc.Dispose();
}
/// <summary>
/// Invokes the remote method <paramref name="targetName"/> with <paramref name="arguments"/>
/// and waits for it to complete.
/// </summary>
/// <param name="targetName">Name of the remote method.</param>
/// <param name="arguments">Arguments passed to the remote method.</param>
/// <param name="cancellationToken">Token used to cancel the invocation.</param>
/// <exception cref="OperationCanceledException">
/// The invocation failed and <paramref name="cancellationToken"/> is signaled.
/// </exception>
/// <remarks>
/// Any other failure is rethrown as the exception produced by <c>CreateSoftCrashException</c>.
/// </remarks>
public async Task InvokeAsync(string targetName, IReadOnlyList arguments, CancellationToken cancellationToken)
{
    Contract.ThrowIfFalse(_startedListening);

    // Once the end point is known to be disconnected, further failures are not reported again.
    var alreadyDisconnected = _disconnectedReason != null;

    try
    {
        var invocation = _rpc.InvokeWithCancellationAsync(targetName, arguments, cancellationToken);
        await invocation.ConfigureAwait(false);
    }
    catch (Exception ex) when (alreadyDisconnected || ReportUnlessCanceled(ex, cancellationToken))
    {
        // The remote call may fail with a different exception even though our token is signaled
        // (e.g. the connection is dropped on shutdown), so prefer surfacing cancellation.
        cancellationToken.ThrowIfCancellationRequested();

        throw CreateSoftCrashException(ex, cancellationToken);
    }
}
/// <summary>
/// Best-effort variant of <c>InvokeAsync</c>: does nothing when the connection is already disposed
/// and swallows every exception thrown by the invocation.
/// </summary>
/// <param name="targetName">Name of the remote method.</param>
/// <param name="arguments">Arguments passed to the remote method.</param>
/// <param name="cancellationToken">Token used to cancel the invocation.</param>
public async Task TryInvokeAsync(string targetName, IReadOnlyList arguments, CancellationToken cancellationToken)
{
    Contract.ThrowIfFalse(_startedListening);

    if (!_rpc.IsDisposed)
    {
        try
        {
            var invocation = _rpc.InvokeWithCancellationAsync(targetName, arguments, cancellationToken);
            await invocation.ConfigureAwait(false);
        }
        catch
        {
            // Deliberately ignored: failures of a best-effort call are not interesting to the caller.
        }
    }
}
/// <summary>
/// Invokes the remote method <paramref name="targetName"/> with <paramref name="arguments"/>
/// and returns the value it produces.
/// </summary>
/// <param name="targetName">Name of the remote method.</param>
/// <param name="arguments">Arguments passed to the remote method.</param>
/// <param name="cancellationToken">Token used to cancel the invocation.</param>
/// <returns>The result of the remote invocation.</returns>
/// <exception cref="OperationCanceledException">
/// The invocation failed and <paramref name="cancellationToken"/> is signaled.
/// </exception>
/// <remarks>
/// Any other failure is rethrown as the exception produced by <c>CreateSoftCrashException</c>.
/// </remarks>
public async Task InvokeAsync(string targetName, IReadOnlyList arguments, CancellationToken cancellationToken)
{
    Contract.ThrowIfFalse(_startedListening);

    // Once the end point is known to be disconnected, further failures are not reported again.
    var alreadyDisconnected = _disconnectedReason != null;

    try
    {
        return await _rpc.InvokeWithCancellationAsync(targetName, arguments, cancellationToken).ConfigureAwait(false);
    }
    catch (Exception ex) when (alreadyDisconnected || ReportUnlessCanceled(ex, cancellationToken))
    {
        // The remote call may fail with a different exception even though our token is signaled
        // (e.g. the connection is dropped on shutdown), so prefer surfacing cancellation.
        cancellationToken.ThrowIfCancellationRequested();

        throw CreateSoftCrashException(ex, cancellationToken);
    }
}
/// <summary>
/// Invokes a remote method <paramref name="targetName"/> with specified <paramref name="arguments"/> and
/// establishes a pipe through which the target method may transfer large binary data. The name of the pipe is
/// passed to the target method as an additional argument following the specified <paramref name="arguments"/>.
/// The target method is expected to write the data to the pipe stream
/// (presumably via one of the <c>WriteDataToNamedPipeAsync</c> overloads of this type).
/// </summary>
/// <param name="targetName">Name of the remote method.</param>
/// <param name="arguments">Arguments passed to the remote method; the pipe name is appended after them.</param>
/// <param name="dataReader">Callback that reads the result from the buffered pipe stream.</param>
/// <param name="cancellationToken">Token used to cancel the invocation.</param>
/// <returns>The value produced by <paramref name="dataReader"/>.</returns>
/// <exception cref="OperationCanceledException">
/// The operation failed and <paramref name="cancellationToken"/> is signaled.
/// </exception>
/// <remarks>
/// Any other failure inside the try block is rethrown as the exception produced by <c>CreateSoftCrashException</c>.
/// </remarks>
public async Task InvokeAsync(string targetName, IReadOnlyList arguments, Func> dataReader, CancellationToken cancellationToken)
{
// Size of the buffer used by the BufferedStream that wraps the pipe.
const int BufferSize = 12 * 1024;
Contract.ThrowIfFalse(_startedListening);
// if this end-point is already disconnected do not log more errors:
var logError = _disconnectedReason == null;
// Signaled when the caller cancels, or when RaiseCancellationIfInvokeFailed observes that
// the remote invocation did not run to completion.
using var linkedCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
// A fresh GUID serves as the pipe name for this call.
var pipeName = Guid.NewGuid().ToString();
var pipe = new NamedPipeServerStream(pipeName, PipeDirection.In, maxNumberOfServerInstances: 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
try
{
// Transfer ownership of the pipe to BufferedStream, it will dispose it:
using var stream = new BufferedStream(pipe, BufferSize);
// Send the request to the remote side, with the pipe name appended as the last argument.
var task = _rpc.InvokeWithCancellationAsync(targetName, arguments.Concat(pipeName).ToArray(), cancellationToken);
// If the invocation does not run to completion, make sure the linked source is canceled so the waits below do not hang.
RaiseCancellationIfInvokeFailed(task, linkedCancellationSource, cancellationToken);
// Wait for the remote side to connect to the pipe.
await pipe.WaitForConnectionAsync(linkedCancellationSource.Token).ConfigureAwait(false);
// Let the caller-supplied reader consume the data directly from the buffered pipe stream.
var result = await dataReader(stream, linkedCancellationSource.Token).ConfigureAwait(false);
// Wait for the remote invocation itself to finish before returning.
await task.ConfigureAwait(false);
return result;
}
catch (Exception ex) when (!logError || ReportUnlessCanceled(ex, linkedCancellationSource.Token, cancellationToken))
{
// Remote call may fail with different exception even when our cancellation token is signaled
// (e.g. on shutdown if the connection is dropped).
// It's important to use cancellationToken here rather than the linked token as there is a slight
// delay in between linked token being signaled and cancellation token being signaled.
cancellationToken.ThrowIfCancellationRequested();
throw CreateSoftCrashException(ex, cancellationToken);
}
}
/// <summary>
/// Connects to the named pipe <paramref name="pipeName"/> and lets <paramref name="dataWriter"/>
/// write <paramref name="data"/> through an <c>ObjectWriter</c> created over the pipe stream.
/// </summary>
/// <param name="pipeName">Name of the pipe to connect to.</param>
/// <param name="data">Value handed to <paramref name="dataWriter"/>.</param>
/// <param name="dataWriter">Callback that serializes <paramref name="data"/> using the supplied writer.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
public static Task WriteDataToNamedPipeAsync(string pipeName, TData data, Func dataWriter, CancellationToken cancellationToken)
{
    return WriteDataToNamedPipeAsync(
        pipeName,
        data,
        async (pipeStream, payload, token) =>
        {
            // The writer leaves the stream open; the stream-based overload owns and disposes it.
            using var writer = new ObjectWriter(pipeStream, leaveOpen: true, token);
            await dataWriter(writer, payload, token).ConfigureAwait(false);
        },
        cancellationToken);
}
/// <summary>
/// Connects to the named pipe <paramref name="pipeName"/> on the local machine, lets
/// <paramref name="dataWriter"/> write <paramref name="data"/> to a buffered stream over the pipe,
/// and flushes the stream.
/// </summary>
/// <param name="pipeName">Name of the pipe to connect to.</param>
/// <param name="data">Value handed to <paramref name="dataWriter"/>.</param>
/// <param name="dataWriter">Callback that writes <paramref name="data"/> to the supplied stream.</param>
/// <param name="cancellationToken">Token used to cancel the operation.</param>
/// <exception cref="OperationCanceledException">
/// Any failure occurred while <paramref name="cancellationToken"/> is signaled.
/// </exception>
public static async Task WriteDataToNamedPipeAsync(string pipeName, TData data, Func dataWriter, CancellationToken cancellationToken)
{
// Size of the buffer used by the BufferedStream that wraps the pipe.
const int BufferSize = 4 * 1024;
try
{
var pipe = new NamedPipeClientStream(serverName: ".", pipeName, PipeDirection.Out);
// Until the connection succeeds nothing else owns the pipe, so dispose it here on failure.
var success = false;
try
{
await ConnectPipeAsync(pipe, cancellationToken).ConfigureAwait(false);
success = true;
}
finally
{
if (!success)
{
pipe.Dispose();
}
}
// Transfer ownership of the pipe to BufferedStream, it will dispose it:
using var stream = new BufferedStream(pipe, BufferSize);
await dataWriter(stream, data, cancellationToken).ConfigureAwait(false);
// Push any data still sitting in the buffer into the pipe before the stream is disposed.
await stream.FlushAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception) when (cancellationToken.IsCancellationRequested)
{
// The stream has closed before we had chance to check cancellation.
// Whatever was thrown, report it to the caller as cancellation.
cancellationToken.ThrowIfCancellationRequested();
}
}
/// <summary>
/// Connects <paramref name="pipe"/> by repeatedly attempting a very short connect and
/// waiting 20 ms between attempts, until it succeeds, fails with a non-retryable exception,
/// or <paramref name="cancellationToken"/> is signaled.
/// </summary>
/// <param name="pipe">Client pipe to connect.</param>
/// <param name="cancellationToken">Token used to stop retrying.</param>
private static async Task ConnectPipeAsync(NamedPipeClientStream pipe, CancellationToken cancellationToken)
{
    const int ConnectTimeoutInMilliseconds = 1;
    const int FileNotFoundRetryLimit = 3;
    const int SemaphoreTimeoutHResult = unchecked((int)0x80070079);

    var delayBetweenAttempts = TimeSpan.FromMilliseconds(20);
    var fileNotFoundRetries = 0;

    for (; ; )
    {
        try
        {
            // Connect with the smallest timeout: connecting with anything else
            // consumes CPU by causing a spin wait.
            pipe.Connect(ConnectTimeoutInMilliseconds);
            return;
        }
        catch (ObjectDisposedException)
        {
            // Prefer OperationCanceledException when the caller requested cancellation.
            cancellationToken.ThrowIfCancellationRequested();
            throw;
        }
        catch (IOException ioException) when (ioException.HResult == SemaphoreTimeoutHResult)
        {
            // Retry after the delay below.
        }
        catch (TimeoutException)
        {
            // Retry after the delay below.
        }
        catch (FileNotFoundException) when (fileNotFoundRetries < FileNotFoundRetryLimit)
        {
            // Retry a limited number of times; beyond the limit the exception propagates.
            fileNotFoundRetries++;
        }

        cancellationToken.ThrowIfCancellationRequested();
        await Task.Delay(delayBetweenAttempts, cancellationToken).ConfigureAwait(false);
    }
}
/// <summary>
/// Arranges for <paramref name="linkedCancellationSource"/> to be canceled when
/// <paramref name="task"/> finishes in any state other than ran-to-completion.
/// </summary>
/// <param name="task">The pending remote invocation.</param>
/// <param name="linkedCancellationSource">Source to cancel when the invocation does not succeed.</param>
/// <param name="cancellationToken">Token that cancels the continuation itself.</param>
private static void RaiseCancellationIfInvokeFailed(Task task, CancellationTokenSource linkedCancellationSource, CancellationToken cancellationToken)
{
    var continuationOptions = TaskContinuationOptions.NotOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously;

    _ = task.ContinueWith(
        failedInvoke =>
        {
            try
            {
                // The user is allowed to kill the OOP process. When that happens just raise
                // cancellation; otherwise waiting for the pipe connection could be stuck forever,
                // since cancellation from the user would never be raised.
                linkedCancellationSource.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The linked source has already been disposed; nothing left to cancel.
            }
        },
        cancellationToken,
        continuationOptions,
        TaskScheduler.Default);
}
/// <summary>
/// Exception-filter helper: reports <paramref name="ex"/> as a non-fatal Watson unless one of the
/// tokens is signaled. Always returns <see langword="true"/> so the catch block is entered.
/// </summary>
/// <param name="ex">Exception being filtered.</param>
/// <param name="linkedCancellationToken">Token signaled when the remote side caused cancellation.</param>
/// <param name="cancellationToken">Token signaled when the caller canceled the operation.</param>
private static bool ReportUnlessCanceled(Exception ex, CancellationToken linkedCancellationToken, CancellationToken cancellationToken)
{
    // Two flavors of cancellation are possible:
    //  - "canceled by us" (cancellationToken): the user-invoked operation was canceled by another user
    //    action such as explicit cancel or typing. Because cancellation forcefully closes the connection
    //    in the middle of reading/writing, all sorts of exceptions can surface (e.g. invalid operation or
    //    invalid cast from an object reader/writer), so the exception itself is irrelevant.
    //  - "canceled by OOP" (linkedCancellationToken): the connection was closed because of an issue on
    //    the remote side, e.g. the user killed the OOP process.
    var canceled = cancellationToken.IsCancellationRequested
        || linkedCancellationToken.IsCancellationRequested;

    if (!canceled)
    {
        ReportNonFatalWatson(ex);
    }

    return true;
}
/// <summary>
/// Exception-filter helper: reports <paramref name="ex"/> as a non-fatal Watson unless
/// <paramref name="cancellationToken"/> is signaled. Always returns <see langword="true"/>.
/// </summary>
private static bool ReportUnlessCanceled(Exception ex, CancellationToken cancellationToken)
{
    if (cancellationToken.IsCancellationRequested)
    {
        // Failures observed while canceling are expected and not worth reporting.
        return true;
    }

    ReportNonFatalWatson(ex);
    return true;
}
/// <summary>
/// Forwards <paramref name="exception"/> to <c>FatalError.ReportWithoutCrash</c>.
/// </summary>
private static void ReportNonFatalWatson(Exception exception)
    => FatalError.ReportWithoutCrash(exception);
/// <summary>
/// Raises <c>UnexpectedExceptionThrown</c> with <paramref name="ex"/> and creates the
/// <c>SoftCrashException</c> that wraps it. The caller is responsible for throwing the result.
/// </summary>
/// <param name="ex">The unexpected exception received from the invocation.</param>
/// <param name="cancellationToken">Token associated with the failed operation.</param>
private SoftCrashException CreateSoftCrashException(Exception ex, CancellationToken cancellationToken)
{
    // TODO: revisit https://github.com/dotnet/roslyn/issues/40476
    // An unexpected exception came back from service hub. Instead of a hard crash we do a soft crash:
    // listeners are notified so that an info bar can tell users "VS got corrupted and users should save
    // their works and close VS".
    UnexpectedExceptionThrown?.Invoke(ex);

    var softCrash = new SoftCrashException(UnexpectedExceptionLogMessage, ex, cancellationToken);
    return softCrash;
}
/// <summary>
/// Traces <paramref name="message"/> as an error, prefixed with the current process name and id,
/// using this end point's id as the trace event id.
/// </summary>
/// <param name="message">Text to log.</param>
private void LogError(string message)
{
    // Process.GetCurrentProcess() returns a new disposable component on every call;
    // dispose it so each logged error does not leave one behind for the finalizer.
    using var currentProcess = Process.GetCurrentProcess();
    _logger.TraceEvent(TraceEventType.Error, _id, $" [{currentProcess.ProcessName}:{currentProcess.Id}] {message}");
}
/// <summary>
/// Logs the details of an unexpected disconnect; does nothing when <paramref name="e"/> is null.
/// </summary>
/// <param name="e">Disconnect event arguments, if any.</param>
private void LogDisconnectInfo(JsonRpcDisconnectedEventArgs? e)
{
    if (e == null)
    {
        return;
    }

    var details = $@"Stream disconnected unexpectedly: {e.Reason}, '{e.Description}', LastMessage: {e.LastMessage}, Exception: {e.Exception?.Message}";
    LogError(details);
}
/// <summary>
/// Handles the disconnection event, so that we detect disconnection as soon as it happens
/// without waiting for the next failing remote call. The remote call may not happen
/// if there is an issue with the connection. E.g. the client end point might not receive
/// a callback from server, or the server end point might not receive a call from client.
/// </summary>
/// <param name="sender">Event source; not used.</param>
/// <param name="e">Describes why the connection was closed.</param>
private void OnDisconnected(object? sender, JsonRpcDisconnectedEventArgs e)
{
    _disconnectedReason = e;

    // Common cases are not worth logging: we disposed the connection ourselves,
    // or the remote host process shut down.
    var reason = e.Reason;
    var isCommonReason = reason == DisconnectedReason.LocallyDisposed
        || reason == DisconnectedReason.RemotePartyTerminated;

    if (!isCommonReason)
    {
        LogDisconnectInfo(e);
    }

    Disconnected?.Invoke(e);
}
}
}