未验证 提交 cf35f615 编写于 作者: H huawei 提交者: GitHub

[Core][Feature] Support snapshot context (#56)

上级 ba66c12b
......@@ -88,7 +88,7 @@ class GrpcProtocol(Protocol):
value=str(tag.val),
) for tag in span.tags],
refs=[SegmentReference(
refType=0,
refType=0 if ref.ref_type == "CrossProcess" else 1,
traceId=ref.trace_id,
parentTraceSegmentId=ref.segment_id,
parentSpanId=ref.span_id,
......
......@@ -20,8 +20,10 @@ import threading
from typing import List
from skywalking import agent, config
from skywalking.trace import ID
from skywalking.trace.carrier import Carrier
from skywalking.trace.segment import Segment
from skywalking.trace.segment import Segment, SegmentRef
from skywalking.trace.snapshot import Snapshot
from skywalking.trace.span import Span, Kind, NoopSpan, EntrySpan, ExitSpan
from skywalking.utils.counter import Counter
......@@ -137,6 +139,28 @@ class SpanContext(object):
self._correlation[key] = value
def capture(self):
if len(self.spans) == 0:
return None
return Snapshot(
segment_id=str(self.segment.segment_id),
span_id=self.active_span().sid,
trace_id=self.segment.related_traces[0],
endpoint=self.spans[0].op,
correlation=self._correlation,
)
def continued(self, snapshot: 'Snapshot'):
if snapshot is None:
return None
if not snapshot.is_from_current(self) and snapshot.is_valid():
ref = SegmentRef.build_ref(snapshot)
span = self.active_span()
span.refs.append(ref)
self.segment.relate(ID(ref.trace_id))
self._correlation.update(snapshot.correlation)
class NoopContext(SpanContext):
def __init__(self):
......@@ -169,6 +193,18 @@ class NoopContext(SpanContext):
def active_span(self):
return self._noop_span
def capture(self):
return Snapshot(
segment_id=None,
span_id=-1,
trace_id=None,
endpoint=None,
correlation=self._correlation,
)
def continued(self, snapshot: 'Snapshot'):
self._correlation.update(snapshot.correlation)
_thread_local = threading.local()
_thread_local.context = None
......
......@@ -18,17 +18,20 @@
import time
from typing import List, TYPE_CHECKING
from skywalking import config
from skywalking.trace import ID
from skywalking.utils.lang import tostring
if TYPE_CHECKING:
from skywalking.trace.carrier import Carrier
from skywalking.trace.span import Span
from skywalking.trace.snapshot import Snapshot
class SegmentRef(object):
def __init__(self, carrier: 'Carrier'):
self.ref_type = 'CrossProcess' # type: str
def __init__(self, carrier: 'Carrier', ref_type: str = 'CrossProcess'):
self.ref_type = ref_type # type: str
self.trace_id = carrier.trace_id # type: str
self.segment_id = carrier.segment_id # type: str
self.span_id = int(carrier.span_id) # type: int
......@@ -49,6 +52,18 @@ class SegmentRef(object):
self.endpoint == other.endpoint and \
self.client_address == other.client_address
@classmethod
def build_ref(cls, snapshot: 'Snapshot'):
from skywalking.trace.carrier import Carrier
carrier = Carrier()
carrier.trace_id = str(snapshot.trace_id)
carrier.segment_id = str(snapshot.segment_id)
carrier.endpoint = snapshot.endpoint
carrier.span_id = snapshot.span_id
carrier.service = config.service_name
carrier.service_instance = config.service_instance
return SegmentRef(carrier, ref_type="CrossThread")
class _NewID(ID):
pass
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from skywalking.trace.context import SpanContext
from skywalking.trace import ID
class Snapshot:
def __init__(
self,
segment_id: str = None,
span_id: int = None,
trace_id: ID = None,
endpoint: str = None,
correlation: dict = None
):
self.trace_id = trace_id # type: ID
self.segment_id = segment_id # type: str
self.span_id = span_id # type: int
self.endpoint = endpoint # type: str
self.correlation = correlation.copy() # type: dict
def is_from_current(self, context: 'SpanContext'):
return self.segment_id is not None and self.segment_id == context.capture().segment_id
def is_valid(self):
return self.segment_id is not None and self.span_id > -1 and self.trace_id is not None
......@@ -17,7 +17,7 @@
segmentItems:
- serviceName: provider
segmentSize: 1
segmentSize: 2
segments:
- segmentId: not null
spans:
......@@ -35,7 +35,36 @@ segmentItems:
value: '200'
refs:
- parentEndpoint: /users
networkAddress: provider:9091
networkAddress: 'provider:9091'
refType: CrossProcess
parentSpanId: 1
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
- segmentId: not null
spans:
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
tags:
- key: http.method
value: POST
- key: url
value: http://provider:9091/users
- key: status.code
value: '200'
refs:
- parentEndpoint: /users
networkAddress: 'provider:9091'
refType: CrossProcess
parentSpanId: 1
parentTraceSegmentId: not null
......@@ -49,8 +78,50 @@ segmentItems:
peer: not null
skipAnalysis: false
- serviceName: consumer
segmentSize: 1
segmentSize: 2
segments:
- segmentId: not null
spans:
- operationName: /users
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7002
isError: false
spanType: Exit
peer: provider:9091
skipAnalysis: false
tags:
- key: http.method
value: POST
- key: url
value: 'http://provider:9091/users'
- key: status.code
value: '200'
- operationName: /test
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Unknown
startTime: gt 0
endTime: gt 0
componentId: 0
isError: false
spanType: Local
peer: ''
skipAnalysis: false
refs:
- parentEndpoint: /users
networkAddress: ''
refType: CrossThread
parentSpanId: 0
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
- segmentId: not null
spans:
- operationName: /users
......@@ -90,4 +161,4 @@ segmentItems:
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
\ No newline at end of file
skipAnalysis: false
......@@ -33,7 +33,23 @@ if __name__ == '__main__':
def application():
from skywalking.trace.context import get_context
get_context().put_correlation("correlation", "correlation")
def post(snap):
with get_context().new_local_span("/test"):
get_context().continued(snap)
requests.post("http://provider:9091/users")
snapshot = get_context().capture()
from threading import Thread
t = Thread(target=post, args=(snapshot,))
t.start()
t.join()
res = requests.post("http://provider:9091/users")
t.join()
return jsonify(res.json())
PORT = 9090
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册