#! encoding = utf-8
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from json import JSONDecodeError
from typing import Tuple

import taos
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord


class Consumer(object):
    DEFAULT_CONFIGS = {
        'kafka_brokers': 'localhost:9092',
        'kafka_topic': 'python_kafka',
        'kafka_group_id': 'taos',
        'taos_host': 'localhost',
        'taos_user': 'root',
        'taos_password': 'taosdata',
        'taos_database': 'power',
        'taos_port': 6030,
        'timezone': None,
        'clean_after_testing': False,
        'batch_consume': True,
        'batch_size': 1000,
        'async_model': True,
        'workers': 10,
        'testing': False,
    }

    LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego',
                 'California.SanJose', 'California.PaloAlto', 'California.Campbell',
                 'California.MountainView', 'California.Sunnyvale', 'California.SantaClara',
                 'California.Cupertino']

    CREATE_DATABASE_SQL = 'create database if not exists {} keep 365 duration 10 buffer 16 wal_level 1'
    USE_DATABASE_SQL = 'use {}'
    DROP_TABLE_SQL = 'drop table if exists meters'
    DROP_DATABASE_SQL = 'drop database if exists {}'
    CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) ' \
                        'tags (location binary(64), groupId int)'
    CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'
    INSERT_SQL_HEADER = 'insert into '
    # note: the database name is hard-coded here and must match 'taos_database'
    INSERT_PART_SQL = 'power.{} values (\'{}\', {}, {}, {})'

    def __init__(self, **configs):
        # copy the defaults so per-instance overrides do not mutate DEFAULT_CONFIGS
        self.config: dict = dict(self.DEFAULT_CONFIGS)
        self.config.update(configs)
        self.consumer = None
        if not self.config.get('testing'):
            self.consumer = KafkaConsumer(
                self.config.get('kafka_topic'),  # topic
                bootstrap_servers=self.config.get('kafka_brokers'),
                group_id=self.config.get('kafka_group_id'),
            )
        self.taos = taos.connect(
            host=self.config.get('taos_host'),
            user=self.config.get('taos_user'),
            password=self.config.get('taos_password'),
            port=self.config.get('taos_port'),
            timezone=self.config.get('timezone'),
        )
        if self.config.get('async_model'):
            self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
            self.tasks = []  # futures of submitted insert jobs
        # tags and table mapping
        # key: {location}_{groupId}  value: child table name
        self.tag_table_mapping = {}
        i = 0
        for location in self.LOCATIONS:
            for j in range(1, 11):
                table_name = 'd{}'.format(i)
                self._cache_table(location=location, group_id=j, table_name=table_name)
                i += 1

    def init_env(self):
        # create database, stable, and one child table per (location, groupId) pair
        self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
        self.taos.execute(self.CREATE_DATABASE_SQL.format(self.config.get('taos_database')))
        self.taos.execute(self.USE_DATABASE_SQL.format(self.config.get('taos_database')))
        self.taos.execute(self.DROP_TABLE_SQL)
        self.taos.execute(self.CREATE_STABLE_SQL)
        for tags, table_name in self.tag_table_mapping.items():
            location, group_id = _get_location_and_group(tags)
            self.taos.execute(self.CREATE_TABLE_SQL.format(table_name, location, group_id))

    def consume(self):
        logging.warning('## start consumer topic-[%s]', self.config.get('kafka_topic'))
        try:
            if self.config.get('batch_consume'):
                self._run_batch(self._to_taos_batch)
            else:
                self._run(self._to_taos)
        except KeyboardInterrupt:
            logging.warning('## caught keyboard interrupt, stopping')
        finally:
            self.stop()

    def stop(self):
        # commit offsets and close the Kafka consumer
        if self.consumer is not None:
            self.consumer.commit()
            self.consumer.close()

        # wait for in-flight insert tasks, then shut the pool down
        if self.config.get('async_model'):
            for task in self.tasks:
                task.result()
            if self.pool is not None:
                self.pool.shutdown()

        # clean data
        if self.config.get('clean_after_testing'):
            self.taos.execute(self.DROP_TABLE_SQL)
            self.taos.execute(self.DROP_DATABASE_SQL.format(self.config.get('taos_database')))

        # close the TDengine connection
        if self.taos is not None:
            self.taos.close()

    def _run(self, f):
        for message in self.consumer:
            if self.config.get('async_model'):
                # pass the callable and its argument separately; submitting
                # f(message) would run f synchronously and submit its result
                self.tasks.append(self.pool.submit(f, message))
            else:
                f(message)

    def _run_batch(self, f):
        while True:
            # poll() returns a dict of {TopicPartition: [ConsumerRecord, ...]}
            messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
            if messages:
                if self.config.get('async_model'):
                    self.tasks.append(self.pool.submit(f, list(messages.values())))
                else:
                    f(list(messages.values()))
            if not messages:
                time.sleep(0.1)

    def _to_taos(self, message: ConsumerRecord) -> bool:
        sql = self._build_sql(message.value)
        if len(sql) == 0:  # decode error, skip
            return True
        sql = self.INSERT_SQL_HEADER + sql
        logging.info('## insert sql %s', sql)
        return self.taos.execute(sql=sql) == 1

    def _to_taos_batch(self, messages):
        sql = self._build_sql_batch(messages=messages)
        if len(sql) == 0:  # decode error, skip
            return
        self.taos.execute(sql=sql)

    def _build_sql(self, msg_value: str) -> str:
        try:
            data = json.loads(msg_value)
        except JSONDecodeError as e:
            logging.error('## decode message [%s] error: %s', msg_value, e)
            return ''
        location = data.get('location')
        group_id = data.get('groupId')
        ts = data.get('ts')
        current = data.get('current')
        voltage = data.get('voltage')
        phase = data.get('phase')
        table_name = self._get_table_name(location=location, group_id=group_id)
        return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)

    def _build_sql_batch(self, messages) -> str:
        sql_list = []
        for partition_messages in messages:
            for message in partition_messages:
                part = self._build_sql(message.value)
                if part:  # skip records that failed to decode
                    sql_list.append(part)
        if not sql_list:
            return ''
        return self.INSERT_SQL_HEADER + ' '.join(sql_list)

    def _cache_table(self, location: str, group_id: int, table_name: str):
        self.tag_table_mapping[_tag_table_mapping_key(location=location, group_id=group_id)] = table_name

    def _get_table_name(self, location: str, group_id: int) -> str:
        return self.tag_table_mapping.get(_tag_table_mapping_key(location=location, group_id=group_id))
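

# --- illustrative addition, not part of the original example ---
# A minimal kafka-python producer sketch showing the JSON message shape this
# consumer expects. The helper name `example_produce` and its defaults are
# assumptions for illustration; adjust broker and topic to your environment.
def example_produce(broker: str = 'localhost:9092', topic: str = 'python_kafka'):
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=broker)
    msg = {
        'location': 'California.SanFrancisco',  # must be one of Consumer.LOCATIONS
        'groupId': 1,                           # 1..10, matching the table mapping in __init__
        'ts': '2022-12-06 15:13:38.643',
        'current': 3.41,
        'voltage': 105,
        'phase': 0.02027,
    }
    # kafka-python expects bytes, so serialize the dict as UTF-8 JSON
    producer.send(topic, value=json.dumps(msg).encode('utf-8'))
    producer.flush()
    producer.close()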


def _tag_table_mapping_key(location: str, group_id: int) -> str:
    return '{}_{}'.format(location, group_id)


def _get_location_and_group(key: str) -> Tuple[str, int]:
    fields = key.split('_')
    return fields[0], int(fields[1])


def test_to_taos(consumer: Consumer):
    msg = {
        'location': 'California.SanFrancisco',
        'groupId': 1,
        'ts': '2022-12-06 15:13:38.643',
        'current': 3.41,
        'voltage': 105,
        'phase': 0.02027,
    }
    record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                            value=json.dumps(msg), partition=1, topic='test',
                            serialized_key_size=None, serialized_header_size=None,
                            serialized_value_size=None, timestamp=time.time(),
                            timestamp_type=None)
    assert consumer._to_taos(message=record)


def test_to_taos_batch(consumer: Consumer):
    records = [
        [
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value=json.dumps({'location': 'California.SanFrancisco',
                                             'groupId': 1,
                                             'ts': '2022-12-06 15:13:38.643',
                                             'current': 3.41,
                                             'voltage': 105,
                                             'phase': 0.02027}),
                           partition=1, topic='test',
                           serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(),
                           timestamp_type=None),
            ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
                           value=json.dumps({'location': 'California.LosAngles',
                                             'groupId': 2,
                                             'ts': '2022-12-06 15:13:39.643',
                                             'current': 3.41,
                                             'voltage': 102,
                                             'phase': 0.02027}),
                           partition=1, topic='test',
                           serialized_key_size=None, serialized_header_size=None,
                           serialized_value_size=None, timestamp=time.time(),
                           timestamp_type=None),
        ]
    ]
    consumer._to_taos_batch(messages=records)
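

# Illustrative usage sketch (an assumption, not in the original script): how the
# consumer would run against a live Kafka broker and TDengine server. consume()
# blocks until interrupted; Ctrl-C lands in the KeyboardInterrupt handler,
# which calls stop() to commit offsets and close both connections.
def example_run():
    c = Consumer(async_model=True, batch_consume=True)
    c.init_env()  # one-time schema setup: database, stable, 100 child tables
    c.consume()   # poll loop; does not return until interrupted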


if __name__ == '__main__':
    consumer = Consumer(async_model=True, testing=True)
    # init env: create database and tables
    consumer.init_env()
    # consumer.consume()
    # test the sql builders: single record and batch
    test_to_taos(consumer)
    test_to_taos_batch(consumer)
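
# For reference: _build_sql_batch() folds one poll's worth of records into a
# single multi-table INSERT. The two test records above map to child tables
# d0 (California.SanFrancisco, groupId 1) and d11 (California.LosAngles,
# groupId 2), so the generated statement has the form (wrapped here; the
# actual SQL is one line):
#   insert into power.d0 values ('2022-12-06 15:13:38.643', 3.41, 105, 0.02027)
#     power.d11 values ('2022-12-06 15:13:39.643', 3.41, 102, 0.02027)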