未验证 提交 c5a3f49c 编写于 作者: M Miha Zupan 提交者: GitHub

Fix System.Net.Sockets telemetry (#42726)

上级 e79a543d
...@@ -9,90 +9,91 @@ namespace System.Diagnostics.Tracing ...@@ -9,90 +9,91 @@ namespace System.Diagnostics.Tracing
/// <summary>Simple event listener than invokes a callback for each event received.</summary> /// <summary>Simple event listener than invokes a callback for each event received.</summary>
internal sealed class TestEventListener : EventListener internal sealed class TestEventListener : EventListener
{ {
private readonly string _targetSourceName; private class Settings
private readonly Guid _targetSourceGuid; {
private readonly EventLevel _level; public EventLevel Level;
public EventKeywords Keywords;
}
private readonly Dictionary<string, Settings> _names = new Dictionary<string, Settings>();
private readonly Dictionary<Guid, Settings> _guids = new Dictionary<Guid, Settings>();
private readonly double? _eventCounterInterval; private readonly double? _eventCounterInterval;
private Action<EventWrittenEventArgs> _eventWritten; private Action<EventWrittenEventArgs> _eventWritten;
private List<EventSource> _tmpEventSourceList = new List<EventSource>(); private readonly List<EventSource> _eventSourceList = new List<EventSource>();
public TestEventListener(string targetSourceName, EventLevel level, double? eventCounterInterval = null) public TestEventListener(string targetSourceName, EventLevel level, double? eventCounterInterval = null)
{ {
// Store the arguments
_targetSourceName = targetSourceName;
_level = level;
_eventCounterInterval = eventCounterInterval; _eventCounterInterval = eventCounterInterval;
AddSource(targetSourceName, level);
LoadSourceList();
} }
public TestEventListener(Guid targetSourceGuid, EventLevel level, double? eventCounterInterval = null) public TestEventListener(Guid targetSourceGuid, EventLevel level, double? eventCounterInterval = null)
{ {
// Store the arguments
_targetSourceGuid = targetSourceGuid;
_level = level;
_eventCounterInterval = eventCounterInterval; _eventCounterInterval = eventCounterInterval;
AddSource(targetSourceGuid, level);
LoadSourceList();
} }
private void LoadSourceList() public void AddSource(string name, EventLevel level, EventKeywords keywords = EventKeywords.All) =>
AddSource(name, null, level, keywords);
public void AddSource(Guid guid, EventLevel level, EventKeywords keywords = EventKeywords.All) =>
AddSource(null, guid, level, keywords);
private void AddSource(string name, Guid? guid, EventLevel level, EventKeywords keywords)
{ {
// The base constructor, which is called before this constructor, lock (_eventSourceList)
// will invoke the virtual OnEventSourceCreated method for each
// existing EventSource, which means OnEventSourceCreated will be
// called before _targetSourceGuid and _level have been set. As such,
// we store a temporary list that just exists from the moment this instance
// is created (instance field initializers run before the base constructor)
// and until we finish construction... in that window, OnEventSourceCreated
// will store the sources into the list rather than try to enable them directly,
// and then here we can enumerate that list, then clear it out.
List<EventSource> sources;
lock (_tmpEventSourceList)
{ {
sources = _tmpEventSourceList; var settings = new Settings()
_tmpEventSourceList = null; {
} Level = level,
foreach (EventSource source in sources) Keywords = keywords
{ };
EnableSourceIfMatch(source);
if (name is not null)
_names.Add(name, settings);
if (guid.HasValue)
_guids.Add(guid.Value, settings);
foreach (EventSource source in _eventSourceList)
{
if (name == source.Name || guid == source.Guid)
{
EnableEventSource(source, level, keywords);
}
}
} }
} }
public void AddActivityTracking() =>
AddSource("System.Threading.Tasks.TplEventSource", EventLevel.Informational, (EventKeywords)0x80 /* TasksFlowActivityIds */);
protected override void OnEventSourceCreated(EventSource eventSource) protected override void OnEventSourceCreated(EventSource eventSource)
{ {
List<EventSource> tmp = _tmpEventSourceList; lock (_eventSourceList)
if (tmp != null)
{ {
lock (tmp) _eventSourceList.Add(eventSource);
if (_names.TryGetValue(eventSource.Name, out Settings settings) ||
_guids.TryGetValue(eventSource.Guid, out settings))
{ {
if (_tmpEventSourceList != null) EnableEventSource(eventSource, settings.Level, settings.Keywords);
{
_tmpEventSourceList.Add(eventSource);
return;
}
} }
} }
EnableSourceIfMatch(eventSource);
} }
private void EnableSourceIfMatch(EventSource source) private void EnableEventSource(EventSource source, EventLevel level, EventKeywords keywords)
{ {
if (source.Name.Equals(_targetSourceName) || var args = new Dictionary<string, string>();
source.Guid.Equals(_targetSourceGuid))
if (_eventCounterInterval != null)
{ {
if (_eventCounterInterval != null) args.Add("EventCounterIntervalSec", _eventCounterInterval.ToString());
{
var args = new Dictionary<string, string> { { "EventCounterIntervalSec", _eventCounterInterval?.ToString() } };
EnableEvents(source, _level, EventKeywords.All, args);
}
else
{
EnableEvents(source, _level);
}
} }
EnableEvents(source, level, keywords, args);
} }
public void RunWithCallback(Action<EventWrittenEventArgs> handler, Action body) public void RunWithCallback(Action<EventWrittenEventArgs> handler, Action body)
......
...@@ -466,34 +466,9 @@ private static Task GetHostEntryOrAddressesCoreAsync(string hostName, bool justR ...@@ -466,34 +466,9 @@ private static Task GetHostEntryOrAddressesCoreAsync(string hostName, bool justR
if (NameResolutionTelemetry.Log.IsEnabled()) if (NameResolutionTelemetry.Log.IsEnabled())
{ {
ValueStopwatch stopwatch = NameResolutionTelemetry.Log.BeforeResolution(hostName); return justAddresses
? (Task)GetAddrInfoWithTelemetryAsync<IPAddress[]>(hostName, justAddresses)
Task coreTask; : (Task)GetAddrInfoWithTelemetryAsync<IPHostEntry>(hostName, justAddresses);
try
{
coreTask = NameResolutionPal.GetAddrInfoAsync(hostName, justAddresses);
}
catch when (LogFailure(stopwatch))
{
Debug.Fail("LogFailure should return false");
throw;
}
coreTask.ContinueWith(
(task, state) =>
{
NameResolutionTelemetry.Log.AfterResolution(
stopwatch: (ValueStopwatch)state!,
successful: task.IsCompletedSuccessfully);
},
state: stopwatch,
cancellationToken: default,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
// coreTask is not actually a base Task, but Task<IPHostEntry> / Task<IPAddress[]>
// We have to return it and not the continuation
return coreTask;
} }
else else
{ {
...@@ -506,6 +481,23 @@ private static Task GetHostEntryOrAddressesCoreAsync(string hostName, bool justR ...@@ -506,6 +481,23 @@ private static Task GetHostEntryOrAddressesCoreAsync(string hostName, bool justR
RunAsync(s => GetHostEntryCore((string)s), hostName); RunAsync(s => GetHostEntryCore((string)s), hostName);
} }
private static async Task<T> GetAddrInfoWithTelemetryAsync<T>(string hostName, bool justAddresses)
where T : class
{
ValueStopwatch stopwatch = NameResolutionTelemetry.Log.BeforeResolution(hostName);
T? result = null;
try
{
result = await ((Task<T>)NameResolutionPal.GetAddrInfoAsync(hostName, justAddresses)).ConfigureAwait(false);
return result;
}
finally
{
NameResolutionTelemetry.Log.AfterResolution(stopwatch, successful: result is not null);
}
}
private static Task<TResult> RunAsync<TResult>(Func<object, TResult> func, object arg) => private static Task<TResult> RunAsync<TResult>(Func<object, TResult> func, object arg) =>
Task.Factory.StartNew(func!, arg, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); Task.Factory.StartNew(func!, arg, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
......
...@@ -1091,25 +1091,37 @@ public Socket Accept() ...@@ -1091,25 +1091,37 @@ public Socket Accept()
// This may throw ObjectDisposedException. // This may throw ObjectDisposedException.
SafeSocketHandle acceptedSocketHandle; SafeSocketHandle acceptedSocketHandle;
SocketError errorCode = SocketPal.Accept( SocketError errorCode;
_handle, try
socketAddress.Buffer, {
ref socketAddress.InternalSize, errorCode = SocketPal.Accept(
out acceptedSocketHandle); _handle,
socketAddress.Buffer,
ref socketAddress.InternalSize,
out acceptedSocketHandle);
}
catch (Exception ex)
{
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message);
}
throw;
}
// Throw an appropriate SocketException if the native call fails. // Throw an appropriate SocketException if the native call fails.
if (errorCode != SocketError.Success) if (errorCode != SocketError.Success)
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null);
Debug.Assert(acceptedSocketHandle.IsInvalid); Debug.Assert(acceptedSocketHandle.IsInvalid);
UpdateAcceptSocketErrorForDisposed(ref errorCode); UpdateAcceptSocketErrorForDisposed(ref errorCode);
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(errorCode);
UpdateStatusAfterSocketErrorAndThrowException(errorCode); UpdateStatusAfterSocketErrorAndThrowException(errorCode);
} }
else
{ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(SocketError.Success);
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop();
}
Debug.Assert(!acceptedSocketHandle.IsInvalid); Debug.Assert(!acceptedSocketHandle.IsInvalid);
...@@ -2127,8 +2139,6 @@ private bool CanUseConnectEx(EndPoint remoteEP) ...@@ -2127,8 +2139,6 @@ private bool CanUseConnectEx(EndPoint remoteEP)
internal IAsyncResult UnsafeBeginConnect(EndPoint remoteEP, AsyncCallback? callback, object? state, bool flowContext = false) internal IAsyncResult UnsafeBeginConnect(EndPoint remoteEP, AsyncCallback? callback, object? state, bool flowContext = false)
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStart(remoteEP);
if (CanUseConnectEx(remoteEP)) if (CanUseConnectEx(remoteEP))
{ {
return BeginConnectEx(remoteEP, flowContext, callback, state); return BeginConnectEx(remoteEP, flowContext, callback, state);
...@@ -2348,7 +2358,23 @@ public void Disconnect(bool reuseSocket) ...@@ -2348,7 +2358,23 @@ public void Disconnect(bool reuseSocket)
// int - Return code from async Connect, 0 for success, SocketError.NotConnected otherwise // int - Return code from async Connect, 0 for success, SocketError.NotConnected otherwise
public void EndConnect(IAsyncResult asyncResult) public void EndConnect(IAsyncResult asyncResult)
{ {
ThrowIfDisposed(); // There are three AsyncResult types we support in EndConnect:
// - ConnectAsyncResult - a fully synchronous operation that already completed, wrapped in an AsyncResult
// - MultipleAddressConnectAsyncResult - a parent operation for other Connects (connecting to DnsEndPoint)
// - ConnectOverlappedAsyncResult - a connect to an IPEndPoint
// For Telemetry, we already logged everything for ConnectAsyncResult in DoConnect,
// and we want to avoid logging duplicated events for MultipleAddressConnect.
// Therefore, we always check that asyncResult is ConnectOverlapped before logging.
if (Disposed)
{
if (SocketsTelemetry.Log.IsEnabled() && asyncResult is ConnectOverlappedAsyncResult)
{
SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket);
}
ThrowObjectDisposedException();
}
// Validate input parameters. // Validate input parameters.
if (asyncResult == null) if (asyncResult == null)
...@@ -2376,13 +2402,13 @@ public void EndConnect(IAsyncResult asyncResult) ...@@ -2376,13 +2402,13 @@ public void EndConnect(IAsyncResult asyncResult)
if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"asyncResult:{asyncResult}"); if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"asyncResult:{asyncResult}");
Exception? ex = castedAsyncResult.Result as Exception; Exception? ex = castedAsyncResult.Result as Exception;
if (ex != null || (SocketError)castedAsyncResult.ErrorCode != SocketError.Success) if (ex != null || (SocketError)castedAsyncResult.ErrorCode != SocketError.Success)
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectFailedAndStop((SocketError)castedAsyncResult.ErrorCode, ex?.Message); SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode;
if (ex == null) if (ex == null)
{ {
SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode;
UpdateConnectSocketErrorForDisposed(ref errorCode); UpdateConnectSocketErrorForDisposed(ref errorCode);
// Update the internal state of this socket according to the error before throwing. // Update the internal state of this socket according to the error before throwing.
SocketException se = SocketExceptionFactory.CreateSocketException((int)errorCode, castedAsyncResult.RemoteEndPoint); SocketException se = SocketExceptionFactory.CreateSocketException((int)errorCode, castedAsyncResult.RemoteEndPoint);
...@@ -2390,11 +2416,19 @@ public void EndConnect(IAsyncResult asyncResult) ...@@ -2390,11 +2416,19 @@ public void EndConnect(IAsyncResult asyncResult)
ex = se; ex = se;
} }
if (SocketsTelemetry.Log.IsEnabled() && castedAsyncResult is ConnectOverlappedAsyncResult)
{
SocketsTelemetry.Log.AfterConnect(errorCode, ex.Message);
}
if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, ex); if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, ex);
ExceptionDispatchInfo.Throw(ex); ExceptionDispatchInfo.Throw(ex);
} }
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStop(); if (SocketsTelemetry.Log.IsEnabled() && castedAsyncResult is ConnectOverlappedAsyncResult)
{
SocketsTelemetry.Log.AfterConnect(SocketError.Success);
}
if (NetEventSource.Log.IsEnabled()) NetEventSource.Connected(this, LocalEndPoint, RemoteEndPoint); if (NetEventSource.Log.IsEnabled()) NetEventSource.Connected(this, LocalEndPoint, RemoteEndPoint);
} }
...@@ -3512,21 +3546,33 @@ private IAsyncResult BeginAcceptCommon(Socket? acceptSocket, int receiveSize, As ...@@ -3512,21 +3546,33 @@ private IAsyncResult BeginAcceptCommon(Socket? acceptSocket, int receiveSize, As
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_rightEndPoint); if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_rightEndPoint);
int socketAddressSize = GetAddressSize(_rightEndPoint); int socketAddressSize = GetAddressSize(_rightEndPoint);
SocketError errorCode = SocketPal.AcceptAsync(this, _handle, acceptHandle, receiveSize, socketAddressSize, asyncResult); SocketError errorCode;
try
{
errorCode = SocketPal.AcceptAsync(this, _handle, acceptHandle, receiveSize, socketAddressSize, asyncResult);
}
catch (Exception ex)
{
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message);
}
throw;
}
if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"AcceptAsync returns:{errorCode} {asyncResult}"); if (NetEventSource.Log.IsEnabled()) NetEventSource.Info(this, $"AcceptAsync returns:{errorCode} {asyncResult}");
// Throw an appropriate SocketException if the native call fails synchronously. // Throw an appropriate SocketException if the native call fails synchronously.
if (!CheckErrorAndUpdateStatus(errorCode)) if (!CheckErrorAndUpdateStatus(errorCode))
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null);
UpdateAcceptSocketErrorForDisposed(ref errorCode); UpdateAcceptSocketErrorForDisposed(ref errorCode);
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(errorCode);
throw new SocketException((int)errorCode); throw new SocketException((int)errorCode);
} }
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop();
// Finish the flow capture, maybe complete here. // Finish the flow capture, maybe complete here.
asyncResult.FinishPostingAsyncOp(ref Caches.AcceptClosureCache); asyncResult.FinishPostingAsyncOp(ref Caches.AcceptClosureCache);
...@@ -3552,7 +3598,12 @@ public Socket EndAccept(IAsyncResult asyncResult) ...@@ -3552,7 +3598,12 @@ public Socket EndAccept(IAsyncResult asyncResult)
} }
private Socket EndAcceptCommon(out byte[]? buffer, out int bytesTransferred, IAsyncResult asyncResult) private Socket EndAcceptCommon(out byte[]? buffer, out int bytesTransferred, IAsyncResult asyncResult)
{ {
ThrowIfDisposed(); if (Disposed)
{
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted);
ThrowObjectDisposedException();
}
// Validate input parameters. // Validate input parameters.
if (asyncResult == null) if (asyncResult == null)
...@@ -3573,21 +3624,23 @@ private Socket EndAcceptCommon(out byte[]? buffer, out int bytesTransferred, IAs ...@@ -3573,21 +3624,23 @@ private Socket EndAcceptCommon(out byte[]? buffer, out int bytesTransferred, IAs
bytesTransferred = (int)castedAsyncResult.BytesTransferred; bytesTransferred = (int)castedAsyncResult.BytesTransferred;
buffer = castedAsyncResult.Buffer; buffer = castedAsyncResult.Buffer;
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.BytesReceived(bytesTransferred);
castedAsyncResult.EndCalled = true; castedAsyncResult.EndCalled = true;
// Throw an appropriate SocketException if the native call failed asynchronously. // Throw an appropriate SocketException if the native call failed asynchronously.
SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode; SocketError errorCode = (SocketError)castedAsyncResult.ErrorCode;
if (errorCode != SocketError.Success) if (errorCode != SocketError.Success)
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(errorCode, null);
UpdateAcceptSocketErrorForDisposed(ref errorCode); UpdateAcceptSocketErrorForDisposed(ref errorCode);
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(errorCode);
UpdateStatusAfterSocketErrorAndThrowException(errorCode); UpdateStatusAfterSocketErrorAndThrowException(errorCode);
} }
else
{ if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterAccept(SocketError.Success);
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop();
}
if (NetEventSource.Log.IsEnabled()) NetEventSource.Accepted(socket, socket.RemoteEndPoint, socket.LocalEndPoint); if (NetEventSource.Log.IsEnabled()) NetEventSource.Accepted(socket, socket.RemoteEndPoint, socket.LocalEndPoint);
return socket; return socket;
...@@ -3641,16 +3694,23 @@ public bool AcceptAsync(SocketAsyncEventArgs e) ...@@ -3641,16 +3694,23 @@ public bool AcceptAsync(SocketAsyncEventArgs e)
SafeSocketHandle? acceptHandle; SafeSocketHandle? acceptHandle;
e.AcceptSocket = GetOrCreateAcceptSocket(e.AcceptSocket, true, "AcceptSocket", out acceptHandle); e.AcceptSocket = GetOrCreateAcceptSocket(e.AcceptSocket, true, "AcceptSocket", out acceptHandle);
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_rightEndPoint!);
// Prepare for and make the native call. // Prepare for and make the native call.
e.StartOperationCommon(this, SocketAsyncOperation.Accept); e.StartOperationCommon(this, SocketAsyncOperation.Accept);
e.StartOperationAccept(); e.StartOperationAccept();
SocketError socketError = SocketError.Success; SocketError socketError;
try try
{ {
socketError = e.DoOperationAccept(this, _handle, acceptHandle); socketError = e.DoOperationAccept(this, _handle, acceptHandle);
} }
catch catch (Exception ex)
{ {
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterAccept(SocketError.Interrupted, ex.Message);
}
// Clear in-use flag on event args object. // Clear in-use flag on event args object.
e.Complete(); e.Complete();
throw; throw;
...@@ -3741,12 +3801,17 @@ private bool ConnectAsync(SocketAsyncEventArgs e, bool userSocket) ...@@ -3741,12 +3801,17 @@ private bool ConnectAsync(SocketAsyncEventArgs e, bool userSocket)
_rightEndPoint = endPointSnapshot; _rightEndPoint = endPointSnapshot;
} }
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.ConnectStart(e._socketAddress!);
}
// Prepare for the native call. // Prepare for the native call.
e.StartOperationCommon(this, SocketAsyncOperation.Connect); e.StartOperationCommon(this, SocketAsyncOperation.Connect);
e.StartOperationConnect(multipleConnect: null, userSocket); e.StartOperationConnect(multipleConnect: null, userSocket);
// Make the native call. // Make the native call.
SocketError socketError = SocketError.Success; SocketError socketError;
try try
{ {
if (CanUseConnectEx(endPointSnapshot)) if (CanUseConnectEx(endPointSnapshot))
...@@ -3759,8 +3824,13 @@ private bool ConnectAsync(SocketAsyncEventArgs e, bool userSocket) ...@@ -3759,8 +3824,13 @@ private bool ConnectAsync(SocketAsyncEventArgs e, bool userSocket)
socketError = e.DoOperationConnect(this, _handle); socketError = e.DoOperationConnect(this, _handle);
} }
} }
catch catch (Exception ex)
{ {
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message);
}
_rightEndPoint = oldEndPoint; _rightEndPoint = oldEndPoint;
// Clear in-use flag on event args object. // Clear in-use flag on event args object.
...@@ -4197,22 +4267,36 @@ private void DoConnect(EndPoint endPointSnapshot, Internals.SocketAddress socket ...@@ -4197,22 +4267,36 @@ private void DoConnect(EndPoint endPointSnapshot, Internals.SocketAddress socket
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStart(socketAddress); if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStart(socketAddress);
SocketError errorCode = SocketPal.Connect(_handle, socketAddress.Buffer, socketAddress.Size); SocketError errorCode;
try
{
errorCode = SocketPal.Connect(_handle, socketAddress.Buffer, socketAddress.Size);
}
catch (Exception ex)
{
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message);
}
throw;
}
// Throw an appropriate SocketException if the native call fails. // Throw an appropriate SocketException if the native call fails.
if (errorCode != SocketError.Success) if (errorCode != SocketError.Success)
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectFailedAndStop(errorCode, null);
UpdateConnectSocketErrorForDisposed(ref errorCode); UpdateConnectSocketErrorForDisposed(ref errorCode);
// Update the internal state of this socket according to the error before throwing. // Update the internal state of this socket according to the error before throwing.
SocketException socketException = SocketExceptionFactory.CreateSocketException((int)errorCode, endPointSnapshot); SocketException socketException = SocketExceptionFactory.CreateSocketException((int)errorCode, endPointSnapshot);
UpdateStatusAfterSocketError(socketException); UpdateStatusAfterSocketError(socketException);
if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, socketException); if (NetEventSource.Log.IsEnabled()) NetEventSource.Error(this, socketException);
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterConnect(errorCode);
throw socketException; throw socketException;
} }
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStop(); if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterConnect(SocketError.Success);
if (_rightEndPoint == null) if (_rightEndPoint == null)
{ {
...@@ -4593,6 +4677,14 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa ...@@ -4593,6 +4677,14 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa
EndPoint endPointSnapshot = remoteEP; EndPoint endPointSnapshot = remoteEP;
Internals.SocketAddress socketAddress = Serialize(ref endPointSnapshot); Internals.SocketAddress socketAddress = Serialize(ref endPointSnapshot);
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.ConnectStart(socketAddress);
// Ignore flowContext when using Telemetry to avoid losing Activity tracking
flowContext = true;
}
WildcardBindForConnectIfNecessary(endPointSnapshot.AddressFamily); WildcardBindForConnectIfNecessary(endPointSnapshot.AddressFamily);
// Allocate the async result and the event we'll pass to the thread pool. // Allocate the async result and the event we'll pass to the thread pool.
...@@ -4615,8 +4707,13 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa ...@@ -4615,8 +4707,13 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa
{ {
errorCode = SocketPal.ConnectAsync(this, _handle, socketAddress.Buffer, socketAddress.Size, asyncResult); errorCode = SocketPal.ConnectAsync(this, _handle, socketAddress.Buffer, socketAddress.Size, asyncResult);
} }
catch catch (Exception ex)
{ {
if (SocketsTelemetry.Log.IsEnabled())
{
SocketsTelemetry.Log.AfterConnect(SocketError.NotSocket, ex.Message);
}
// _rightEndPoint will always equal oldEndPoint. // _rightEndPoint will always equal oldEndPoint.
_rightEndPoint = oldEndPoint; _rightEndPoint = oldEndPoint;
throw; throw;
...@@ -4636,6 +4733,8 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa ...@@ -4636,6 +4733,8 @@ private IAsyncResult BeginConnectEx(EndPoint remoteEP, bool flowContext, AsyncCa
// Update the internal state of this socket according to the error before throwing. // Update the internal state of this socket according to the error before throwing.
_rightEndPoint = oldEndPoint; _rightEndPoint = oldEndPoint;
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AfterConnect(errorCode);
throw new SocketException((int)errorCode); throw new SocketException((int)errorCode);
} }
......
...@@ -155,6 +155,9 @@ private unsafe SocketError ProcessIOCPResult(bool success, int bytesTransferred, ...@@ -155,6 +155,9 @@ private unsafe SocketError ProcessIOCPResult(bool success, int bytesTransferred,
// so we can set the results right now. // so we can set the results right now.
FreeNativeOverlapped(overlapped); FreeNativeOverlapped(overlapped);
FinishOperationSyncSuccess(bytesTransferred, SocketFlags.None); FinishOperationSyncSuccess(bytesTransferred, SocketFlags.None);
if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();
return SocketError.Success; return SocketError.Success;
} }
...@@ -170,6 +173,9 @@ private unsafe SocketError ProcessIOCPResult(bool success, int bytesTransferred, ...@@ -170,6 +173,9 @@ private unsafe SocketError ProcessIOCPResult(bool success, int bytesTransferred,
// Completed synchronously with a failure. // Completed synchronously with a failure.
FreeNativeOverlapped(overlapped); FreeNativeOverlapped(overlapped);
FinishOperationSyncFailure(socketError, bytesTransferred, SocketFlags.None); FinishOperationSyncFailure(socketError, bytesTransferred, SocketFlags.None);
if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();
return socketError; return socketError;
} }
...@@ -202,6 +208,9 @@ private unsafe SocketError ProcessIOCPResultWithSingleBufferHandle(SocketError s ...@@ -202,6 +208,9 @@ private unsafe SocketError ProcessIOCPResultWithSingleBufferHandle(SocketError s
_singleBufferHandleState = SingleBufferHandleState.None; _singleBufferHandleState = SingleBufferHandleState.None;
FreeNativeOverlapped(overlapped); FreeNativeOverlapped(overlapped);
FinishOperationSyncSuccess(bytesTransferred, SocketFlags.None); FinishOperationSyncSuccess(bytesTransferred, SocketFlags.None);
if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();
return SocketError.Success; return SocketError.Success;
} }
...@@ -218,6 +227,9 @@ private unsafe SocketError ProcessIOCPResultWithSingleBufferHandle(SocketError s ...@@ -218,6 +227,9 @@ private unsafe SocketError ProcessIOCPResultWithSingleBufferHandle(SocketError s
_singleBufferHandleState = SingleBufferHandleState.None; _singleBufferHandleState = SingleBufferHandleState.None;
FreeNativeOverlapped(overlapped); FreeNativeOverlapped(overlapped);
FinishOperationSyncFailure(socketError, bytesTransferred, SocketFlags.None); FinishOperationSyncFailure(socketError, bytesTransferred, SocketFlags.None);
if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();
return socketError; return socketError;
} }
......
...@@ -198,11 +198,38 @@ public int BytesTransferred ...@@ -198,11 +198,38 @@ public int BytesTransferred
public event EventHandler<SocketAsyncEventArgs>? Completed; public event EventHandler<SocketAsyncEventArgs>? Completed;
private void OnCompletedInternal()
{
if (SocketsTelemetry.Log.IsEnabled())
{
AfterConnectAcceptTelemetry();
}
OnCompleted(this);
}
protected virtual void OnCompleted(SocketAsyncEventArgs e) protected virtual void OnCompleted(SocketAsyncEventArgs e)
{ {
Completed?.Invoke(e._currentSocket, e); Completed?.Invoke(e._currentSocket, e);
} }
private void AfterConnectAcceptTelemetry()
{
switch (LastOperation)
{
case SocketAsyncOperation.Accept:
SocketsTelemetry.Log.AfterAccept(SocketError);
break;
case SocketAsyncOperation.Connect:
if (_multipleConnect is null)
{
SocketsTelemetry.Log.AfterConnect(SocketError);
}
break;
}
}
// DisconnectResuseSocket property. // DisconnectResuseSocket property.
public bool DisconnectReuseSocket public bool DisconnectReuseSocket
{ {
...@@ -420,7 +447,7 @@ internal void SetResults(Exception exception, int bytesTransferred, SocketFlags ...@@ -420,7 +447,7 @@ internal void SetResults(Exception exception, int bytesTransferred, SocketFlags
private static void ExecutionCallback(object? state) private static void ExecutionCallback(object? state)
{ {
var thisRef = (SocketAsyncEventArgs)state!; var thisRef = (SocketAsyncEventArgs)state!;
thisRef.OnCompleted(thisRef); thisRef.OnCompletedInternal();
} }
// Marks this object as no longer "in-use". Will also execute a Dispose deferred // Marks this object as no longer "in-use". Will also execute a Dispose deferred
...@@ -509,7 +536,9 @@ internal void StartOperationCommon(Socket? socket, SocketAsyncOperation operatio ...@@ -509,7 +536,9 @@ internal void StartOperationCommon(Socket? socket, SocketAsyncOperation operatio
_currentSocket = socket; _currentSocket = socket;
// Capture execution context if needed (it is unless explicitly disabled). // Capture execution context if needed (it is unless explicitly disabled).
if (_flowExecutionContext) // If Telemetry is enabled, make sure to capture the context if we're making a Connect or Accept call to preserve the activity
if (_flowExecutionContext ||
(SocketsTelemetry.Log.IsEnabled() && (operation == SocketAsyncOperation.Connect || operation == SocketAsyncOperation.Accept)))
{ {
_context = ExecutionContext.Capture(); _context = ExecutionContext.Capture();
} }
...@@ -547,8 +576,6 @@ internal void StartOperationAccept() ...@@ -547,8 +576,6 @@ internal void StartOperationAccept()
_acceptBuffer = new byte[_acceptAddressBufferCount]; _acceptBuffer = new byte[_acceptAddressBufferCount];
} }
} }
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStart(_currentSocket!._rightEndPoint!);
} }
internal void StartOperationConnect(MultipleConnectAsync? multipleConnect, bool userSocket) internal void StartOperationConnect(MultipleConnectAsync? multipleConnect, bool userSocket)
...@@ -556,9 +583,6 @@ internal void StartOperationConnect(MultipleConnectAsync? multipleConnect, bool ...@@ -556,9 +583,6 @@ internal void StartOperationConnect(MultipleConnectAsync? multipleConnect, bool
_multipleConnect = multipleConnect; _multipleConnect = multipleConnect;
_connectSocket = null; _connectSocket = null;
_userSocket = userSocket; _userSocket = userSocket;
// Log only the actual connect operation to a remote endpoint.
if (SocketsTelemetry.Log.IsEnabled() && multipleConnect == null) SocketsTelemetry.Log.ConnectStart(_socketAddress!);
} }
internal void CancelConnectAsync() internal void CancelConnectAsync()
...@@ -572,8 +596,6 @@ internal void CancelConnectAsync() ...@@ -572,8 +596,6 @@ internal void CancelConnectAsync()
} }
else else
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectCanceledAndStop();
// Otherwise we're doing a normal ConnectAsync - cancel it by closing the socket. // Otherwise we're doing a normal ConnectAsync - cancel it by closing the socket.
// _currentSocket will only be null if _multipleConnect was set, so we don't have to check. // _currentSocket will only be null if _multipleConnect was set, so we don't have to check.
if (_currentSocket == null) if (_currentSocket == null)
...@@ -589,12 +611,6 @@ internal void FinishOperationSyncFailure(SocketError socketError, int bytesTrans ...@@ -589,12 +611,6 @@ internal void FinishOperationSyncFailure(SocketError socketError, int bytesTrans
{ {
SetResults(socketError, bytesTransferred, flags); SetResults(socketError, bytesTransferred, flags);
if (SocketsTelemetry.Log.IsEnabled())
{
if (_multipleConnect == null && _completedOperation == SocketAsyncOperation.Connect) SocketsTelemetry.Log.ConnectFailedAndStop(socketError, null);
if (_completedOperation == SocketAsyncOperation.Accept) SocketsTelemetry.Log.AcceptFailedAndStop(socketError, null);
}
// This will be null if we're doing a static ConnectAsync to a DnsEndPoint with AddressFamily.Unspecified; // This will be null if we're doing a static ConnectAsync to a DnsEndPoint with AddressFamily.Unspecified;
// the attempt socket will be closed anyways, so not updating the state is OK. // the attempt socket will be closed anyways, so not updating the state is OK.
// If we're doing a static ConnectAsync to an IPEndPoint, we need to dispose // If we're doing a static ConnectAsync to an IPEndPoint, we need to dispose
...@@ -640,7 +656,7 @@ internal void FinishOperationAsyncFailure(SocketError socketError, int bytesTran ...@@ -640,7 +656,7 @@ internal void FinishOperationAsyncFailure(SocketError socketError, int bytesTran
if (context == null) if (context == null)
{ {
OnCompleted(this); OnCompletedInternal();
} }
else else
{ {
...@@ -656,7 +672,7 @@ internal void FinishConnectByNameAsyncFailure(Exception exception, int bytesTran ...@@ -656,7 +672,7 @@ internal void FinishConnectByNameAsyncFailure(Exception exception, int bytesTran
if (context == null) if (context == null)
{ {
OnCompleted(this); OnCompletedInternal();
} }
else else
{ {
...@@ -677,7 +693,7 @@ internal void FinishWrapperConnectSuccess(Socket? connectSocket, int bytesTransf ...@@ -677,7 +693,7 @@ internal void FinishWrapperConnectSuccess(Socket? connectSocket, int bytesTransf
Complete(); Complete();
if (context == null) if (context == null)
{ {
OnCompleted(this); OnCompletedInternal();
} }
else else
{ {
...@@ -715,13 +731,9 @@ internal void FinishOperationSyncSuccess(int bytesTransferred, SocketFlags flags ...@@ -715,13 +731,9 @@ internal void FinishOperationSyncSuccess(int bytesTransferred, SocketFlags flags
} }
catch (ObjectDisposedException) { } catch (ObjectDisposedException) { }
} }
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptStop();
} }
else else
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.AcceptFailedAndStop(socketError, null);
SetResults(socketError, bytesTransferred, flags); SetResults(socketError, bytesTransferred, flags);
_acceptSocket = null; _acceptSocket = null;
_currentSocket.UpdateStatusAfterSocketError(socketError); _currentSocket.UpdateStatusAfterSocketError(socketError);
...@@ -741,16 +753,12 @@ internal void FinishOperationSyncSuccess(int bytesTransferred, SocketFlags flags ...@@ -741,16 +753,12 @@ internal void FinishOperationSyncSuccess(int bytesTransferred, SocketFlags flags
catch (ObjectDisposedException) { } catch (ObjectDisposedException) { }
} }
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectStop();
// Mark socket connected. // Mark socket connected.
_currentSocket!.SetToConnected(); _currentSocket!.SetToConnected();
_connectSocket = _currentSocket; _connectSocket = _currentSocket;
} }
else else
{ {
if (SocketsTelemetry.Log.IsEnabled()) SocketsTelemetry.Log.ConnectFailedAndStop(socketError, null);
SetResults(socketError, bytesTransferred, flags); SetResults(socketError, bytesTransferred, flags);
_currentSocket!.UpdateStatusAfterSocketError(socketError); _currentSocket!.UpdateStatusAfterSocketError(socketError);
} }
...@@ -814,7 +822,7 @@ internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flag ...@@ -814,7 +822,7 @@ internal void FinishOperationAsyncSuccess(int bytesTransferred, SocketFlags flag
// Raise completion event. // Raise completion event.
if (context == null) if (context == null)
{ {
OnCompleted(this); OnCompletedInternal();
} }
else else
{ {
...@@ -834,6 +842,8 @@ private void FinishOperationSync(SocketError socketError, int bytesTransferred, ...@@ -834,6 +842,8 @@ private void FinishOperationSync(SocketError socketError, int bytesTransferred,
{ {
FinishOperationSyncFailure(socketError, bytesTransferred, flags); FinishOperationSyncFailure(socketError, bytesTransferred, flags);
} }
if (SocketsTelemetry.Log.IsEnabled()) AfterConnectAcceptTelemetry();
} }
private static void LogBytesTransferEvents(SocketType? socketType, SocketAsyncOperation operation, int bytesTransferred) private static void LogBytesTransferEvents(SocketType? socketType, SocketAsyncOperation operation, int bytesTransferred)
......
// Licensed to the .NET Foundation under one or more agreements. // Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license. // The .NET Foundation licenses this file to you under the MIT license.
using System.Diagnostics;
using System.Diagnostics.Tracing; using System.Diagnostics.Tracing;
using System.Threading; using System.Threading;
...@@ -26,17 +27,13 @@ internal sealed class SocketsTelemetry : EventSource ...@@ -26,17 +27,13 @@ internal sealed class SocketsTelemetry : EventSource
private long _datagramsSent; private long _datagramsSent;
[Event(1, Level = EventLevel.Informational)] [Event(1, Level = EventLevel.Informational)]
public void ConnectStart(string? address) private void ConnectStart(string? address)
{ {
Interlocked.Increment(ref _outgoingConnectionsEstablished); WriteEvent(eventId: 1, address);
if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
WriteEvent(eventId: 1, address ?? "");
}
} }
[Event(2, Level = EventLevel.Informational)] [Event(2, Level = EventLevel.Informational)]
public void ConnectStop() private void ConnectStop()
{ {
if (IsEnabled(EventLevel.Informational, EventKeywords.All)) if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{ {
...@@ -45,105 +42,108 @@ public void ConnectStop() ...@@ -45,105 +42,108 @@ public void ConnectStop()
} }
[Event(3, Level = EventLevel.Error)] [Event(3, Level = EventLevel.Error)]
public void ConnectFailed(SocketError error, string? exceptionMessage) private void ConnectFailed(SocketError error, string? exceptionMessage)
{ {
if (IsEnabled(EventLevel.Error, EventKeywords.All)) if (IsEnabled(EventLevel.Error, EventKeywords.All))
{ {
WriteEvent(eventId: 3, (int)error, exceptionMessage ?? string.Empty); WriteEvent(eventId: 3, (int)error, exceptionMessage);
} }
} }
[Event(4, Level = EventLevel.Warning)] [Event(4, Level = EventLevel.Informational)]
public void ConnectCanceled() private void AcceptStart(string? address)
{ {
if (IsEnabled(EventLevel.Warning, EventKeywords.All)) WriteEvent(eventId: 4, address);
{
WriteEvent(eventId: 4);
}
} }
[Event(5, Level = EventLevel.Informational)] [Event(5, Level = EventLevel.Informational)]
public void AcceptStart(string? address) private void AcceptStop()
{ {
Interlocked.Increment(ref _incomingConnectionsEstablished);
if (IsEnabled(EventLevel.Informational, EventKeywords.All)) if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{ {
WriteEvent(eventId: 5, address ?? ""); WriteEvent(eventId: 5);
} }
} }
[Event(6, Level = EventLevel.Informational)] [Event(6, Level = EventLevel.Error)]
public void AcceptStop() private void AcceptFailed(SocketError error, string? exceptionMessage)
{
if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
WriteEvent(eventId: 6);
}
}
[Event(7, Level = EventLevel.Error)]
public void AcceptFailed(SocketError error, string? exceptionMessage)
{ {
if (IsEnabled(EventLevel.Error, EventKeywords.All)) if (IsEnabled(EventLevel.Error, EventKeywords.All))
{ {
WriteEvent(eventId: 7, (int)error, exceptionMessage ?? string.Empty); WriteEvent(eventId: 6, (int)error, exceptionMessage);
} }
} }
[NonEvent] [NonEvent]
public void ConnectStart(Internals.SocketAddress address) public void ConnectStart(Internals.SocketAddress address)
{ {
ConnectStart(address.ToString()); if (IsEnabled(EventLevel.Informational, EventKeywords.All))
} {
ConnectStart(address.ToString());
[NonEvent] }
public void ConnectStart(EndPoint address)
{
ConnectStart(address.ToString());
} }
[NonEvent] [NonEvent]
public void ConnectCanceledAndStop() public void AfterConnect(SocketError error, string? exceptionMessage = null)
{ {
ConnectCanceled(); if (error == SocketError.Success)
ConnectStop(); {
} Debug.Assert(exceptionMessage is null);
Interlocked.Increment(ref _outgoingConnectionsEstablished);
}
else
{
ConnectFailed(error, exceptionMessage);
}
[NonEvent]
public void ConnectFailedAndStop(SocketError error, string? exceptionMessage)
{
ConnectFailed(error, exceptionMessage);
ConnectStop(); ConnectStop();
} }
[NonEvent] [NonEvent]
public void AcceptStart(Internals.SocketAddress address) public void AcceptStart(Internals.SocketAddress address)
{ {
AcceptStart(address.ToString()); if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
AcceptStart(address.ToString());
}
} }
[NonEvent] [NonEvent]
public void AcceptStart(EndPoint address) public void AcceptStart(EndPoint address)
{ {
AcceptStart(address.ToString()); if (IsEnabled(EventLevel.Informational, EventKeywords.All))
{
AcceptStart(address.ToString());
}
} }
[NonEvent] [NonEvent]
public void AcceptFailedAndStop(SocketError error, string? exceptionMessage) public void AfterAccept(SocketError error, string? exceptionMessage = null)
{ {
AcceptFailed(error, exceptionMessage); if (error == SocketError.Success)
{
Debug.Assert(exceptionMessage is null);
Interlocked.Increment(ref _incomingConnectionsEstablished);
}
else
{
AcceptFailed(error, exceptionMessage);
}
AcceptStop(); AcceptStop();
} }
[NonEvent] [NonEvent]
public void BytesReceived(int count) public void BytesReceived(int count)
{ {
Debug.Assert(count >= 0);
Interlocked.Add(ref _bytesReceived, count); Interlocked.Add(ref _bytesReceived, count);
} }
[NonEvent] [NonEvent]
public void BytesSent(int count) public void BytesSent(int count)
{ {
Debug.Assert(count >= 0);
Interlocked.Add(ref _bytesSent, count); Interlocked.Add(ref _bytesSent, count);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册