未验证 提交 b2271412 编写于 作者: L Lemon 提交者: GitHub

Agent performance optimization (#158)

* Use atomic operations to determine if the size of the queue exceeds the limit.

* Optimizing reflection performance
上级 a9d0a3f0
......@@ -3,12 +3,12 @@
"IncludeScopes": false,
"Debug": {
"LogLevel": {
"Default": "Debug"
"Default": "Warning"
}
},
"Console": {
"LogLevel": {
"Default": "Debug"
"Default": "Warning"
}
}
}
......
......@@ -20,8 +20,8 @@
"BatchSize": 3000,
"gRPC": {
"Servers": "localhost:11800",
"Timeout": 10000,
"ConnectTimeout": 10000,
"Timeout": 100000,
"ConnectTimeout": 100000,
"ReportTimeout": 600000
}
}
......
......@@ -16,6 +16,8 @@
*
*/
using AspectCore.Extensions.Reflection;
namespace SkyApm.Diagnostics
{
public class PropertyAttribute : ParameterBinder
......@@ -31,7 +33,7 @@ namespace SkyApm.Diagnostics
var property = value.GetType().GetProperty(Name);
return property?.GetValue(value);
return property?.GetReflector()?.GetValue(value);
}
}
}
\ No newline at end of file
......@@ -19,21 +19,22 @@
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using AspectCore.Extensions.Reflection;
namespace SkyApm.Diagnostics
{
internal class TracingDiagnosticMethod
{
private readonly MethodInfo _method;
private readonly ITracingDiagnosticProcessor _tracingDiagnosticProcessor;
private readonly string _diagnosticName;
private readonly IParameterResolver[] _parameterResolvers;
private readonly MethodReflector _reflector;
public TracingDiagnosticMethod(ITracingDiagnosticProcessor tracingDiagnosticProcessor, MethodInfo method,
string diagnosticName)
{
_tracingDiagnosticProcessor = tracingDiagnosticProcessor;
_method = method;
_reflector = method.GetReflector();
_diagnosticName = diagnosticName;
_parameterResolvers = GetParameterResolvers(method).ToArray();
}
......@@ -51,7 +52,7 @@ namespace SkyApm.Diagnostics
args[i] = _parameterResolvers[i].Resolve(value);
}
_method.Invoke(_tracingDiagnosticProcessor, args);
_reflector.Invoke(_tracingDiagnosticProcessor, args);
}
private static IEnumerable<IParameterResolver> GetParameterResolvers(MethodInfo methodInfo)
......
......@@ -12,6 +12,7 @@
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AspectCore.Extensions.Reflection" Version="1.2.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.1" />
<PackageReference Include="System.Memory" Version="4.5.2" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
......
......@@ -35,6 +35,7 @@ namespace SkyApm.Transport
private readonly ConcurrentQueue<SegmentRequest> _segmentQueue;
private readonly IRuntimeEnvironment _runtimeEnvironment;
private readonly CancellationTokenSource _cancellation;
private int _offset;
public AsyncQueueSegmentDispatcher(IConfigAccessor configAccessor,
ISegmentReporter segmentReporter, IRuntimeEnvironment runtimeEnvironment,
......@@ -55,7 +56,7 @@ namespace SkyApm.Transport
return false;
// todo performance optimization for ConcurrentQueue
if (_config.QueueSize < _segmentQueue.Count || _cancellation.IsCancellationRequested)
if (_config.QueueSize < _offset || _cancellation.IsCancellationRequested)
return false;
var segment = _segmentContextMapper.Map(segmentContext);
......@@ -65,6 +66,8 @@ namespace SkyApm.Transport
_segmentQueue.Enqueue(segment);
Interlocked.Increment(ref _offset);
_logger.Debug($"Dispatch trace segment. [SegmentId]={segmentContext.SegmentId}.");
return true;
}
......@@ -80,11 +83,15 @@ namespace SkyApm.Transport
while (index++ < limit && _segmentQueue.TryDequeue(out var request))
{
segments.Add(request);
Interlocked.Decrement(ref _offset);
}
// send async
if (segments.Count > 0)
_segmentReporter.ReportAsync(segments, token);
Interlocked.Exchange(ref _offset, _segmentQueue.Count);
return Task.CompletedTask;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册