diff --git a/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx b/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx new file mode 100644 index 0000000000000000000000000000000000000000..ffb969a8a6d0bc61e23fee44f245a24236db5b1b --- /dev/null +++ b/docs/en/07-develop/03-insert-data/20-kafka-writting.mdx @@ -0,0 +1,46 @@ +--- +title: Write from Kafka +--- + +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; +import PyKafka from "./_py_kafka.mdx"; + +## About Kafka + +Apache Kafka is an open-source distributed event streaming platform, used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. For the key concepts of kafka, please refer to [kafka documentation](https://kafka.apache.org/documentation/#gettingStarted). + +### kafka topic + +Messages in Kafka are organized by topics. A topic may have one or more partitions. We can manage kafka topics through `kafka-topics`. + +create a topic named `kafka-events`: + +``` +bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092 +``` + +Alter `kafka-events` topic to set partitions to 3: + +``` +bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092 +``` + +Show all topics and partitions in Kafka: + +``` +bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe +``` + +## Insert into TDengine + +We can write data into TDengine via SQL or Schemaless. For more information, please refer to [Insert Using SQL](/develop/insert-data/sql-writing/) or [High Performance Writing](/develop/insert-data/high-volume/) or [Schemaless Writing](/reference/schemaless/). + +## Examples + + + + + + + diff --git a/docs/en/07-develop/03-insert-data/02-influxdb-line.mdx b/docs/en/07-develop/03-insert-data/30-influxdb-line.mdx similarity index 100% rename from docs/en/07-develop/03-insert-data/02-influxdb-line.mdx rename to docs/en/07-develop/03-insert-data/30-influxdb-line.mdx diff --git a/docs/en/07-develop/03-insert-data/03-opentsdb-telnet.mdx b/docs/en/07-develop/03-insert-data/40-opentsdb-telnet.mdx similarity index 100% rename from docs/en/07-develop/03-insert-data/03-opentsdb-telnet.mdx rename to docs/en/07-develop/03-insert-data/40-opentsdb-telnet.mdx diff --git a/docs/en/07-develop/03-insert-data/04-opentsdb-json.mdx b/docs/en/07-develop/03-insert-data/50-opentsdb-json.mdx similarity index 100% rename from docs/en/07-develop/03-insert-data/04-opentsdb-json.mdx rename to docs/en/07-develop/03-insert-data/50-opentsdb-json.mdx diff --git a/docs/en/07-develop/03-insert-data/05-high-volume.md b/docs/en/07-develop/03-insert-data/60-high-volume.md similarity index 100% rename from docs/en/07-develop/03-insert-data/05-high-volume.md rename to docs/en/07-develop/03-insert-data/60-high-volume.md diff --git a/docs/en/07-develop/03-insert-data/_py_kafka.mdx b/docs/en/07-develop/03-insert-data/_py_kafka.mdx new file mode 100644 index 0000000000000000000000000000000000000000..dc43a0d415275189220287278547b015865e8d90 --- /dev/null +++ b/docs/en/07-develop/03-insert-data/_py_kafka.mdx @@ -0,0 +1,60 @@ +### python Kafka 客户端 + +For python kafka client, please refer to [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python). In this document, we use [kafka-python](http://github.com/dpkp/kafka-python). + +### consume from Kafka + +The simple way to consume messages from Kafka is to read messages one by one. The demo is as follows: + +``` +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_favorite_topic') +for msg in consumer: + print (msg) +``` + +For higher performance, we can consume message from kafka in batch. The demo is as follows: + +``` +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_favorite_topic') +while True: + msgs = consumer.poll(timeout_ms=500, max_records=1000) + if msgs: + print (msgs) +``` + +### multi-threading + +For more higher performance we can process data from kafka in multi-thread. We can use python's ThreadPoolExecutor to achieve multithreading. The demo is as follows: + +``` +from concurrent.futures import ThreadPoolExecutor, Future +pool = ThreadPoolExecutor(max_workers=10) +pool.submit(...) +``` + +### multi-process + +For more higher performance, sometimes we use multiprocessing. In this case, the number of Kafka Consumers should not be greater than the number of Kafka Topic Partitions. The demo is as follows: + +``` +from multiprocessing import Process + +ps = [] +for i in range(5): + p = Process(target=Consumer().consume()) + p.start() + ps.append(p) + +for p in ps: + p.join() +``` + +In addition to python's built-in multithreading and multiprocessing library, we can also use the third-party library gunicorn. + +### Examples + +```py +{{#include docs/examples/python/kafka_example.py}} +``` diff --git a/docs/examples/python/kafka_example.py b/docs/examples/python/kafka_example.py new file mode 100644 index 0000000000000000000000000000000000000000..735059eec0f3dcf5094810916e66a39db5682560 --- /dev/null +++ b/docs/examples/python/kafka_example.py @@ -0,0 +1,192 @@ +#! encoding = utf-8 +import json +import time +from json import JSONDecodeError +from typing import Callable +import logging +from concurrent.futures import ThreadPoolExecutor, Future + +import taos +from kafka import KafkaConsumer +from kafka.consumer.fetcher import ConsumerRecord + + +class Consumer(object): + DEFAULT_CONFIGS = { + 'kafka_brokers': 'localhost:9092', + 'kafka_topic': 'python_kafka', + 'kafka_group_id': 'taos', + 'taos_host': 'localhost', + 'taos_user': 'root', + 'taos_password': 'taosdata', + 'taos_database': 'power', + 'taos_port': 6030, + 'timezone': None, + 'clean_after_testing': False, + 'bath_consume': True, + 'batch_size': 1000, + 'async_model': True, + 'workers': 10 + } + + LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose', + 'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale', + 'California.SantaClara', 'California.Cupertino'] + + CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1' + USE_DATABASE_SQL = 'use {}' + DROP_TABLE_SQL = 'drop table if exists meters' + DROP_DATABASE_SQL = 'drop database if exists {}' + CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \ + 'tags (location binary(64), groupId int)' + CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})' + INSERT_SQL_HEADER = "insert into " + INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})' + + def __init__(self, **configs): + self.config: dict = self.DEFAULT_CONFIGS + self.config.update(configs) + self.consumer = KafkaConsumer( + self.config.get('kafka_topic'), # topic + bootstrap_servers=self.config.get('kafka_brokers'), + group_id=self.config.get('kafka_group_id'), + ) + self.taos = taos.connect( + host=self.config.get('taos_host'), + user=self.config.get('taos_user'), + password=self.config.get('taos_password'), + port=self.config.get('taos_port'), + timezone=self.config.get('timezone'), + ) + if self.config.get('async_model'): + self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers')) + self.tasks: list[Future] = [] + # tags and table mapping # key: {location}_{groupId} value: + self.tag_table_mapping = {} + i = 0 + for location in self.LOCATIONS: + for j in range(1, 11): + table_name = 'd{}'.format(i) + self._cache_table(location=location, group_id=j, table_name=table_name) + i += 1 + + def init_env(self): + # create database and table + self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database'))) + self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database'))) + self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database'))) + self.taos.execute(self.DROP_TABLE_SQL) + self.taos.execute(self.CREATE_STABLE_SQL) + for tags, table_name in self.tag_table_mapping.items(): + location, group_id = _get_location_and_group(tags) + self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id)) + + def consume(self): + logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic')) + try: + if self.config.get('bath_consume'): + self._run_batch(self._to_taos_batch) + else: + self._run(self._to_taos) + except KeyboardInterrupt: + logging.warning("## caught keyboard interrupt, stopping") + finally: + self.stop() + + def stop(self): + # close consumer + if self.consumer is not None: + self.consumer.commit() + self.consumer.close() + + # multi thread + if self.config.get('async_model'): + for task in self.tasks: + while not task.done(): + pass + if self.pool is not None: + self.pool.shutdown() + + # clean data + if self.config.get('clean_after_testing'): + self.taos.execute(self.DROP_TABLE_SQL) + self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database'))) + # close taos + if self.taos is not None: + self.taos.close() + + def _run(self, f: Callable[[ConsumerRecord], bool]): + for message in self.consumer: + if self.config.get('async_model'): + self.pool.submit(f(message)) + else: + f(message) + + def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]): + while True: + messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size')) + if messages: + if self.config.get('async_model'): + self.pool.submit(f, messages.values()) + else: + f(list(messages.values())) + if not messages: + time.sleep(0.1) + + def _to_taos(self, message: ConsumerRecord) -> bool: + sql = self.INSERT_SQL_HEADER + self._build_sql(message.value) + if len(sql) == 0: # decode error, skip + return True + logging.info('## insert sql %s', sql) + return self.taos.execute(sql=sql) == 1 + + def _to_taos_batch(self, messages: list[list[ConsumerRecord]]): + sql = self._build_sql_batch(messages=messages) + if len(sql) == 0: # decode error, skip + return + self.taos.execute(sql=sql) + + def _build_sql(self, msg_value: str) -> str: + try: + data = json.loads(msg_value) + except JSONDecodeError as e: + logging.error('## decode message [%s] error ', msg_value, e) + return '' + location = data.get('location') + group_id = data.get('groupId') + ts = data.get('ts') + current = data.get('current') + voltage = data.get('voltage') + phase = data.get('phase') + + table_name = self._get_table_name(location=location, group_id=group_id) + return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase) + + def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str: + sql_list = [] + for partition_messages in messages: + for message in partition_messages: + sql_list.append(self._build_sql(message.value)) + + return self.INSERT_SQL_HEADER + ' '.join(sql_list) + + def _cache_table(self, location: str, group_id: int, table_name: str): + self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name + + def _get_table_name(self, location: str, group_id: int) -> str: + return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id)) + + +def _tag_table_mapping_key(location: str, group_id: int): + return '{}_{}'.format(location, group_id) + + +def _get_location_and_group(key: str) -> (str, int): + fields = key.split('_') + return fields[0], fields[1] + + +if __name__ == '__main__': + consumer = Consumer(async_model=True) + consumer.init_env() + consumer.consume() \ No newline at end of file diff --git a/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx b/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx new file mode 100644 index 0000000000000000000000000000000000000000..32d3c2e5cbe5a167e4d7cb459ace8259c407b693 --- /dev/null +++ b/docs/zh/07-develop/03-insert-data/20-kafka-writting.mdx @@ -0,0 +1,47 @@ +--- +title: 从 Kafka 写入 +--- + +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; +import PyKafka from "./_py_kafka.mdx"; + +## Kafka 介绍 + +Apache Kafka 是开源的分布式消息分发平台,被广泛应用于高性能数据管道、流式数据分析、数据集成和事件驱动类型的应用程序。Kafka 包含 Producer、Consumer 和 Topic,其中 Producer 是向 Kafka 发送消息的进程,Consumer 是从 Kafka 消费消息的进程。Kafka 相关概念可以参考[官方文档](https://kafka.apache.org/documentation/#gettingStarted)。 + + +### kafka topic + +Kafka 的消息按 topic 组织,每个 topic 会有一到多个 partition。可以通过 kafka 的 `kafka-topics` 管理 topic。 + +创建名为 `kafka-events` 的topic: + +``` +bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092 +``` + +修改 `kafka-events` 的 partition 数量为 3: + +``` +bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092 +``` + +展示所有的 topic 和 partition: + +``` +bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe +``` + +## 写入 TDengine + +TDengine 支持 Sql 方式和 Schemaless 方式的数据写入,Sql 方式数据写入可以参考 [TDengine SQL 写入](/develop/insert-data/sql-writing/) 和 [TDengine 高效写入](/develop/insert-data/high-volume/)。Schemaless 方式数据写入可以参考 [TDengine Schemaless 写入](/reference/schemaless/) 文档。 + +## 示例代码 + + + + + + + diff --git a/docs/zh/07-develop/03-insert-data/02-influxdb-line.mdx b/docs/zh/07-develop/03-insert-data/30-influxdb-line.mdx similarity index 100% rename from docs/zh/07-develop/03-insert-data/02-influxdb-line.mdx rename to docs/zh/07-develop/03-insert-data/30-influxdb-line.mdx diff --git a/docs/zh/07-develop/03-insert-data/03-opentsdb-telnet.mdx b/docs/zh/07-develop/03-insert-data/40-opentsdb-telnet.mdx similarity index 100% rename from docs/zh/07-develop/03-insert-data/03-opentsdb-telnet.mdx rename to docs/zh/07-develop/03-insert-data/40-opentsdb-telnet.mdx diff --git a/docs/zh/07-develop/03-insert-data/04-opentsdb-json.mdx b/docs/zh/07-develop/03-insert-data/50-opentsdb-json.mdx similarity index 100% rename from docs/zh/07-develop/03-insert-data/04-opentsdb-json.mdx rename to docs/zh/07-develop/03-insert-data/50-opentsdb-json.mdx diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/60-high-volume.md similarity index 100% rename from docs/zh/07-develop/03-insert-data/05-high-volume.md rename to docs/zh/07-develop/03-insert-data/60-high-volume.md diff --git a/docs/zh/07-develop/03-insert-data/_py_kafka.mdx b/docs/zh/07-develop/03-insert-data/_py_kafka.mdx new file mode 100644 index 0000000000000000000000000000000000000000..cd7edf557d4ae41df0f55f4456fe405515132e51 --- /dev/null +++ b/docs/zh/07-develop/03-insert-data/_py_kafka.mdx @@ -0,0 +1,60 @@ +### python Kafka 客户端 + +Kafka 的 python 客户端可以参考文档 [kafka client](https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python)。推荐使用 [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python) 和 [kafka-python](http://github.com/dpkp/kafka-python)。以下示例以 [kafka-python](http://github.com/dpkp/kafka-python) 为例。 + +### 从 Kafka 消费数据 + +Kafka 客户端采用 pull 的方式从 Kafka 消费数据,可以采用单条消费的方式或批量消费的方式读取数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端单条消费数据的示例如下: + +``` +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_favorite_topic') +for msg in consumer: + print (msg) +``` + +单条消费的方式在数据流量大的情况下往往存在性能瓶颈,导致 Kafka 消息积压,更推荐使用批量消费的方式消费数据。使用 [kafka-python](http://github.com/dpkp/kafka-python) 客户端批量消费数据的示例如下: + +``` +from kafka import KafkaConsumer +consumer = KafkaConsumer('my_favorite_topic') +while True: + msgs = consumer.poll(timeout_ms=500, max_records=1000) + if msgs: + print (msgs) +``` + +### Python 多线程 + +为了提高数据写入效率,通常采用多线程的方式写入数据,可以使用 python 线程池 ThreadPoolExecutor 实现多线程。示例代码如下: + +``` +from concurrent.futures import ThreadPoolExecutor, Future +pool = ThreadPoolExecutor(max_workers=10) +pool.submit(...) +``` + +### Python 多进程 + +单个python进程不能充分发挥多核 CPU 的性能,有时候我们会选择多进程的方式。在多进程的情况下,需要注意,Kafka Consumer 的数量应该小于等于 Kafka Topic Partition 数量。Python 多进程示例代码如下: + +``` +from multiprocessing import Process + +ps = [] +for i in range(5): + p = Process(target=Consumer().consume()) + p.start() + ps.append(p) + +for p in ps: + p.join() +``` + +除了 Python 内置的多线程和多进程方式,还可以通过第三方库 gunicorn 实现并发。 + +### 完整示例 + +```py +{{#include docs/examples/python/kafka_example.py}} +```