Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
skywalking-python
提交
88b4567b
S
skywalking-python
项目概览
apache
/
skywalking-python
通知
60
Star
3
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
S
skywalking-python
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
88b4567b
编写于
5月 08, 2020
作者:
K
kezhenxu94
提交者:
GitHub
5月 08, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Feature: inter-process propagation (#4)
上级
19715ff1
变更
12
显示空白变更内容
内联
并排
Showing
12 changed file
with
270 addition
and
46 deletion
+270
-46
skywalking/agent/protocol/grpc/__init__.py
skywalking/agent/protocol/grpc/__init__.py
+12
-2
skywalking/plugins/sw_http/__init__.py
skywalking/plugins/sw_http/__init__.py
+6
-1
skywalking/plugins/sw_request/__init__.py
skywalking/plugins/sw_request/__init__.py
+6
-1
skywalking/trace/__init__.py
skywalking/trace/__init__.py
+3
-7
skywalking/trace/carrier/__init__.py
skywalking/trace/carrier/__init__.py
+94
-0
skywalking/trace/context/__init__.py
skywalking/trace/context/__init__.py
+20
-22
skywalking/trace/segment/__init__.py
skywalking/trace/segment/__init__.py
+27
-7
skywalking/trace/span/__init__.py
skywalking/trace/span/__init__.py
+44
-2
skywalking/utils/lang.py
skywalking/utils/lang.py
+10
-0
tests/consumer.py
tests/consumer.py
+43
-0
tests/main.py
tests/main.py
+1
-1
tests/provider.py
tests/provider.py
+4
-3
未找到文件。
skywalking/agent/protocol/grpc/__init__.py
浏览文件 @
88b4567b
...
...
@@ -21,7 +21,7 @@ from queue import Queue
import
grpc
from
common.Common_pb2
import
KeyStringValuePair
from
language_agent.Tracing_pb2
import
SegmentObject
,
SpanObject
,
Log
from
language_agent.Tracing_pb2
import
SegmentObject
,
SpanObject
,
Log
,
SegmentReference
from
skywalking
import
config
from
skywalking.agent
import
Protocol
from
skywalking.agent.protocol.grpc
import
interceptors
...
...
@@ -63,7 +63,7 @@ class GrpcProtocol(Protocol):
logger
.
debug
(
'reporting segment %s'
,
segment
)
s
=
SegmentObject
(
traceId
=
str
(
segment
.
trace_id
),
traceId
=
str
(
segment
.
related_traces
[
0
]
),
traceSegmentId
=
str
(
segment
.
segment_id
),
service
=
config
.
service_name
,
serviceInstance
=
config
.
service_instance
,
...
...
@@ -86,6 +86,16 @@ class GrpcProtocol(Protocol):
key
=
str
(
tag
.
key
),
value
=
str
(
tag
.
val
),
)
for
tag
in
span
.
tags
],
refs
=
[
SegmentReference
(
refType
=
0
,
traceId
=
ref
.
trace_id
,
parentTraceSegmentId
=
ref
.
segment_id
,
parentSpanId
=
ref
.
span_id
,
parentService
=
ref
.
service
,
parentServiceInstance
=
ref
.
service_instance
,
parentEndpoint
=
ref
.
endpoint
,
networkAddressUsedAtPeer
=
ref
.
client_address
,
)
for
ref
in
span
.
refs
if
ref
.
trace_id
],
)
for
span
in
segment
.
spans
],
)
...
...
skywalking/plugins/sw_http/__init__.py
浏览文件 @
88b4567b
...
...
@@ -20,6 +20,7 @@ import traceback
from
skywalking
import
Layer
,
Component
from
skywalking.trace
import
tags
from
skywalking.trace.carrier
import
Carrier
from
skywalking.trace.context
import
get_context
from
skywalking.trace.tags
import
Tag
...
...
@@ -45,11 +46,15 @@ def install():
def
_sw_do_method
():
context
=
get_context
()
with
context
.
new_entry_span
(
op
=
this
.
path
)
as
span
:
carrier
=
Carrier
()
for
item
in
carrier
:
item
.
val
=
this
.
headers
[
item
.
key
.
capitalize
()]
with
context
.
new_entry_span
(
op
=
this
.
path
,
carrier
=
carrier
)
as
span
:
span
.
layer
=
Layer
.
Http
span
.
component
=
Component
.
General
span
.
peer
=
'%s:%s'
%
this
.
client_address
span
.
tag
(
Tag
(
key
=
tags
.
HttpMethod
,
val
=
method
))
_do_method
()
setattr
(
this
,
'do_'
+
method
,
_sw_do_method
)
...
...
skywalking/plugins/sw_request/__init__.py
浏览文件 @
88b4567b
...
...
@@ -19,6 +19,7 @@ import traceback
from
skywalking
import
Layer
,
Component
from
skywalking.trace
import
tags
from
skywalking.trace.carrier
import
Carrier
from
skywalking.trace.context
import
get_context
from
skywalking.trace.tags
import
Tag
...
...
@@ -35,9 +36,13 @@ def install():
def
_sw_open
(
this
:
OpenerDirector
,
fullurl
,
data
,
timeout
):
context
=
get_context
()
with
context
.
new_exit_span
(
op
=
fullurl
.
selector
or
'/'
,
peer
=
fullurl
.
host
)
as
span
:
carrier
=
Carrier
()
with
context
.
new_exit_span
(
op
=
fullurl
.
selector
or
'/'
,
peer
=
fullurl
.
host
,
carrier
=
carrier
)
as
span
:
span
.
layer
=
Layer
.
Http
span
.
component
=
Component
.
General
[
fullurl
.
add_header
(
item
.
key
,
item
.
val
)
for
item
in
carrier
]
try
:
res
=
_open
(
this
,
fullurl
,
data
,
timeout
)
span
.
tag
(
Tag
(
key
=
tags
.
HttpMethod
,
val
=
fullurl
.
get_method
()))
...
...
skywalking/trace/__init__.py
浏览文件 @
88b4567b
...
...
@@ -15,8 +15,6 @@
# limitations under the License.
#
import
threading
import
time
import
uuid
from
skywalking.utils.counter
import
AtomicCounter
...
...
@@ -25,10 +23,8 @@ _id = AtomicCounter()
class
ID
(
object
):
def
__init__
(
self
):
self
.
part1
=
str
(
uuid
.
uuid1
())
self
.
part2
=
threading
.
get_ident
()
# FIXME: thread ids are reused
self
.
part3
=
int
(
time
.
time
()
*
1000
)
*
10000
+
_id
.
next
()
def
__init__
(
self
,
raw_id
:
str
=
None
):
self
.
value
=
raw_id
or
str
(
uuid
.
uuid1
()).
replace
(
'-'
,
''
)
def
__str__
(
self
):
return
'%s.%s.%s'
%
(
self
.
part1
,
self
.
part2
,
self
.
part3
)
return
self
.
value
skywalking/trace/carrier/__init__.py
0 → 100644
浏览文件 @
88b4567b
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from
typing
import
List
from
skywalking.utils.lang
import
b64encode
,
b64decode
class
CarrierItem
(
object
):
def
__init__
(
self
,
key
:
str
=
''
,
val
:
str
=
''
):
self
.
key
=
key
# type: str
self
.
val
=
val
# type: str
@
property
def
key
(
self
):
return
self
.
__key
@
key
.
setter
def
key
(
self
,
key
:
str
):
self
.
__key
=
key
@
property
def
val
(
self
):
return
self
.
__val
@
val
.
setter
def
val
(
self
,
val
:
str
):
self
.
__val
=
val
class
Carrier
(
CarrierItem
):
def
__init__
(
self
):
super
(
Carrier
,
self
).
__init__
(
key
=
'sw8'
)
self
.
trace_id
=
''
# type: str
self
.
segment_id
=
''
# type: str
self
.
span_id
=
-
1
# type: int
self
.
service
=
''
# type: str
self
.
service_instance
=
''
# type: str
self
.
endpoint
=
''
# type: str
self
.
client_address
=
''
# type: str
self
.
items
=
[
self
]
# type: List[CarrierItem]
self
.
__iter_index
=
0
# type: int
@
property
def
val
(
self
)
->
str
:
return
'-'
.
join
([
'1'
,
b64encode
(
self
.
trace_id
),
b64encode
(
self
.
segment_id
),
str
(
self
.
span_id
),
b64encode
(
self
.
service
),
b64encode
(
self
.
service_instance
),
b64encode
(
self
.
endpoint
),
b64encode
(
self
.
client_address
),
])
@
val
.
setter
def
val
(
self
,
val
:
str
):
self
.
__val
=
val
if
not
val
:
return
parts
=
val
.
split
(
'-'
)
self
.
trace_id
=
b64decode
(
parts
[
1
])
self
.
segment_id
=
b64decode
(
parts
[
2
])
self
.
span_id
=
int
(
parts
[
3
])
self
.
service
=
b64decode
(
parts
[
4
])
self
.
service_instance
=
b64decode
(
parts
[
5
])
self
.
endpoint
=
b64decode
(
parts
[
6
])
self
.
client_address
=
b64decode
(
parts
[
7
])
def
__iter__
(
self
):
self
.
__iter_index
=
0
return
self
def
__next__
(
self
):
if
self
.
__iter_index
>=
len
(
self
.
items
):
raise
StopIteration
n
=
self
.
items
[
self
.
__iter_index
]
self
.
__iter_index
+=
1
return
n
skywalking/trace/context/__init__.py
浏览文件 @
88b4567b
...
...
@@ -20,6 +20,7 @@ import threading
from
typing
import
List
from
skywalking
import
agent
from
skywalking.trace.carrier
import
Carrier
from
skywalking.trace.segment
import
Segment
from
skywalking.trace.span
import
Span
,
Kind
,
NoopSpan
,
EntrySpan
,
ExitSpan
from
skywalking.utils.counter
import
Counter
...
...
@@ -36,48 +37,45 @@ class SpanContext(object):
def
new_local_span
(
self
,
op
:
str
)
->
Span
:
parent
=
self
.
spans
[
-
1
]
if
self
.
spans
else
None
# type: Span
pid
=
parent
.
sid
if
parent
else
-
1
# type: int
return
Span
(
context
=
self
,
sid
=
self
.
_sid
.
next
(),
pid
=
p
id
,
pid
=
p
arent
.
sid
if
parent
else
-
1
,
op
=
op
,
kind
=
Kind
.
Local
,
)
def
new_entry_span
(
self
,
op
:
str
)
->
Span
:
def
new_entry_span
(
self
,
op
:
str
,
carrier
:
'Carrier'
=
None
)
->
Span
:
parent
=
self
.
spans
[
-
1
]
if
self
.
spans
else
None
# type: Span
if
parent
is
not
None
and
parent
.
kind
.
is_entry
:
parent
.
op
=
op
return
parent
pid
=
parent
.
sid
if
parent
else
-
1
# type: int
return
EntrySpan
(
span
=
parent
if
parent
is
not
None
and
parent
.
kind
.
is_entry
else
EntrySpan
(
context
=
self
,
sid
=
self
.
_sid
.
next
(),
pid
=
pid
,
op
=
op
,
pid
=
parent
.
sid
if
parent
else
-
1
,
)
span
.
op
=
op
def
new_exit_span
(
self
,
op
:
str
,
peer
:
str
)
->
Span
:
parent
=
self
.
spans
[
-
1
]
if
self
.
spans
else
None
# type: Span
if
carrier
is
not
None
:
span
.
extract
(
carrier
=
carrier
)
if
parent
is
not
None
and
parent
.
kind
.
is_exit
:
return
parent
return
span
pid
=
parent
.
sid
if
parent
else
-
1
# type: int
def
new_exit_span
(
self
,
op
:
str
,
peer
:
str
,
carrier
:
'Carrier'
=
None
)
->
Span
:
parent
=
self
.
spans
[
-
1
]
if
self
.
spans
else
None
# type: Span
return
ExitSpan
(
span
=
parent
if
parent
is
not
None
and
parent
.
kind
.
is_exit
else
ExitSpan
(
context
=
self
,
sid
=
self
.
_sid
.
next
(),
pid
=
p
id
,
pid
=
p
arent
.
sid
if
parent
else
-
1
,
op
=
op
,
peer
=
peer
,
)
if
carrier
is
not
None
:
span
.
inject
(
carrier
=
carrier
)
return
span
def
start
(
self
,
span
:
Span
):
if
span
not
in
self
.
spans
:
self
.
spans
.
append
(
span
)
...
...
@@ -104,11 +102,11 @@ class NoopContext(SpanContext):
self
.
_depth
+=
1
return
self
.
_noop_span
def
new_entry_span
(
self
,
op
:
str
)
->
Span
:
def
new_entry_span
(
self
,
op
:
str
,
carrier
:
'Carrier'
=
None
)
->
Span
:
self
.
_depth
+=
1
return
self
.
_noop_span
def
new_exit_span
(
self
,
op
:
str
,
peer
:
str
)
->
Span
:
def
new_exit_span
(
self
,
op
:
str
,
peer
:
str
,
carrier
:
'Carrier'
=
None
)
->
Span
:
self
.
_depth
+=
1
return
self
.
_noop_span
...
...
skywalking/trace/segment/__init__.py
浏览文件 @
88b4567b
...
...
@@ -16,24 +16,44 @@
#
import
time
from
typing
import
List
from
typing
import
List
,
TYPE_CHECKING
from
skywalking.trace
import
ID
from
skywalking.trace.span
import
Span
from
skywalking.utils.lang
import
tostring
if
TYPE_CHECKING
:
from
skywalking.trace.carrier
import
Carrier
from
skywalking.trace.span
import
Span
class
SegmentRef
(
object
):
def
__init__
(
self
,
carrier
:
'Carrier'
):
self
.
ref_type
=
'CrossProcess'
# type: str
self
.
trace_id
=
carrier
.
trace_id
# type: str
self
.
segment_id
=
carrier
.
segment_id
# type: str
self
.
span_id
=
carrier
.
span_id
# type: int
self
.
service
=
carrier
.
service
# type: str
self
.
service_instance
=
carrier
.
service_instance
# type: str
self
.
endpoint
=
carrier
.
endpoint
# type: str
self
.
client_address
=
carrier
.
client_address
# type: str
class
_NewID
(
ID
):
pass
@
tostring
class
Segment
(
object
):
def
__init__
(
self
):
self
.
trace_id
=
ID
()
# type: ID
self
.
segment_id
=
ID
()
# type: ID
self
.
spans
=
[]
# type: List[Span]
self
.
timestamp
=
int
(
time
.
time
()
*
1000
)
# type: int
self
.
related_traces
=
[
_NewID
()]
# type: List[ID]
def
archive
(
self
,
span
:
Span
):
def
archive
(
self
,
span
:
'Span'
):
self
.
spans
.
append
(
span
)
class
SegmentRef
(
object
):
pass
def
relate
(
self
,
trace_id
:
ID
):
if
isinstance
(
self
.
related_traces
[
0
],
_NewID
):
del
self
.
related_traces
[
-
1
]
self
.
related_traces
.
append
(
trace_id
)
skywalking/trace/span/__init__.py
浏览文件 @
88b4567b
...
...
@@ -22,12 +22,14 @@ from copy import deepcopy
from
typing
import
List
from
typing
import
TYPE_CHECKING
from
skywalking
import
Kind
,
Layer
,
Log
,
Component
,
LogItem
from
skywalking
import
Kind
,
Layer
,
Log
,
Component
,
LogItem
,
config
from
skywalking.trace
import
ID
from
skywalking.trace.carrier
import
Carrier
from
skywalking.trace.segment
import
SegmentRef
,
Segment
from
skywalking.trace.tags
import
Tag
from
skywalking.utils.lang
import
tostring
if
TYPE_CHECKING
:
from
skywalking.trace.segment
import
SegmentRef
,
Segment
from
skywalking.trace.context
import
SpanContext
...
...
@@ -91,6 +93,20 @@ class Span(ABC):
return
self
def
inject
(
self
,
carrier
:
'Carrier'
)
->
'Span'
:
raise
RuntimeWarning
(
'can only inject context carrier into ExitSpan, this may be a potential bug in the agent, '
'please report this in https://github.com/apache/skywalking/issues if you encounter this. '
)
def
extract
(
self
,
carrier
:
'Carrier'
)
->
'Span'
:
if
carrier
is
None
:
return
self
self
.
context
.
segment
.
relate
(
ID
(
carrier
.
trace_id
))
return
self
def
__enter__
(
self
):
self
.
start
()
return
self
...
...
@@ -146,6 +162,19 @@ class EntrySpan(StackedSpan):
self
.
logs
=
[]
self
.
tags
=
[]
def
extract
(
self
,
carrier
:
'Carrier'
)
->
'Span'
:
Span
.
extract
(
self
,
carrier
)
if
carrier
is
None
:
return
self
ref
=
SegmentRef
(
carrier
=
carrier
)
if
ref
not
in
self
.
refs
:
self
.
refs
.
append
(
ref
)
return
self
@
tostring
class
ExitSpan
(
StackedSpan
):
...
...
@@ -171,6 +200,16 @@ class ExitSpan(StackedSpan):
layer
,
)
def
inject
(
self
,
carrier
:
'Carrier'
)
->
'Span'
:
carrier
.
trace_id
=
str
(
self
.
context
.
segment
.
related_traces
[
0
])
carrier
.
segment_id
=
str
(
self
.
context
.
segment
.
segment_id
)
carrier
.
span_id
=
self
.
sid
carrier
.
service
=
config
.
service_name
carrier
.
service_instance
=
config
.
service_instance
carrier
.
endpoint
=
self
.
context
.
spans
[
0
].
op
carrier
.
client_address
=
self
.
peer
return
self
def
start
(
self
):
self
.
_depth
+=
1
StackedSpan
.
start
(
self
)
...
...
@@ -180,3 +219,6 @@ class ExitSpan(StackedSpan):
class
NoopSpan
(
Span
):
def
__init__
(
self
,
context
:
'SpanContext'
=
None
,
kind
:
'Kind'
=
None
):
Span
.
__init__
(
self
,
context
=
context
,
kind
=
kind
)
def
inject
(
self
,
carrier
:
'Carrier'
)
->
'Span'
:
return
self
skywalking/utils/lang.py
浏览文件 @
88b4567b
...
...
@@ -15,6 +15,8 @@
# limitations under the License.
#
import
base64
def
tostring
(
cls
):
def
__str__
(
self
):
...
...
@@ -27,3 +29,11 @@ def tostring(cls):
cls
.
__str__
=
__str__
return
cls
def
b64encode
(
s
:
str
=
''
)
->
str
:
return
base64
.
b64encode
(
s
.
encode
(
'utf8'
)).
decode
(
'utf8'
)
def
b64decode
(
s
:
str
=
''
)
->
str
:
return
base64
.
b64decode
(
s
).
decode
(
'utf8'
)
tests/consumer.py
0 → 100644
浏览文件 @
88b4567b
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from
urllib
import
request
from
skywalking
import
agent
,
config
if
__name__
==
'__main__'
:
config
.
service_name
=
'consumer'
agent
.
start
()
import
socketserver
from
http.server
import
BaseHTTPRequestHandler
class
SimpleHTTPRequestHandler
(
BaseHTTPRequestHandler
):
def
do_GET
(
self
):
self
.
send_response
(
200
)
self
.
end_headers
()
req
=
request
.
Request
(
'http://localhost:9091/whatever'
)
with
request
.
urlopen
(
req
)
as
res
:
self
.
wfile
.
write
(
res
.
read
(
300
))
PORT
=
9090
Handler
=
SimpleHTTPRequestHandler
with
socketserver
.
TCPServer
((
""
,
PORT
),
Handler
)
as
httpd
:
print
(
"serving at port"
,
PORT
)
httpd
.
serve_forever
()
tests/main.py
浏览文件 @
88b4567b
...
...
@@ -17,7 +17,7 @@
from
time
import
sleep
from
skywalking
import
agent
,
config
,
Component
,
Layer
from
skywalking
import
agent
,
Component
,
Layer
from
skywalking.decorators
import
trace
from
skywalking.trace.context
import
SpanContext
,
get_context
...
...
tests/
test_http_plugin
.py
→
tests/
provider
.py
浏览文件 @
88b4567b
...
...
@@ -17,9 +17,10 @@
from
urllib
import
request
from
skywalking
import
agent
from
skywalking
import
agent
,
config
if
__name__
==
'__main__'
:
config
.
service_name
=
'provider'
agent
.
start
()
import
socketserver
...
...
@@ -32,9 +33,9 @@ if __name__ == '__main__':
self
.
end_headers
()
req
=
request
.
Request
(
'https://github.com/kezhenxu94'
)
with
request
.
urlopen
(
req
)
as
res
:
self
.
wfile
.
write
(
res
.
read
(
300
))
self
.
wfile
.
write
(
res
.
read
(
1024
))
PORT
=
909
0
PORT
=
909
1
Handler
=
SimpleHTTPRequestHandler
with
socketserver
.
TCPServer
((
""
,
PORT
),
Handler
)
as
httpd
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录