_py_kafka.mdx 3.6 KB
Newer Older
S
sunpeng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
### python Kafka 客户端

Kafka 的 python 客户端可以参考文档 [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python)。推荐使用 [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python) 和 [kafka-python](http://github.com/dpkp/kafka-python)。以下示例以 [kafka-python](http://github.com/dpkp/kafka-python) 为例。

### 从 Kafka 消费数据

Kafka 客户端采用 pull 的方式从 Kafka 消费数据,可以采用单条消费的方式或批量消费的方式读取数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端单条消费数据的示例如下:

```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
     print (msg)
```

单条消费的方式在数据流量大的情况下往往存在性能瓶颈,导致 Kafka 消息积压,更推荐使用批量消费的方式消费数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端批量消费数据的示例如下:

```
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
while True:
    msgs = consumer.poll(timeout_ms=500, max_records=1000)
    if msgs:
        print (msgs)
```

### Python 多线程

为了提高数据写入效率,通常采用多线程的方式写入数据,可以使用 python 线程池 ThreadPoolExecutor 实现多线程。示例代码如下:

```
from concurrent.futures import ThreadPoolExecutor, Future
pool = ThreadPoolExecutor(max_workers=10)
pool.submit(...)
```

### Python 多进程

单个python进程不能充分发挥多核 CPU 的性能,有时候我们会选择多进程的方式。在多进程的情况下,需要注意,Kafka Consumer 的数量应该小于等于 Kafka Topic Partition 数量。Python 多进程示例代码如下:

```
from multiprocessing import Process

ps = []
for i in range(5):
    p = Process(target=Consumer().consume())
    p.start()
    ps.append(p)

for p in ps:
    p.join()
```

除了 Python 内置的多线程和多进程方式,还可以通过第三方库 gunicorn 实现并发。

### 完整示例

58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
<details>
<summary>kafka_example_perform</summary>

`kafka_example_perform` 是示例程序的入口

```py
{{#include docs/examples/python/kafka_example_perform.py}}
```
</details>

<details>
<summary>kafka_example_common</summary>

`kafka_example_common` 是示例程序的公共代码

S
sunpeng 已提交
73
```py
74
{{#include docs/examples/python/kafka_example_common.py}}
S
sunpeng 已提交
75
```
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
</details>

<details>
<summary>kafka_example_producer</summary>

`kafka_example_producer` 是示例程序的 producer 代码,负责生成并发送测试数据到 kafka

```py
{{#include docs/examples/python/kafka_example_producer.py}}
```
</details>

<details>
<summary>kafka_example_consumer</summary>

`kafka_example_consumer` 是示例程序的 consumer 代码,负责从 kafka 消费数据,并写入到 TDengine

```py
{{#include docs/examples/python/kafka_example_consumer.py}}
```
</details>

### 执行步骤

<details>
    <summary>执行 Python 示例程序</summary>

    1. 安装并启动 kafka

    2. python 环境准备
        - 安装 python3
        - 安装 taospy
        - 安装 kafka-python

    3. 执行示例程序

    程序的执行入口是 `kafka_example_perform.py`,获取程序完整的执行参数,请执行 help 命令。

    ```
    python3 kafka_example_perform.py --help
    ```

    以下为创建 100 个子表,每个子表 20000 条数据,kafka max poll 为 100,一个进程,每个进程一个处理线程的程序执行命令

    ```
    python3 kafka_example_perform.py -table-count=100 -table-items=20000 -max-poll=100 -threads=1 -processes=1
    ```

</details>