diff --git a/docs/examples/go/go.sum b/docs/examples/go/go.sum new file mode 100644 index 0000000000000000000000000000000000000000..13e13adaa189053696320a6eb9740daa319a98b7 --- /dev/null +++ b/docs/examples/go/go.sum @@ -0,0 +1,15 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/taosdata/driver-go/v3 v3.1.0/go.mod h1:H2vo/At+rOPY1aMzUV9P49SVX7NlXb3LAbKw+MCLrmU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/docs/examples/python/conn_native_pandas.py b/docs/examples/python/conn_native_pandas.py index 56942ef57085766cd128b03cabb7a357587eab16..f3bab15efbe6669a88828fb194682dbfedb382df 100644 --- a/docs/examples/python/conn_native_pandas.py +++ b/docs/examples/python/conn_native_pandas.py @@ -1,8 +1,11 @@ import pandas -from sqlalchemy import create_engine +from sqlalchemy import create_engine, text engine = create_engine("taos://root:taosdata@localhost:6030/power") -df = pandas.read_sql("SELECT * FROM meters", engine) +conn = engine.connect() +df = pandas.read_sql(text("SELECT * FROM power.meters"), conn) +conn.close() + # print index print(df.index) diff --git a/docs/examples/python/conn_rest_pandas.py b/docs/examples/python/conn_rest_pandas.py index 0164080cd5a05e72dce40b1d111ea423623ff9b2..1b207d6ff10a353f3473116ce807cc8daf362ca7 100644 --- a/docs/examples/python/conn_rest_pandas.py +++ b/docs/examples/python/conn_rest_pandas.py @@ -1,8 +1,10 @@ import pandas -from sqlalchemy import create_engine +from sqlalchemy import create_engine, text engine = create_engine("taosrest://root:taosdata@localhost:6041") -df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", engine) +conn = engine.connect() +df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn) +conn.close() # print index print(df.index) diff --git a/docs/examples/python/connect_rest_examples.py b/docs/examples/python/connect_rest_examples.py index dba00b5a8279a3cbb3cab0a2d8b26bb312364479..0f8625ae5387a275f7b84948ad80191b8e443862 100644 --- a/docs/examples/python/connect_rest_examples.py +++ b/docs/examples/python/connect_rest_examples.py @@ -1,18 +1,19 @@ # ANCHOR: connect from taosrest import connect, TaosRestConnection, TaosRestCursor -conn: TaosRestConnection = connect(url="http://localhost:6041", - user="root", - password="taosdata", - timeout=30) +conn = connect(url="http://localhost:6041", + user="root", + password="taosdata", + timeout=30) # ANCHOR_END: connect # ANCHOR: basic # create STable -cursor: TaosRestCursor = conn.cursor() +cursor = conn.cursor() cursor.execute("DROP DATABASE IF EXISTS power") cursor.execute("CREATE DATABASE power") -cursor.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)") +cursor.execute( + "CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)") # insert data cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000) @@ -28,7 +29,7 @@ print("queried row count:", cursor.rowcount) # get column names from cursor column_names = [meta[0] for meta in cursor.description] # get rows -data: list[tuple] = cursor.fetchall() +data = cursor.fetchall() print(column_names) for row in data: print(row) diff --git a/docs/examples/python/connection_usage_native_reference.py b/docs/examples/python/connection_usage_native_reference.py index a7179b4cf859eb440b535a797eeb8e2be1e33589..8b754ec7226e8fd25dbdeb27b28faebdcf612049 100644 --- a/docs/examples/python/connection_usage_native_reference.py +++ b/docs/examples/python/connection_usage_native_reference.py @@ -8,7 +8,7 @@ conn.execute("CREATE DATABASE test") # change database. same as execute "USE db" conn.select_db("test") conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") -affected_row: int = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)") +affected_row = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)") print("affected_row", affected_row) # output: # affected_row 3 @@ -16,10 +16,10 @@ print("affected_row", affected_row) # ANCHOR: query # Execute a sql and get its result set. It's useful for SELECT statement -result: taos.TaosResult = conn.query("SELECT * from weather") +result = conn.query("SELECT * from weather") # Get fields from result -fields: taos.field.TaosFields = result.fields +fields = result.fields for field in fields: print(field) # {name: ts, type: 9, bytes: 8} diff --git a/docs/examples/python/fast_write_example.py b/docs/examples/python/fast_write_example.py index c9d606388fdecd85f1468f24cc497ecc5941f035..626e3310b120b9415952614b4b110ed29f787582 100644 --- a/docs/examples/python/fast_write_example.py +++ b/docs/examples/python/fast_write_example.py @@ -1,15 +1,14 @@ # install dependencies: # recommend python >= 3.8 -# pip3 install faster-fifo # import logging import math +import multiprocessing import sys import time import os -from multiprocessing import Process -from faster_fifo import Queue +from multiprocessing import Process, Queue from mockdatasource import MockDataSource from queue import Empty from typing import List @@ -22,8 +21,7 @@ TABLE_COUNT = 1000 QUEUE_SIZE = 1000000 MAX_BATCH_SIZE = 3000 -read_processes = [] -write_processes = [] +_DONE_MESSAGE = '__DONE__' def get_connection(): @@ -44,41 +42,64 @@ def get_connection(): # ANCHOR: read -def run_read_task(task_id: int, task_queues: List[Queue]): +def run_read_task(task_id: int, task_queues: List[Queue], infinity): table_count_per_task = TABLE_COUNT // READ_TASK_COUNT - data_source = MockDataSource(f"tb{task_id}", table_count_per_task) + data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity) try: for batch in data_source: + if isinstance(batch, tuple): + batch = [batch] for table_id, rows in batch: # hash data to different queue i = table_id % len(task_queues) # block putting forever when the queue is full - task_queues[i].put_many(rows, block=True, timeout=-1) + for row in rows: + task_queues[i].put(row) + if not infinity: + for queue in task_queues: + queue.put(_DONE_MESSAGE) except KeyboardInterrupt: pass + finally: + logging.info('read task over') # ANCHOR_END: read + # ANCHOR: write -def run_write_task(task_id: int, queue: Queue): +def run_write_task(task_id: int, queue: Queue, done_queue: Queue): from sql_writer import SQLWriter log = logging.getLogger(f"WriteTask-{task_id}") writer = SQLWriter(get_connection) lines = None try: while True: - try: - # get as many as possible - lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE) + over = False + lines = [] + for _ in range(MAX_BATCH_SIZE): + try: + line = queue.get_nowait() + if line == _DONE_MESSAGE: + over = True + break + if line: + lines.append(line) + except Empty: + time.sleep(0.1) + if len(lines) > 0: writer.process_lines(lines) - except Empty: - time.sleep(0.01) + if over: + done_queue.put(_DONE_MESSAGE) + break except KeyboardInterrupt: pass except BaseException as e: log.debug(f"lines={lines}") raise e + finally: + writer.close() + log.debug('write task over') # ANCHOR_END: write @@ -103,47 +124,64 @@ def set_global_config(): # ANCHOR: monitor -def run_monitor_process(): +def run_monitor_process(done_queue: Queue): log = logging.getLogger("DataBaseMonitor") - conn = get_connection() - conn.execute("DROP DATABASE IF EXISTS test") - conn.execute("CREATE DATABASE test") - conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " - "TAGS (location BINARY(64), groupId INT)") + conn = None + try: + conn = get_connection() - def get_count(): - res = conn.query("SELECT count(*) FROM test.meters") - rows = res.fetch_all() - return rows[0][0] if rows else 0 + def get_count(): + res = conn.query("SELECT count(*) FROM test.meters") + rows = res.fetch_all() + return rows[0][0] if rows else 0 - last_count = 0 - while True: - time.sleep(10) - count = get_count() - log.info(f"count={count} speed={(count - last_count) / 10}") - last_count = count + last_count = 0 + while True: + try: + done = done_queue.get_nowait() + if done == _DONE_MESSAGE: + break + except Empty: + pass + time.sleep(10) + count = get_count() + log.info(f"count={count} speed={(count - last_count) / 10}") + last_count = count + finally: + conn.close() # ANCHOR_END: monitor # ANCHOR: main -def main(): +def main(infinity): set_global_config() logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, " f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") - monitor_process = Process(target=run_monitor_process) + conn = get_connection() + conn.execute("DROP DATABASE IF EXISTS test") + conn.execute("CREATE DATABASE IF NOT EXISTS test") + conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " + "TAGS (location BINARY(64), groupId INT)") + conn.close() + + done_queue = Queue() + monitor_process = Process(target=run_monitor_process, args=(done_queue,)) monitor_process.start() - time.sleep(3) # waiting for database ready. + logging.debug(f"monitor task started with pid {monitor_process.pid}") task_queues: List[Queue] = [] + write_processes = [] + read_processes = [] + # create task queues for i in range(WRITE_TASK_COUNT): - queue = Queue(max_size_bytes=QUEUE_SIZE) + queue = Queue() task_queues.append(queue) # create write processes for i in range(WRITE_TASK_COUNT): - p = Process(target=run_write_task, args=(i, task_queues[i])) + p = Process(target=run_write_task, args=(i, task_queues[i], done_queue)) p.start() logging.debug(f"WriteTask-{i} started with pid {p.pid}") write_processes.append(p) @@ -151,13 +189,19 @@ def main(): # create read processes for i in range(READ_TASK_COUNT): queues = assign_queues(i, task_queues) - p = Process(target=run_read_task, args=(i, queues)) + p = Process(target=run_read_task, args=(i, queues, infinity)) p.start() logging.debug(f"ReadTask-{i} started with pid {p.pid}") read_processes.append(p) try: monitor_process.join() + for p in read_processes: + p.join() + for p in write_processes: + p.join() + time.sleep(1) + return except KeyboardInterrupt: monitor_process.terminate() [p.terminate() for p in read_processes] @@ -176,5 +220,6 @@ def assign_queues(read_task_id, task_queues): if __name__ == '__main__': - main() + multiprocessing.set_start_method('spawn') + main(False) # ANCHOR_END: main diff --git a/docs/examples/python/kafka_example.py b/docs/examples/python/kafka_example.py index 735059eec0f3dcf5094810916e66a39db5682560..43f9183f7e25b680827aef836363ef5f0549468b 100644 --- a/docs/examples/python/kafka_example.py +++ b/docs/examples/python/kafka_example.py @@ -26,7 +26,8 @@ class Consumer(object): 'bath_consume': True, 'batch_size': 1000, 'async_model': True, - 'workers': 10 + 'workers': 10, + 'testing': False } LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose', @@ -46,11 +47,12 @@ class Consumer(object): def __init__(self, **configs): self.config: dict = self.DEFAULT_CONFIGS self.config.update(configs) - self.consumer = KafkaConsumer( - self.config.get('kafka_topic'), # topic - bootstrap_servers=self.config.get('kafka_brokers'), - group_id=self.config.get('kafka_group_id'), - ) + if not self.config.get('testing'): + self.consumer = KafkaConsumer( + self.config.get('kafka_topic'), # topic + bootstrap_servers=self.config.get('kafka_brokers'), + group_id=self.config.get('kafka_group_id'), + ) self.taos = taos.connect( host=self.config.get('taos_host'), user=self.config.get('taos_user'), @@ -60,7 +62,7 @@ class Consumer(object): ) if self.config.get('async_model'): self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers')) - self.tasks: list[Future] = [] + self.tasks = [] # tags and table mapping # key: {location}_{groupId} value: self.tag_table_mapping = {} i = 0 @@ -115,14 +117,14 @@ class Consumer(object): if self.taos is not None: self.taos.close() - def _run(self, f: Callable[[ConsumerRecord], bool]): + def _run(self, f): for message in self.consumer: if self.config.get('async_model'): self.pool.submit(f(message)) else: f(message) - def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]): + def _run_batch(self, f): while True: messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size')) if messages: @@ -140,7 +142,7 @@ class Consumer(object): logging.info('## insert sql %s', sql) return self.taos.execute(sql=sql) == 1 - def _to_taos_batch(self, messages: list[list[ConsumerRecord]]): + def _to_taos_batch(self, messages): sql = self._build_sql_batch(messages=messages) if len(sql) == 0: # decode error, skip return @@ -162,7 +164,7 @@ class Consumer(object): table_name = self._get_table_name(location=location, group_id=group_id) return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase) - def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str: + def _build_sql_batch(self, messages) -> str: sql_list = [] for partition_messages in messages: for message in partition_messages: @@ -186,7 +188,54 @@ def _get_location_and_group(key: str) -> (str, int): return fields[0], fields[1] +def test_to_taos(consumer: Consumer): + msg = { + 'location': 'California.SanFrancisco', + 'groupId': 1, + 'ts': '2022-12-06 15:13:38.643', + 'current': 3.41, + 'voltage': 105, + 'phase': 0.02027, + } + record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None, value=json.dumps(msg), partition=1, + topic='test', serialized_key_size=None, serialized_header_size=None, + serialized_value_size=None, timestamp=time.time(), timestamp_type=None) + assert consumer._to_taos(message=record) + + +def test_to_taos_batch(consumer: Consumer): + records = [ + [ + ConsumerRecord(checksum=None, headers=None, offset=1, key=None, + value=json.dumps({'location': 'California.SanFrancisco', + 'groupId': 1, + 'ts': '2022-12-06 15:13:38.643', + 'current': 3.41, + 'voltage': 105, + 'phase': 0.02027, }), + partition=1, topic='test', serialized_key_size=None, serialized_header_size=None, + serialized_value_size=None, timestamp=time.time(), timestamp_type=None), + ConsumerRecord(checksum=None, headers=None, offset=1, key=None, + value=json.dumps({'location': 'California.LosAngles', + 'groupId': 2, + 'ts': '2022-12-06 15:13:39.643', + 'current': 3.41, + 'voltage': 102, + 'phase': 0.02027, }), + partition=1, topic='test', serialized_key_size=None, serialized_header_size=None, + serialized_value_size=None, timestamp=time.time(), timestamp_type=None), + ] + ] + + consumer._to_taos_batch(messages=records) + + if __name__ == '__main__': - consumer = Consumer(async_model=True) + consumer = Consumer(async_model=True, testing=True) + # init env consumer.init_env() - consumer.consume() \ No newline at end of file + # consumer.consume() + # test build sql + # test build sql batch + test_to_taos(consumer) + test_to_taos_batch(consumer) diff --git a/docs/examples/python/mockdatasource.py b/docs/examples/python/mockdatasource.py index 1c516a800e007934f8e6815f82024a53fea70073..9c702936ea6f1bdff3f604d376fd1925b4dc118e 100644 --- a/docs/examples/python/mockdatasource.py +++ b/docs/examples/python/mockdatasource.py @@ -10,13 +10,14 @@ class MockDataSource: "9.4,118,0.141,California.SanFrancisco,4" ] - def __init__(self, tb_name_prefix, table_count): + def __init__(self, tb_name_prefix, table_count, infinity=True): self.table_name_prefix = tb_name_prefix + "_" self.table_count = table_count self.max_rows = 10000000 self.current_ts = round(time.time() * 1000) - self.max_rows * 100 # [(tableId, tableName, values),] self.data = self._init_data() + self.infinity = infinity def _init_data(self): lines = self.samples * (self.table_count // 5 + 1) @@ -28,14 +29,19 @@ class MockDataSource: def __iter__(self): self.row = 0 - return self + if not self.infinity: + return iter(self._iter_data()) + else: + return self def __next__(self): """ next 1000 rows for each table. return: {tableId:[row,...]} """ - # generate 1000 timestamps + return self._iter_data() + + def _iter_data(self): ts = [] for _ in range(1000): self.current_ts += 100 @@ -47,3 +53,9 @@ class MockDataSource: rows = [table_name + ',' + t + ',' + values for t in ts] result.append((table_id, rows)) return result + + +if __name__ == '__main__': + datasource = MockDataSource('t', 10, False) + for data in datasource: + print(data) diff --git a/docs/examples/python/sql_writer.py b/docs/examples/python/sql_writer.py index 758167376b009f21afc701be7d89c1bfbabdeb9f..3456981a7b9a174e38f8795ff7251ab3c675174b 100644 --- a/docs/examples/python/sql_writer.py +++ b/docs/examples/python/sql_writer.py @@ -10,6 +10,7 @@ class SQLWriter: self._tb_tags = {} self._conn = get_connection_func() self._max_sql_length = self.get_max_sql_length() + self._conn.execute("create database if not exists test") self._conn.execute("USE test") def get_max_sql_length(self): @@ -20,7 +21,7 @@ class SQLWriter: return int(r[1]) return 1024 * 1024 - def process_lines(self, lines: str): + def process_lines(self, lines: [str]): """ :param lines: [[tbName,ts,current,voltage,phase,location,groupId]] """ @@ -60,6 +61,7 @@ class SQLWriter: buf.append(q) sql_len += len(q) sql += " ".join(buf) + self.create_tables() self.execute_sql(sql) self._tb_values.clear() @@ -88,3 +90,22 @@ class SQLWriter: except BaseException as e: self.log.error("Execute SQL: %s", sql) raise e + + def close(self): + if self._conn: + self._conn.close() + + +if __name__ == '__main__': + def get_connection_func(): + conn = taos.connect() + return conn + + + writer = SQLWriter(get_connection_func=get_connection_func) + writer.execute_sql( + "create stable if not exists meters (ts timestamp, current float, voltage int, phase float) " + "tags (location binary(64), groupId int)") + writer.execute_sql( + "INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) " + "VALUES ('2021-07-13 14:06:32.272', 10.2, 219, 0.32)") diff --git a/docs/examples/python/tmq_example.py b/docs/examples/python/tmq_example.py index fafa81e8b552b32a47c9e96833ec6d5959dc538a..6f7fb87c89ce4cb96793d09a837f60ad54ae69bc 100644 --- a/docs/examples/python/tmq_example.py +++ b/docs/examples/python/tmq_example.py @@ -19,8 +19,14 @@ def init_tmq_env(db, topic): conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')") +def cleanup(db, topic): + conn = taos.connect() + conn.execute("drop topic if exists {}".format(topic)) + conn.execute("drop database if exists {}".format(db)) + + if __name__ == '__main__': - init_tmq_env("tmq_test", "tmq_test_topic") # init env + init_tmq_env("tmq_test", "tmq_test_topic") # init env consumer = Consumer( { "group.id": "tg2", @@ -33,9 +39,9 @@ if __name__ == '__main__': try: while True: - res = consumer.poll(100) + res = consumer.poll(1) if not res: - continue + break err = res.error() if err is not None: raise err @@ -46,3 +52,4 @@ if __name__ == '__main__': finally: consumer.unsubscribe() consumer.close() + cleanup("tmq_test", "tmq_test_topic") diff --git a/tests/docs-examples-test/python.sh b/tests/docs-examples-test/python.sh index 140d05395bdf6c32bbded25bb53ffaab523e3434..ccb391b7527fbf2490911d868d08c87436221162 100644 --- a/tests/docs-examples-test/python.sh +++ b/tests/docs-examples-test/python.sh @@ -23,7 +23,7 @@ python3 bind_param_example.py # 4 taos -s "drop database power" -python3 multi_bind_example.py +python3 multi_bind_example.py # 5 python3 query_example.py @@ -44,4 +44,43 @@ taos -s "drop database test" python3 json_protocol_example.py # 10 -# python3 subscribe_demo.py +pip install SQLAlchemy +pip install pandas +taosBenchmark -y -d power -t 10 -n 10 +python3 conn_native_pandas.py +python3 conn_rest_pandas.py +taos -s "drop database if exists power" + +# 11 +taos -s "create database if not exists test" +python3 connect_native_reference.py + +# 12 +python3 connect_rest_examples.py + +# 13 +python3 handle_exception.py + +# 14 +taosBenchmark -y -d power -t 2 -n 10 +python3 rest_client_example.py +taos -s "drop database if exists power" + +# 15 +python3 result_set_examples.py + +# 16 +python3 tmq_example.py + +# 17 +python3 sql_writer.py + +# 18 +python3 mockdatasource.py + +# 19 +python3 fast_write_example.py + +# 20 +pip3 install kafka-python +python3 kafka_example.py