### Python Kafka client

For a list of Python Kafka clients, see [Kafka clients](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python). This document uses [kafka-python](http://github.com/dpkp/kafka-python).

### Consume from Kafka

The simplest way to consume messages from Kafka is to read them one at a time:

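```py
from kafka import KafkaConsumer

# subscribe to a topic; kafka-python connects to localhost:9092 by default
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
    # each msg is a ConsumerRecord (topic, partition, offset, key, value, ...)
    print(msg)
```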
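The example above relies on kafka-python's defaults. A slightly fuller sketch, assuming a broker at `localhost:9092`, a hypothetical consumer group `my_group`, and JSON-encoded message values, passes these settings explicitly and decodes each value with a `value_deserializer`. `decode_json` and `consume_one_by_one` are illustrative names, not part of kafka-python:

```python
import json


def decode_json(raw):
    """value_deserializer: turn raw message bytes into a Python object (assumes JSON payloads)."""
    if raw is None:
        # tombstone or empty value: nothing to decode
        return None
    return json.loads(raw.decode('utf-8'))


def consume_one_by_one(topic='my_favorite_topic'):
    from kafka import KafkaConsumer  # kafka-python

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',  # assumed broker address
        group_id='my_group',                 # hypothetical consumer group name
        auto_offset_reset='earliest',        # where to start if the group has no committed offset
        value_deserializer=decode_json,
    )
    for msg in consumer:
        # msg.value has already been passed through decode_json
        print(msg.topic, msg.partition, msg.offset, msg.value)
```

`auto_offset_reset` only takes effect when the group has no committed offset yet; otherwise consumption resumes from the committed position.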

For higher throughput, consume messages in batches with `poll()`:

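```py
from kafka import KafkaConsumer

consumer = KafkaConsumer('my_favorite_topic')
while True:
    # wait up to 500 ms and return at most 1000 records
    msgs = consumer.poll(timeout_ms=500, max_records=1000)
    # msgs is a dict mapping TopicPartition -> list of ConsumerRecord
    if msgs:
        print(msgs)
```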
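Because `poll()` returns a dict keyed by partition rather than a flat list, it is often convenient to flatten each batch before processing it. The sketch below does this and hands the records to a hypothetical `handle_batch` callback; `flatten_batch` and `consume_in_batches` are illustrative names, not kafka-python APIs:

```python
def flatten_batch(batch):
    """Turn the {TopicPartition: [ConsumerRecord, ...]} dict from poll() into one list."""
    records = []
    for partition_records in batch.values():
        records.extend(partition_records)
    return records


def consume_in_batches(consumer, handle_batch, timeout_ms=500, max_records=1000):
    # consumer is a kafka-python KafkaConsumer; handle_batch is a hypothetical callback
    while True:
        batch = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
        if batch:
            handle_batch(flatten_batch(batch))
```

Records keep their offset order within each partition, but there is no ordering guarantee across partitions. If the consumer is created with `enable_auto_commit=False`, calling `consumer.commit()` after `handle_batch` returns commits offsets only for records that were actually processed.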

### Multi-threading

To improve throughput further, process the consumed data in multiple threads, for example with Python's `concurrent.futures.ThreadPoolExecutor`:

```py
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=10)
# submit a processing function and its arguments, e.g. pool.submit(fn, msg)
pool.submit(...)
```
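A fuller sketch of this pattern, assuming a hypothetical `process_record` handler: the main thread keeps polling, because a kafka-python `KafkaConsumer` should not be shared across threads, and only the per-record work is handed to the pool. `process_batch` waits for every task of a batch before the next `poll()`, so one batch is fully processed before more records are fetched:

```python
from concurrent.futures import ThreadPoolExecutor


def process_record(record):
    # Hypothetical per-record handler: replace with real work,
    # for example parsing the value or writing it to a database.
    return record


def process_batch(pool, batch, handler=process_record):
    """Run handler on every record of one poll() batch in the pool and collect the results."""
    futures = [
        pool.submit(handler, record)
        for records in batch.values()
        for record in records
    ]
    # result() blocks until each task finishes and re-raises any handler exception
    return [f.result() for f in futures]


def run(topic='my_favorite_topic', workers=10):
    from kafka import KafkaConsumer  # kafka-python

    consumer = KafkaConsumer(topic)  # polled only from this (main) thread
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while True:
            batch = consumer.poll(timeout_ms=500, max_records=1000)
            if batch:
                process_batch(pool, batch)
```

Collecting `result()` for every future means a failing record raises in the polling loop instead of being silently dropped by the pool.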

### Multiprocessing

For even higher throughput, you can run consumers in multiple processes. In this case, the number of consumers in a consumer group should not exceed the number of partitions of the topic: each partition is assigned to at most one consumer in the group, so any extra consumers sit idle.

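```py
from multiprocessing import Process

if __name__ == '__main__':
    ps = []
    for i in range(5):
        # Consumer is the user-defined consumer class; pass the bound method
        # itself as the target instead of calling it in the parent process
        p = Process(target=Consumer().consume)
        p.start()
        ps.append(p)

    # wait for all consumer processes to exit
    for p in ps:
        p.join()
```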
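A self-contained variant of the multiprocessing example, sketched under a few assumptions: each child process builds its own `KafkaConsumer` (consumer objects should not be shared between processes), all children join the same hypothetical group `my_group` so Kafka spreads the partitions among them, and the process count is capped at the partition count reported by kafka-python's `partitions_for_topic()`. `worker_count`, `consume_forever` and `main` are illustrative names:

```python
from multiprocessing import Process

TOPIC = 'my_favorite_topic'
GROUP_ID = 'my_group'  # hypothetical consumer group name


def worker_count(requested, num_partitions):
    """Cap the number of consumer processes at the partition count (at least 1)."""
    return max(1, min(requested, num_partitions))


def consume_forever(topic, group_id):
    # Imported inside the child so every process creates its own KafkaConsumer
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(topic, group_id=group_id)
    for msg in consumer:
        print(msg.partition, msg.offset, msg.value)


def main(requested=5):
    from kafka import KafkaConsumer

    probe = KafkaConsumer()  # only used to read topic metadata
    partitions = probe.partitions_for_topic(TOPIC) or set()  # None if the topic is unknown
    probe.close()

    procs = []
    for _ in range(worker_count(requested, len(partitions))):
        p = Process(target=consume_forever, args=(TOPIC, GROUP_ID))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()
```

The shared `group_id` matters: with `group_id=None`, kafka-python disables group-based partition assignment, so the processes would not divide the partitions among themselves. Call `main()` under an `if __name__ == '__main__':` guard so that child processes started with the `spawn` method do not run it again.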

Besides Python's built-in multithreading and multiprocessing libraries, you can also use the third-party library gunicorn.

### Examples

```py
{{#include docs/examples/python/kafka_example.py}}
```