未验证 提交 9165b1fc 编写于 作者: H huawei 提交者: GitHub

feature: add Kafka Plugin (#50)

上级 36a4c53e
......@@ -78,6 +78,7 @@ Library | Plugin Name
| [PyMySQL](https://pymysql.readthedocs.io/en/latest/) | `sw_pymysql` |
| [Django](https://www.djangoproject.com/) | `sw_django` |
| [redis-py](https://github.com/andymccurdy/redis-py/) | `sw_redis` |
| [kafka-python](https://kafka-python.readthedocs.io/en/master/) | `sw_kafka` |
| [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` |
## API
......
......@@ -47,6 +47,7 @@ setup(
"Werkzeug",
"pymysql",
"redis",
"kafka-python",
"tornado",
],
},
......
......@@ -30,6 +30,8 @@ class Component(Enum):
Django = 7004
Tornado = 7005
Redis = 7
KafkaProducer = 40
KafkaConsumer = 41
class Layer(Enum):
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from skywalking import Layer, Component
from skywalking.trace import tags
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
logger = logging.getLogger(__name__)
def install():
# noinspection PyBroadException
try:
from kafka import KafkaProducer
from kafka import KafkaConsumer
_send = KafkaProducer.send
__poll_once = KafkaConsumer._poll_once
KafkaProducer.send = _sw_send_func(_send)
KafkaConsumer._poll_once = _sw__poll_once_func(__poll_once)
except Exception:
logger.warning('failed to install plugin %s', __name__)
def _sw__poll_once_func(__poll_once):
def _sw__poll_once(this, timeout_ms, max_records, update_offsets=True):
res = __poll_once(this, timeout_ms, max_records, update_offsets=update_offsets)
if res:
brokers = ";".join(this.config["bootstrap_servers"])
context = get_context()
topics = ";".join(this._subscription.subscription or
[t.topic for t in this._subscription._user_assignment])
with context.new_entry_span(
op="Kafka/" + topics + "/Consumer/" + (this.config["group_id"] or "")) as span:
for consumerRecords in res.values():
for record in consumerRecords:
carrier = Carrier()
for item in carrier:
for header in record.headers:
if item.key == header[0]:
item.val = str(header[1])
span.extract(carrier)
span.tag(Tag(key=tags.MqBroker, val=brokers))
span.tag(Tag(key=tags.MqTopic, val=topics))
span.layer = Layer.MQ
span.component = Component.KafkaConsumer
return res
return _sw__poll_once
def _sw_send_func(_send):
def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
peer = ";".join(this.config["bootstrap_servers"])
context = get_context()
carrier = Carrier()
with context.new_exit_span(op="Kafka/" + topic + "/Producer" or "/", peer=peer, carrier=carrier) as span:
span.layer = Layer.MQ
span.component = Component.KafkaProducer
if headers is None:
headers = []
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))
else:
for item in carrier:
headers.append((item.key, item.val.encode("utf-8")))
try:
res = _send(this, topic, value=value, key=key, headers=headers, partition=partition,
timestamp_ms=timestamp_ms)
span.tag(Tag(key=tags.MqBroker, val=peer))
span.tag(Tag(key=tags.MqTopic, val=topic))
except BaseException as e:
span.raised()
raise e
return res
return _sw_send
......@@ -28,3 +28,6 @@ DbInstance = 'db.instance'
DbStatement = 'db.statement'
DbSqlParameters = 'db.sql.parameters'
HttpParams = 'http.params'
MqBroker = 'mq.broker'
MqTopic = 'mq.topic'
MqQueue = 'mq.queue'
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '2.1'
services:
collector:
extends:
service: collector
file: ../docker/docker-compose.base.yml
zookeeper-server:
image: zookeeper:3.4
hostname: zookeeper-server
ports:
- 2181:2181
networks:
- beyond
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/2181"]
interval: 5s
timeout: 60s
retries: 120
kafka-server:
image: bitnami/kafka:2.1.1
hostname: kafka-server
ports:
- 9092:9092
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181
- KAFKA_BROKER_ID=1
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
networks:
- beyond
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9092"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
zookeeper-server:
condition: service_healthy
producer:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9090:9090
volumes:
- ./services/producer.py:/app/producer.py
command: ['bash', '-c', 'pip install flask && pip install kafka-python && python3 /app/producer.py']
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
collector:
condition: service_healthy
kafka-server:
condition: service_healthy
consumer:
condition: service_healthy
consumer:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9091:9091
volumes:
- ./services/consumer.py:/app/consumer.py
command: ['bash', '-c', 'pip install flask && pip install kafka-python && python3 /app/consumer.py']
healthcheck:
test: ["CMD", "bash", "-c", "ps -ef | grep /app/consumer | grep -v grep"]
interval: 5s
timeout: 60s
retries: 120
depends_on:
collector:
condition: service_healthy
kafka-server:
condition: service_healthy
networks:
beyond:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
segmentItems:
- serviceName: producer
segmentSize: 1
segments:
- segmentId: not null
spans:
- operationName: Kafka/skywalking/Producer
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: MQ
tags:
- key: mq.broker
value: 'kafka-server:9092'
- key: mq.topic
value: skywalking
startTime: gt 0
endTime: gt 0
componentId: 40
spanType: Exit
peer: kafka-server:9092
skipAnalysis: false
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
tags:
- key: http.method
value: GET
- key: url
value: http://0.0.0.0:9090/users
- key: status.code
value: '200'
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
- serviceName: consumer
segmentSize: 1
segments:
- segmentId: not null
spans:
- operationName: Kafka/skywalking/Consumer/skywalking
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: MQ
tags:
- key: mq.broker
value: 'kafka-server:9092'
- key: mq.topic
value: skywalking
refs:
- parentEndpoint: Kafka/skywalking/Producer
networkAddress: 'kafka-server:9092'
refType: CrossProcess
parentSpanId: 1
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: producer
traceId: not null
startTime: gt 0
endTime: gt 0
componentId: 41
spanType: Entry
peer: ''
skipAnalysis: false
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from skywalking import config, agent
if __name__ == '__main__':
config.service_name = 'consumer'
config.logging_level = 'INFO'
agent.start()
topic = "skywalking"
server_list = ["kafka-server:9092"]
group_id = "skywalking"
client_id = "0"
from kafka import KafkaConsumer
from kafka import TopicPartition
consumer = KafkaConsumer(group_id=group_id,
client_id=client_id,
bootstrap_servers=server_list)
partition = TopicPartition(topic, int(client_id))
consumer.assign([partition])
for msg in consumer:
print(msg)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from skywalking import agent, config
if __name__ == '__main__':
config.service_name = 'producer'
config.logging_level = 'INFO'
agent.start()
from flask import Flask, jsonify
from kafka import KafkaProducer
app = Flask(__name__)
producer = KafkaProducer(bootstrap_servers=['kafka-server:9092'], api_version=(1, 0, 1))
@app.route("/users", methods=["POST", "GET"])
def application():
producer.send('skywalking', b'some_message_bytes')
return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
PORT = 9090
app.run(host='0.0.0.0', port=PORT, debug=True)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import time
import unittest
from os.path import abspath, dirname
from testcontainers.compose import DockerCompose
from tests.plugin import BasePluginTest
class TestPlugin(BasePluginTest):
@classmethod
def setUpClass(cls):
cls.compose = DockerCompose(filepath=dirname(abspath(__file__)))
cls.compose.start()
cls.compose.wait_for(cls.url(('producer', '9090'), 'users'))
def test_request_plugin(self):
time.sleep(3)
self.validate(expected_file_name=os.path.join(dirname(abspath(__file__)), 'expected.data.yml'))
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册