未验证 提交 44e8c443 编写于 作者: H huawei 提交者: GitHub

Add Rabbitmq Plugin (#53)

上级 588b3f77
......@@ -81,6 +81,7 @@ Library | Plugin Name
| [redis-py](https://github.com/andymccurdy/redis-py/) | `sw_redis` |
| [kafka-python](https://kafka-python.readthedocs.io/en/master/) | `sw_kafka` |
| [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` |
| [pika](https://pika.readthedocs.io/en/stable/) | `sw_rabbitmq` |
## API
......
......@@ -51,6 +51,7 @@ setup(
"redis",
"kafka-python",
"tornado",
"pika",
],
},
classifiers=[
......
......@@ -32,6 +32,8 @@ class Component(Enum):
Redis = 7
KafkaProducer = 40
KafkaConsumer = 41
RabbitmqProducer = 52
RabbitmqConsumer = 53
class Layer(Enum):
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
logger = logging.getLogger(__name__)
def install():
# noinspection PyBroadException
try:
from pika.channel import Channel
_basic_publish = Channel.basic_publish
__on_deliver = Channel._on_deliver
Channel.basic_publish = _sw_basic_publish_func(_basic_publish)
Channel._on_deliver = _sw__on_deliver_func(__on_deliver)
except Exception:
logger.warning('failed to install plugin %s', __name__)
def _sw_basic_publish_func(_basic_publish):
def _sw_basic_publish(this, exchange,
routing_key,
body,
properties=None,
mandatory=False):
peer = '%s:%s' % (this.connection.params.host, this.connection.params.port)
context = get_context()
carrier = Carrier()
import pika
with context.new_exit_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key + "/Producer" or "/",
peer=peer, carrier=carrier) as span:
span.layer = Layer.MQ
span.component = Component.RabbitmqProducer
properties = pika.BasicProperties() if properties is None else properties
if properties.headers is None:
headers = {}
for item in carrier:
headers[item.key] = item.val
properties.headers = headers
else:
for item in carrier:
properties.headers[item.key] = item.val
try:
res = _basic_publish(this, exchange,
routing_key,
body,
properties=properties,
mandatory=mandatory)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=exchange))
span.tag(Tag(key=tags.MqQueue, val=routing_key))
except BaseException as e:
span.raised()
raise e
return res
return _sw_basic_publish
def _sw__on_deliver_func(__on_deliver):
def _sw__on_deliver(this, method_frame, header_frame, body):
peer = '%s:%s' % (this.connection.params.host, this.connection.params.port)
context = get_context()
exchange = method_frame.method.exchange
routing_key = method_frame.method.routing_key
carrier = Carrier()
for item in carrier:
if item.key in header_frame.properties.headers:
item.val = header_frame.properties.headers[item.key]
with context.new_entry_span(op="RabbitMQ/Topic/" + exchange + "/Queue/" + routing_key
+ "/Consumer" or "", carrier=carrier) as span:
span.layer = Layer.MQ
span.component = Component.RabbitmqConsumer
try:
__on_deliver(this, method_frame, header_frame, body)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=exchange))
span.tag(Tag(key=tags.MqQueue, val=routing_key))
except BaseException as e:
span.raised()
raise e
return _sw__on_deliver
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '2.1'
services:
collector:
extends:
service: collector
file: ../docker/docker-compose.base.yml
rabbitmq-server:
image: rabbitmq:latest
hostname: rabbitmq-server
ports:
- 5672:5672
- 15672:15672
environment:
- RABBITMQ_DEFAULT_PASS=admin
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_VHOST=/
networks:
- beyond
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/5672"]
interval: 5s
timeout: 60s
retries: 120
producer:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9090:9090
volumes:
- ./services/producer.py:/app/producer.py
command: ['bash', '-c', 'pip install flask && pip install pika && python3 /app/producer.py']
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
collector:
condition: service_healthy
rabbitmq-server:
condition: service_healthy
consumer:
condition: service_healthy
consumer:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9091:9091
volumes:
- ./services/consumer.py:/app/consumer.py
command: ['bash', '-c', 'pip install flask && pip install pika && python3 /app/consumer.py']
healthcheck:
test: ["CMD", "bash", "-c", "ps -ef | grep /app/consumer | grep -v grep"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
collector:
condition: service_healthy
rabbitmq-server:
condition: service_healthy
networks:
beyond:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
segmentItems:
- serviceName: producer
segmentSize: 1
segments:
- segmentId: not null
spans:
- operationName: RabbitMQ/Topic/test/Queue/test/Producer
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: MQ
tags:
- key: mq.broker
value: 'rabbitmq-server:5672'
- key: mq.topic
value: test
- key: mq.queue
value: test
startTime: gt 0
endTime: gt 0
componentId: 52
spanType: Exit
peer: rabbitmq-server:5672
skipAnalysis: false
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
tags:
- key: http.method
value: GET
- key: url
value: http://0.0.0.0:9090/users
- key: status.code
value: '200'
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
- serviceName: consumer
segmentSize: 1
segments:
- segmentId: not null
spans:
- operationName: RabbitMQ/Topic/test/Queue/test/Consumer
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
tags:
- key: mq.broker
value: 'rabbitmq-server:5672'
- key: mq.topic
value: test
- key: mq.queue
value: test
refs:
- parentEndpoint: RabbitMQ/Topic/test/Queue/test/Producer
networkAddress: 'rabbitmq-server:5672'
refType: CrossProcess
parentSpanId: 1
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: producer
traceId: not null
startTime: gt 0
endTime: gt 0
componentId: 53
spanType: Entry
peer: ''
skipAnalysis: false
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from skywalking import config, agent
if __name__ == '__main__':
config.service_name = 'consumer'
config.logging_level = 'INFO'
agent.start()
import pika
parameters = (pika.URLParameters("amqp://admin:admin@rabbitmq-server:5672/%2F"))
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare("test")
channel.exchange_declare("test")
channel.queue_bind(exchange='test', queue="test", routing_key='test')
for method_frame, properties, body in channel.consume('test'):
# Display the message parts and acknowledge the message
print(method_frame, properties, body)
channel.basic_ack(method_frame.delivery_tag)
# Escape out of the loop after 10 messages
if method_frame.delivery_tag == 10:
break
try:
# Loop so we can communicate with RabbitMQ
connection.ioloop.start()
except KeyboardInterrupt:
# Gracefully close the connection
connection.close()
# Loop until we're fully closed, will stop on its own
connection.ioloop.start()
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from skywalking import agent, config
if __name__ == '__main__':
config.service_name = 'producer'
config.logging_level = 'INFO'
agent.start()
from flask import Flask, jsonify
app = Flask(__name__)
import pika
parameters = (pika.URLParameters("amqp://admin:admin@rabbitmq-server:5672/%2F"))
@app.route("/users", methods=["POST", "GET"])
def application():
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare("test")
channel.exchange_declare("test")
channel.queue_bind(exchange='test', queue="test", routing_key='test')
channel.basic_publish(exchange='test', routing_key='test', properties=pika.BasicProperties(
headers={'key': 'value'}
),
body=b'Test message.')
connection.close()
return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
PORT = 9090
app.run(host='0.0.0.0', port=PORT, debug=True)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time
import unittest
from os.path import abspath, dirname
from testcontainers.compose import DockerCompose
from tests.plugin import BasePluginTest
class TestPlugin(BasePluginTest):
@classmethod
def setUpClass(cls):
cls.compose = DockerCompose(filepath=dirname(abspath(__file__)))
cls.compose.start()
cls.compose.wait_for(cls.url(('producer', '9090'), 'users'))
def test_request_plugin(self):
time.sleep(3)
self.validate(expected_file_name=os.path.join(dirname(abspath(__file__)), 'expected.data.yml'))
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册