未验证 提交 9bc64ad0 编写于 作者: H Humbertzhang 提交者: GitHub

[Plugin] Add plugin for PyMongo(support version 3.7.0 or above) (#60)

上级 9edca3d6
......@@ -11,6 +11,8 @@ Environment Variable | Description | Default
| `SW_AGENT_DISABLE_PLUGINS` | The name patterns in CSV pattern, plugins whose name matches one of the pattern won't be installed | `''` |
| `SW_MYSQL_TRACE_SQL_PARAMETERS` | Indicates whether to collect the sql parameters or not | `False` |
| `SW_MYSQL_SQL_PARAMETERS_MAX_LENGTH` | The maximum length of the collected parameter, parameters longer than the specified length will be truncated | `512` |
| `SW_PYMONGO_TRACE_PARAMETERS` | Indicates whether to collect the filters of pymongo | `False` |
| `SW_PYMONGO_PARAMETERS_MAX_LENGTH` | The maximum length of the collected filters, filters longer than the specified length will be truncated | `512` |
| `SW_IGNORE_SUFFIX` | If the operation name of the first span is included in this set, this segment should be ignored. | `.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg` |
| `SW_FLASK_COLLECT_HTTP_PARAMS`| This config item controls that whether the Flask plugin should collect the parameters of the request.| `false` |
| `SW_DJANGO_COLLECT_HTTP_PARAMS`| This config item controls that whether the Django plugin should collect the parameters of the request.| `false` |
......
......@@ -12,3 +12,4 @@ Library | Plugin Name
| [kafka-python](https://kafka-python.readthedocs.io/en/master/) | `sw_kafka` |
| [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` |
| [pika](https://pika.readthedocs.io/en/stable/) | `sw_rabbitmq` |
| [pymongo](https://pymongo.readthedocs.io/en/stable/) | `sw_pymongo` |
......@@ -19,6 +19,7 @@ MarkupSafe==1.1.1
packaging==20.4
pika==1.1.0
protobuf==3.12.4
pymongo==3.11.0
PyMySQL==0.10.0
pyparsing==2.4.7
pytz==2020.1
......
......@@ -30,6 +30,7 @@ class Component(Enum):
Django = 7004
Tornado = 7005
Redis = 7
MongoDB = 9
KafkaProducer = 40
KafkaConsumer = 41
RabbitmqProducer = 52
......
......@@ -29,6 +29,9 @@ disable_plugins = (os.getenv('SW_AGENT_DISABLE_PLUGINS') or '').split(',') # ty
mysql_trace_sql_parameters = True if os.getenv('SW_MYSQL_TRACE_SQL_PARAMETERS') and \
os.getenv('SW_MYSQL_TRACE_SQL_PARAMETERS') == 'True' else False # type: bool
mysql_sql_parameters_max_length = int(os.getenv('SW_MYSQL_SQL_PARAMETERS_MAX_LENGTH') or '512') # type: int
pymongo_trace_parameters = True if os.getenv('SW_PYMONGO_TRACE_PARAMETERS') and \
os.getenv('SW_PYMONGO_TRACE_PARAMETERS') == 'True' else False # type: bool
pymongo_parameters_max_length = int(os.getenv('SW_PYMONGO_PARAMETERS_MAX_LENGTH') or '512') # type: int
ignore_suffix = os.getenv('SW_IGNORE_SUFFIX') or '.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,' \
'.mp4,.html,.svg ' # type: str
flask_collect_http_params = True if os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') and \
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import pkg_resources
from packaging import version
from skywalking import Layer, Component, config
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
logger = logging.getLogger(__name__)
def install():
try:
from pymongo.bulk import _Bulk
from pymongo.cursor import Cursor
from pymongo.pool import SocketInfo
# check pymongo version
pymongo_version = pkg_resources.get_distribution("pymongo").version
if version.parse(pymongo_version) < version.parse("3.7.0"):
logger.warning("support pymongo version 3.7.0 or above, current version:" + pymongo_version)
raise Exception
bulk_op_map = {
0: "insert",
1: "update",
2: "delete"
}
# handle insert_many and bulk write
inject_bulk_write(_Bulk, bulk_op_map)
# handle find() & find_one()
inject_cursor(Cursor)
# handle other commands
inject_socket_info(SocketInfo)
except Exception:
logger.warning('failed to install plugin %s', __name__)
def inject_socket_info(SocketInfo):
_command = SocketInfo.command
def _sw_command(this: SocketInfo, dbname, spec, *args, **kwargs):
# pymongo sends `ismaster` command continuously. ignore it.
if spec.get("ismaster") is None:
address = this.sock.getpeername()
peer = "%s:%s" % address
context = get_context()
carrier = Carrier()
operation = list(spec.keys())[0]
sw_op = operation.capitalize() + "Operation"
with context.new_exit_span(op="MongoDB/" + sw_op, peer=peer, carrier=carrier) as span:
try:
result = _command(this, dbname, spec, *args, **kwargs)
span.layer = Layer.Database
span.component = Component.MongoDB
span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=dbname))
if config.pymongo_trace_parameters:
# get filters
filters = _get_filter(operation, spec)
max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))
except BaseException as e:
span.raised()
raise e
else:
result = _command(this, dbname, spec, *args, **kwargs)
return result
SocketInfo.command = _sw_command
def _get_filter(request_type, spec):
"""
:param request_type: the request param send to MongoDB
:param spec: maybe a bson.SON class or a dict
:return: filter string
"""
from bson import SON
if isinstance(spec, SON):
spec = spec.to_dict()
spec.pop(request_type)
elif isinstance(spec, dict):
spec = dict(spec)
spec.pop(request_type)
return request_type + " " + str(spec)
def inject_bulk_write(_Bulk, bulk_op_map):
_execute = _Bulk.execute
def _sw_execute(this: _Bulk, *args, **kwargs):
address = this.collection.database.client.address
peer = "%s:%s" % address
context = get_context()
carrier = Carrier()
sw_op = "MixedBulkWriteOperation"
with context.new_exit_span(op="MongoDB/"+sw_op, peer=peer, carrier=carrier) as span:
span.layer = Layer.Database
span.component = Component.MongoDB
try:
bulk_result = _execute(this, *args, **kwargs)
span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=this.collection.database.name))
if config.pymongo_trace_parameters:
filters = ""
bulk_ops = this.ops
for bulk_op in bulk_ops:
opname = bulk_op_map.get(bulk_op[0])
_filter = opname + " " + str(bulk_op[1])
filters = filters + _filter + " "
max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))
except BaseException as e:
span.raised()
raise e
return bulk_result
_Bulk.execute = _sw_execute
def inject_cursor(Cursor):
__send_message = Cursor._Cursor__send_message
def _sw_send_message(this: Cursor, operation):
address = this.collection.database.client.address
peer = "%s:%s" % address
context = get_context()
carrier = Carrier()
op = "FindOperation"
with context.new_exit_span(op="MongoDB/"+op, peer=peer, carrier=carrier) as span:
span.layer = Layer.Database
span.component = Component.MongoDB
try:
# __send_message return nothing
__send_message(this, operation)
span.tag(Tag(key=tags.DbType, val="MongoDB"))
span.tag(Tag(key=tags.DbInstance, val=this.collection.database.name))
if config.pymongo_trace_parameters:
filters = "find " + str(operation.spec)
max_len = config.pymongo_parameters_max_length
filters = filters[0:max_len] + "..." if len(filters) > max_len else filters
span.tag(Tag(key=tags.DbStatement, val=filters))
except BaseException as e:
span.raised()
raise e
return
Cursor._Cursor__send_message = _sw_send_message
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '2.1'
services:
collector:
extends:
service: collector
file: ../docker/docker-compose.base.yml
mongo:
image: mongo:4.2
hostname: mongo
ports:
- 27017:27017
healthcheck:
test: echo 'db.runCommand("ping").ok' | mongo mongo:27017/test --quiet
interval: 5s
timeout: 60s
retries: 120
networks:
- beyond
provider:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9091:9091
volumes:
- ./services/provider.py:/app/provider.py
command: ['bash', '-c', 'pip install flask && pip install pymongo && python3 /app/provider.py']
depends_on:
collector:
condition: service_healthy
mongo:
condition: service_healthy
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"]
interval: 5s
timeout: 60s
retries: 120
consumer:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9090:9090
volumes:
- ./services/consumer.py:/app/consumer.py
command: ['bash', '-c', 'pip install flask && python3 /app/consumer.py']
depends_on:
collector:
condition: service_healthy
provider:
condition: service_healthy
networks:
beyond:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
segmentItems:
- serviceName: provider
segmentSize: 3
segments:
- segmentId: not null
spans:
- operationName: MongoDB/MixedBulkWriteOperation
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Database
startTime: gt 0
endTime: gt 0
componentId: 9
spanType: Exit
peer: not null
skipAnalysis: false
tags:
- key: db.type
value: MongoDB
- key: db.instance
value: test-database
- key: db.statement
value: not null
- operationName: /insert_many
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
tags:
- key: http.method
value: GET
- key: url
value: http://provider:9091/insert_many
- key: status.code
value: '200'
refs:
- parentEndpoint: /insert_many
networkAddress: provider:9091
refType: CrossProcess
parentSpanId: 1
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
- segmentId: not null
spans:
- operationName: MongoDB/FindOperation
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Database
startTime: gt 0
endTime: gt 0
componentId: 9
spanType: Exit
peer: not null
skipAnalysis: false
tags:
- key: db.type
value: MongoDB
- key: db.instance
value: test-database
- key: db.statement
value: not null
- operationName: /find_one
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
tags:
- key: http.method
value: GET
- key: url
value: http://provider:9091/find_one
- key: status.code
value: '200'
refs:
- parentEndpoint: /find_one
networkAddress: provider:9091
refType: CrossProcess
parentSpanId: 2
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
- segmentId: not null
spans:
- operationName: MongoDB/DeleteOperation
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Database
startTime: gt 0
endTime: gt 0
componentId: 9
spanType: Exit
peer: not null
skipAnalysis: false
tags:
- key: db.type
value: MongoDB
- key: db.instance
value: test-database
- key: db.statement
value: not null
- operationName: /delete_one
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
tags:
- key: http.method
value: GET
- key: url
value: http://provider:9091/delete_one
- key: status.code
value: '200'
refs:
- parentEndpoint: /delete_one
networkAddress: provider:9091
refType: CrossProcess
parentSpanId: 3
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
- serviceName: consumer
segmentSize: 1
segments:
- segmentId: not null
spans:
- operationName: /insert_many
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7002
spanType: Exit
peer: provider:9091
skipAnalysis: false
tags:
- key: http.method
value: GET
- key: url
value: http://provider:9091/insert_many
- key: status.code
value: '200'
- operationName: /find_one
operationId: 0
parentSpanId: 0
spanId: 2
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7002
spanType: Exit
peer: provider:9091
skipAnalysis: false
tags:
- key: http.method
value: GET
- key: url
value: http://provider:9091/find_one
- key: status.code
value: '200'
- operationName: /delete_one
operationId: 0
parentSpanId: 0
spanId: 3
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7002
spanType: Exit
peer: provider:9091
skipAnalysis: false
tags:
- key: http.method
value: GET
- key: url
value: http://provider:9091/delete_one
- key: status.code
value: '200'
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
tags:
- key: http.method
value: GET
- key: url
value: http://0.0.0.0:9090/users
- key: status.code
value: '200'
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import requests
from skywalking import agent, config
if __name__ == '__main__':
config.service_name = 'consumer'
config.logging_level = 'DEBUG'
agent.start()
from flask import Flask, jsonify
app = Flask(__name__)
@app.route("/users", methods=["POST", "GET"])
def application():
requests.get("http://provider:9091/insert_many")
requests.get("http://provider:9091/find_one")
res = requests.get("http://provider:9091/delete_one")
return jsonify(res.json())
PORT = 9090
app.run(host='0.0.0.0', port=PORT, debug=True)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
from skywalking import agent, config
from flask import Flask, jsonify
from pymongo import MongoClient
config.service_name = "provider"
config.logging_level = "DEBUG"
config.pymongo_trace_parameters = True
agent.start()
client = MongoClient('mongodb://mongo:27017/')
db = client['test-database']
collection = db['test-collection']
app = Flask(__name__)
@app.route("/insert_many", methods=["GET"])
def test_insert_many():
time.sleep(0.5)
new_posts = [{"song": "Despacito"},
{"artist": "Luis Fonsi"}]
result = collection.insert_many(new_posts)
return jsonify({"ok": result.acknowledged})
@app.route("/find_one", methods=["GET"])
def test_find_one():
time.sleep(0.5)
result = collection.find_one({"song": "Despacito"})
# have to get the result and use it. if not lint will report error
print(result)
return jsonify({"song": "Despacito"})
@app.route("/delete_one", methods=["GET"])
def test_delete_one():
time.sleep(0.5)
result = collection.delete_one({"song": "Despacito"})
return jsonify({"ok": result.acknowledged})
if __name__ == '__main__':
PORT = 9091
app.run(host="0.0.0.0", port=PORT, debug=True)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import time
import unittest
from os.path import dirname
from testcontainers.compose import DockerCompose
from tests.plugin import BasePluginTest
class TestPlugin(BasePluginTest):
@classmethod
def setUpClass(cls):
cls.compose = DockerCompose(filepath=dirname(inspect.getfile(cls)))
cls.compose.start()
cls.compose.wait_for(cls.url(('consumer', '9090'), 'users'))
def test_plugin(self):
time.sleep(10)
self.validate()
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册