kafka_example_common.py 2.4 KB
Newer Older
1 2 3 4 5 6 7
# -*- coding: utf-8 -*-
import taos

# Location tag values; tables are assigned to these in round-robin order.
# NOTE(review): 'California.LosAngles' looks like a misspelling, but it is a
# stored tag value, so it is kept verbatim here.
LOCATIONS = [
    'California.SanFrancisco',
    'California.LosAngles',
    'California.SanDiego',
    'California.SanJose',
    'California.PaloAlto',
    'California.Campbell',
    'California.MountainView',
    'California.Sunnyvale',
    'California.SantaClara',
    'California.Cupertino',
]

8
# Template for creating the example database; `{}` receives the database name.
# The storage/WAL options are sent to the server exactly as written.
CREATE_DATABASE_SQL = (
    'create database if not exists {} '
    'keep 365 duration 10 buffer 16 '
    'wal_level 1 wal_retention_period 3600'
)
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
# Statement templates used by the helpers below. A bare `{}` is filled in with
# str.format(): the database name for the database statements, and
# (table name, location tag, group id tag) for CREATE_TABLE_SQL.
USE_DATABASE_SQL = "use {}"
DROP_TABLE_SQL = "drop table if exists meters"
DROP_DATABASE_SQL = "drop database if exists {}"
CREATE_STABLE_SQL = (
    "create stable meters (ts timestamp, current float, voltage int, phase float) tags "
    "(location binary(64), groupId int)"
)
CREATE_TABLE_SQL = "create table if not exists {} using meters tags ('{}', {})"


def create_database_and_tables(host, port, user, password, db, table_count):
    """Recreate database `db` with the `meters` stable and its child tables.

    Any existing database named `db` is dropped first, so all data in it is
    lost. `table_count` child tables named ``d0`` .. ``d<table_count - 1>``
    are then created, each tagged with a location and a group id.

    :param host: server host passed to ``taos.connect``
    :param port: server port passed to ``taos.connect``
    :param user: user name passed to ``taos.connect``
    :param password: password passed to ``taos.connect``
    :param db: name of the database to (re)create
    :param table_count: number of child tables to create
    """
    tags_tables = _init_tags_table_names(table_count=table_count)
    conn = taos.connect(host=host, port=port, user=user, password=password)
    try:
        conn.execute(DROP_DATABASE_SQL.format(db))
        conn.execute(CREATE_DATABASE_SQL.format(db))
        conn.execute(USE_DATABASE_SQL.format(db))
        conn.execute(DROP_TABLE_SQL)
        conn.execute(CREATE_STABLE_SQL)
        for tags, tables in tags_tables.items():
            # Each key encodes the (location, group id) tag pair shared by
            # all tables in its list.
            location, group_id = _get_location_and_group(tags)
            for table_name in tables:
                conn.execute(CREATE_TABLE_SQL.format(table_name, location, group_id))
    finally:
        # Release the connection even when one of the statements fails.
        conn.close()


def clean(host, port, user, password, db):
    """Drop database `db` if it exists.

    :param host: server host passed to ``taos.connect``
    :param port: server port passed to ``taos.connect``
    :param user: user name passed to ``taos.connect``
    :param password: password passed to ``taos.connect``
    :param db: name of the database to drop
    """
    conn = taos.connect(host=host, port=port, user=user, password=password)
    try:
        conn.execute(DROP_DATABASE_SQL.format(db))
    finally:
        # Release the connection even when the drop statement fails.
        conn.close()


def _init_tags_table_names(table_count):
    """Group generated table names by their (location, group id) tag key.

    Tables are named ``d0`` .. ``d<table_count - 1>``. Locations are taken
    from LOCATIONS in round-robin order; the group id advances each time the
    rotation returns to the first location and cycles through 1..10.

    :param table_count: number of table names to generate
    :return: dict mapping a tag key (see ``_tag_table_mapping_key``) to the
        list of table names that carry those tags
    """
    location_total = len(LOCATIONS)
    tables_by_tag = {}
    current_group = 0
    for index in range(table_count):
        slot = index % location_total
        if slot == 0:
            # Start of a new pass over LOCATIONS: next group, wrapping 10 -> 1.
            current_group = current_group % 10 + 1
        tag_key = _tag_table_mapping_key(location=LOCATIONS[slot], group_id=current_group)
        tables_by_tag.setdefault(tag_key, []).append('d{}'.format(index))

    return tables_by_tag


def _tag_table_mapping_key(location, group_id):
    return '{}_{}'.format(location, group_id)


def _get_location_and_group(key):
    fields = key.split('_')
    return fields[0], fields[1]