diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 2ebe4b52a2291bf19e0490f21ee2f7f08779e658..c0a899bbe4c9953aea0b727fcbb5bbd54bd031a6 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -120,6 +120,7 @@
 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
 ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
 ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
+,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
diff --git a/tests/pytest/util/cases.py b/tests/pytest/util/cases.py
index 4830d2f8b09e2364388f2c3ae1ad24a740de5390..536b8f30d3582875e85d1260861d6839f8503f6f 100644
--- a/tests/pytest/util/cases.py
+++ b/tests/pytest/util/cases.py
@@ -17,6 +17,7 @@ import time
 import datetime
 import inspect
 import importlib
+import traceback
 
 from util.log import *
 
@@ -75,6 +76,7 @@ class TDCases:
             case.run()
         except Exception as e:
             tdLog.notice(repr(e))
+            traceback.print_exc()
             tdLog.exit("%s failed" % (fileName))
         case.stop()
         runNum += 1
diff --git a/tests/system-test/0-others/udfpy/af_count.py b/tests/system-test/0-others/udfpy/af_count.py
new file mode 100644
index 0000000000000000000000000000000000000000..ce29abca13866f9f311d1e868a5df68c71c5f735
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/af_count.py
@@ -0,0 +1,23 @@
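+# Aggregate UDF skeleton, as exercised by udfpy_main.py: the engine calls
+# start() once to build an intermediate buffer, reduce() once per data block
+# to fold new rows into it, and finish() once for the final value; the buffer
+# travels between calls as plain bytes, so state is (un)pickled on each call.
+# A function built from this file is registered along these lines (udf_path
+# being whatever directory holds this script):
+#   create aggregate function af_count_int as "<udf_path>/af_count.py"
+#   outputtype int bufsize 204800 language "Python"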
+import pickle
+
+def init():
+    pass
+
+def destroy():
+    pass
+
+def start():
+    return pickle.dumps(0)
+
+def finish(buf):
+    count = pickle.loads(buf)
+    return count
+
+def reduce(datablock, buf):
+    (rows, cols) = datablock.shape()
+    count = pickle.loads(buf)
+    for i in range(rows):
+        val = datablock.data(i, 0)
+        if val is not None:
+            count += 1
+    return pickle.dumps(count)
\ No newline at end of file
diff --git a/tests/system-test/0-others/udfpy/af_min.py b/tests/system-test/0-others/udfpy/af_min.py
new file mode 100644
index 0000000000000000000000000000000000000000..9f1aadf4145c7bb585040d1f05640c913aab2aee
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/af_min.py
@@ -0,0 +1,30 @@
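+# Min aggregate over the first argument column: the pickled buffer carries a
+# list with one minimum per processed data block (NULLs are skipped); finish()
+# folds that list into the overall minimum, or None if every value was NULL.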
+import pickle
+
+def init():
+    pass
+
+def destroy():
+    pass
+
+def start():
+    return pickle.dumps([])
+
+def finish(buf):
+    mins = pickle.loads(buf)
+    min_val = None
+    for m in mins:
+        if min_val is None or (m is not None and m < min_val):
+            min_val = m
+    return min_val
+
+def reduce(datablock, buf):
+    (rows, cols) = datablock.shape()
+    mins = pickle.loads(buf)
+    min_val = None
+    for i in range(rows):
+        val = datablock.data(i, 0)
+        if min_val is None or (val is not None and val < min_val):
+            min_val = val
+    if min_val is not None:
+        mins.append(min_val)
+    return pickle.dumps(mins)
diff --git a/tests/system-test/0-others/udfpy/af_null.py b/tests/system-test/0-others/udfpy/af_null.py
new file mode 100644
index 0000000000000000000000000000000000000000..230eac6888245444a4f902274a83bb5671626515
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/af_null.py
@@ -0,0 +1,19 @@
+import pickle
+
+def init():
+    pass
+
+def destroy():
+    pass
+
+def start():
+    return pickle.dumps([])
+
+def finish(buf):
+    return None
+
+def reduce(datablock, buf):
+    (rows, cols) = datablock.shape()
+    mins = pickle.loads(buf)
+    mins.append(None)
+    return pickle.dumps(mins)
diff --git a/tests/system-test/0-others/udfpy/af_sum.py b/tests/system-test/0-others/udfpy/af_sum.py
new file mode 100644
index 0000000000000000000000000000000000000000..8b88aba56ca08f64249f85a4df596d9fd204b05a
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/af_sum.py
@@ -0,0 +1,26 @@
+import pickle
+
+def init():
+    pass
+
+def destroy():
+    pass
+
+def start():
+    return pickle.dumps(None)
+
+def finish(buf):
+    total = pickle.loads(buf)
+    return total
+
+def reduce(datablock, buf):
+    (rows, cols) = datablock.shape()
+    total = pickle.loads(buf)
+    for i in range(rows):
+        val = datablock.data(i, 0)
+        if val is not None:
+            if total is None:
+                total = val
+            else:
+                total += val
+    return pickle.dumps(total)
diff --git a/tests/system-test/0-others/udfpy/sf_concat_nch.py b/tests/system-test/0-others/udfpy/sf_concat_nch.py
new file mode 100644
index 0000000000000000000000000000000000000000..84d8eb2c961c11282784f81b6206a36fba0f2edc
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/sf_concat_nch.py
@@ -0,0 +1,27 @@
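+# Scalar concat for nchar arguments. nchar values reach a Python UDF as
+# utf_32_le-encoded bytes (udfpy_main.py even inserts rows tagged
+# "codepage is utf_32_le"), so every argument is decoded before joining and
+# the joined string is encoded back; a NULL in any argument yields NULL.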
+# init
+def init():
+    pass
+
+# destroy
+def destroy():
+    pass
+
+def process(block):
+    (nrows, ncols) = block.shape()
+    results = []
+    for i in range(nrows):
+        row = []
+        for j in range(ncols):
+            val = block.data(i, j)
+            if val is None:
+                row = None
+                break
+            row.append(val.decode('utf_32_le'))
+        if row is None:
+            results.append(None)
+        else:
+            row_str = ''.join(row)
+            results.append(row_str.encode('utf_32_le'))
+    return results
+
+
diff --git a/tests/system-test/0-others/udfpy/sf_concat_var.py b/tests/system-test/0-others/udfpy/sf_concat_var.py
new file mode 100644
index 0000000000000000000000000000000000000000..fc8292c718d5c8def7cebc87ee7938b2c7bd3849
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/sf_concat_var.py
@@ -0,0 +1,26 @@
+# init
+def init():
+    pass
+
+# destroy
+def destroy():
+    pass
+
+def process(block):
+    (nrows, ncols) = block.shape()
+    results = []
+    for i in range(nrows):
+        row = []
+        for j in range(ncols):
+            val = block.data(i, j)
+            if val is None:
+                row = None
+                break
+            row.append(val.decode('utf-8'))
+        if row is None:
+            results.append(None)
+        else:
+            results.append(''.join(row))
+    return results
+
+
diff --git a/tests/system-test/0-others/udfpy/sf_multi_args.py b/tests/system-test/0-others/udfpy/sf_multi_args.py
new file mode 100644
index 0000000000000000000000000000000000000000..1026661d8d127eeb52b6b79f9b81dd29ccc24ba5
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/sf_multi_args.py
@@ -0,0 +1,23 @@
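+# Scalar UDF taking a variable number of argument columns: each row becomes
+# one comma-joined string, e.g. a (varchar, int, bool) row ('beijing', 18,
+# True) comes out as "beijing,18,True" -- bytes are decoded as utf-8,
+# everything else (numbers, bools, None) is rendered with repr().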
+# init
+def init():
+    pass
+
+# destroy
+def destroy():
+    pass
+
+# join all argument columns of a row into one comma-separated string
+def process(block):
+    (nrows, ncols) = block.shape()
+    results = []
+    for i in range(nrows):
+        rows = []
+        for j in range(ncols):
+            val = block.data(i, j)
+            if type(val) is bytes:
+                rows.append(val.decode('utf-8'))
+            else:
+                rows.append(repr(val))
+        results.append(','.join(rows))
+    return results
+
diff --git a/tests/system-test/0-others/udfpy/sf_null.py b/tests/system-test/0-others/udfpy/sf_null.py
new file mode 100644
index 0000000000000000000000000000000000000000..c22ca95b19f93d2d494a4890154923626a6a445a
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/sf_null.py
@@ -0,0 +1,16 @@
+
+# init
+def init():
+    pass
+
+# destroy
+def destroy():
+    pass
+
+# return null for every row
+def process(block):
+    (rows, cols) = block.shape()
+    results = []
+    for i in range(rows):
+        results.append(None)
+    return results
\ No newline at end of file
diff --git a/tests/system-test/0-others/udfpy/sf_origin.py b/tests/system-test/0-others/udfpy/sf_origin.py
new file mode 100644
index 0000000000000000000000000000000000000000..9158e044d2defb118683f56b889bf95bb7d04ee6
--- /dev/null
+++ b/tests/system-test/0-others/udfpy/sf_origin.py
@@ -0,0 +1,15 @@
+# init
+def init():
+    pass
+
+# destroy
+def destroy():
+    pass
+
+# return the origin values of the first column
+def process(block):
+    (rows, cols) = block.shape()
+    results = []
+    for i in range(rows):
+        results.append(block.data(i, 0))
+    return results
diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py
new file mode 100644
index 0000000000000000000000000000000000000000..916b032edbde72a4ddc011608b924f95bb035c58
--- /dev/null
+++ b/tests/system-test/0-others/udfpy_main.py
@@ -0,0 +1,464 @@
+###################################################################
+#           Copyright (c) 2016 by TAOS Technologies, Inc.
+#                     All rights reserved.
+#
+#  This file is proprietary and confidential to TAOS Technologies.
+#  No part of this file may be reproduced, stored, transmitted,
+#  disclosed or used in any form or by any means other than as
+#  expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.common import *
+from util.sqlset import *
+
+import random
+import os
+import subprocess
+import sys
+import time
+
+
+class PerfDB:
+    def __init__(self):
+        self.sqls = []
+        self.spends = []
+
+    # execute
+    def execute(self, sql):
+        print(f" perfdb execute {sql}")
+        stime = time.time()
+        ret = tdSql.execute(sql, 1)
+        spend = time.time() - stime
+
+        self.sqls.append(sql)
+        self.spends.append(spend)
+        return ret
+
+    # query
+    def query(self, sql):
+        print(f" perfdb query {sql}")
+        start = time.time()
+        ret = tdSql.query(sql, None, 1)
+        spend = time.time() - start
+        self.sqls.append(sql)
+        self.spends.append(spend)
+        return ret
+
+
+class TDTestCase:
+    def init(self, conn, logSql, replicaVar=1):
+        self.replicaVar = int(replicaVar)
+        tdLog.debug("start to execute %s" % __file__)
+        tdSql.init(conn.cursor())
+        self.setsql = TDSetSql()
+
+        # udf path
+        self.udf_path = os.path.dirname(os.path.realpath(__file__)) + "/udfpy"
+
+        self.column_dict = {
+            'ts': 'timestamp',
+            'col1': 'tinyint',
+            'col2': 'smallint',
+            'col3': 'int',
+            'col4': 'bigint',
+            'col5': 'tinyint unsigned',
+            'col6': 'smallint unsigned',
+            'col7': 'int unsigned',
+            'col8': 'bigint unsigned',
+            'col9': 'float',
+            'col10': 'double',
+            'col11': 'bool',
+            'col12': 'varchar(120)',
+            'col13': 'nchar(100)',
+        }
+        self.tag_dict = {
+            't1': 'tinyint',
+            't2': 'smallint',
+            't3': 'int',
+            't4': 'bigint',
+            't5': 'tinyint unsigned',
+            't6': 'smallint unsigned',
+            't7': 'int unsigned',
+            't8': 'bigint unsigned',
+            't9': 'float',
+            't10': 'double',
+            't11': 'bool',
+            't12': 'varchar(120)',
+            't13': 'nchar(100)',
+        }
+
+    def set_stb_sql(self, stbname, column_dict, tag_dict):
+        column_sql = ''
+        tag_sql = ''
+        for k, v in column_dict.items():
+            column_sql += f"{k} {v}, "
+        for k, v in tag_dict.items():
+            tag_sql += f"{k} {v}, "
+        create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})'
+        return create_stb_sql
+
+    # create stable and child tables
+    def create_table(self, stbname, tbname, count):
+        tdSql.execute("create database db wal_retention_period 4")
+        tdSql.execute('use db')
+        self.child_count = count
+        self.stbname = stbname
+        self.tbname = tbname
+
+        # create stable
+        create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict)
+        tdSql.execute(create_table_sql)
+
+        batch_size = 1000
+        # create child tables
+        for i in range(count):
+            ti = i % 128
+            tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}"'
+            sql = f'create table {tbname}{i} using {stbname} tags({tags});'
+            tdSql.execute(sql)
+            if i % batch_size == 0:
+                tdLog.info(f" create child table {i} ...")
+
+        tdLog.info(f" create {count} child tables ok.")
+
+    # create one scalar function per (name, output type) entry in dicts
+    def create_sf_dicts(self, dicts, filename):
+        for fun_name, out_type in dicts.items():
+            sql = f'create function {fun_name} as "{self.udf_path}/{filename}" outputtype {out_type} language "Python"'
+            tdSql.execute(sql)
+            tdLog.info(sql)
+
+    # create_udfpy_function
+    def create_scalar_udfpy(self):
+        # scalar functions
+        self.scalar_funs = {
+            'sf0': 'timestamp',
+            'sf1': 'tinyint',
+            'sf2': 'smallint',
+            'sf3': 'int',
+            'sf4': 'bigint',
+            'sf5': 'tinyint unsigned',
+            'sf6': 'smallint unsigned',
+            'sf7': 'int unsigned',
+            'sf8': 'bigint unsigned',
+            'sf9': 'float',
+            'sf10': 'double',
+            'sf11': 'bool',
+            'sf12': 'varchar(120)',
+            'sf13': 'nchar(100)'
+        }
+        # aggregate functions
+        self.agg_funs = {
+            'af1': 'tinyint',
+            'af2': 'smallint',
+            'af3': 'int',
+            'af4': 'bigint',
+            'af5': 'tinyint unsigned',
+            'af6': 'smallint unsigned',
+            'af7': 'int unsigned',
+            'af8': 'bigint unsigned',
+            'af9': 'float',
+            'af10': 'double',
+            'af11': 'bool',
+            'af12': 'varchar(120)',
+            'af13': 'nchar(100)',
+            'af14': 'timestamp'
+        }
+
+        # one pass-through scalar function per output type
+        self.create_sf_dicts(self.scalar_funs, "sf_origin.py")
+
+        # multi_args
+        fun_name = "sf_multi_args"
+        self.create_udf_sf(fun_name, f'{fun_name}.py', "binary(1024)")
+
+        # all type check null
+        for col_name, col_type in self.column_dict.items():
+            self.create_udf_sf(f"sf_null_{col_name}", "sf_null.py", col_type)
+
+        # concat
+        fun_name = "sf_concat_var"
+        self.create_udf_sf(fun_name, f'{fun_name}.py', "varchar(1024)")
+        fun_name = "sf_concat_nch"
+        self.create_udf_sf(fun_name, f'{fun_name}.py', "nchar(1024)")
+
+    # create one scalar function
+    def create_udf_sf(self, fun_name, file_name, out_type):
+        sql = f'create function {fun_name} as "{self.udf_path}/{file_name}" outputtype {out_type} language "Python"'
+        tdSql.execute(sql)
+        tdLog.info(sql)
+
+    # create one aggregate function
+    def create_udf_af(self, fun_name, file_name, out_type, bufsize):
+        sql = f'create aggregate function {fun_name} as "{self.udf_path}/{file_name}" outputtype {out_type} bufsize {bufsize} language "Python"'
+        tdSql.execute(sql)
+        tdLog.info(sql)
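+
+    # For reference, the two helpers above issue SQL of this shape, with
+    # udf_path pointing at the udfpy directory next to this file:
+    #   create function sf3 as "<udf_path>/sf_origin.py" outputtype int language "Python"
+    #   create aggregate function af_min_int as "<udf_path>/af_min.py" outputtype int bufsize 204800 language "Python"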
+
+    # sql1's query result must equal sql2's
+    def verify_same_result(self, sql1, sql2):
+        # query
+        result1 = tdSql.getResult(sql1)
+        tdSql.query(sql2)
+
+        for i, row in enumerate(result1):
+            for j, val in enumerate(row):
+                tdSql.checkData(i, j, val)
+
+    # same value like select col1, udf_fun1(col1) from st
+    def verify_same_value(self, sql, col=0):
+        tdSql.query(sql)
+        nrows = tdSql.getRows()
+        for i in range(nrows):
+            val = tdSql.getData(i, col)
+            tdSql.checkData(i, col + 1, val)
+
+    # verify multi values
+    def verify_same_multi_values(self, sql):
+        tdSql.query(sql)
+        nrows = tdSql.getRows()
+        for i in range(nrows):
+            udf_val = tdSql.getData(i, 0)
+            vals = udf_val.split(',')
+            for j, val in enumerate(vals, 1):
+                tdSql.checkData(i, j, val)
+
+    # query multi-args
+    def query_multi_args(self):
+        cols = list(self.column_dict.keys()) + list(self.tag_dict.keys())
+        cols.remove("col13")
+        cols.remove("t13")
+        cols.remove("ts")
+        ncols = len(cols)
+        print(cols)
+        for i in range(2, ncols):
+            sample = random.sample(cols, i)
+            print(sample)
+            cols_name = ','.join(sample)
+            sql = f'select sf_multi_args({cols_name}),{cols_name} from {self.stbname} limit 10'
+            self.verify_same_multi_values(sql)
+            tdLog.info(sql)
+
+    # query_udfpy
+    def query_scalar_udfpy(self):
+        # col
+        for col_name, col_type in self.column_dict.items():
+            for fun_name, out_type in self.scalar_funs.items():
+                if col_type == out_type:
+                    sql = f'select {col_name}, {fun_name}({col_name}) from {self.stbname} limit 10'
+                    tdLog.info(sql)
+                    self.verify_same_value(sql)
+                    sql = f'select * from (select {col_name} as a, {fun_name}({col_name}) as b from {self.stbname} limit 100) order by b,a desc'
+                    tdLog.info(sql)
+                    self.verify_same_value(sql)
+
+        # multi-args
+        self.query_multi_args()
+
+        # all type check null
+        for col_name, col_type in self.column_dict.items():
+            fun_name = f"sf_null_{col_name}"
+            sql = f'select {fun_name}({col_name}) from {self.stbname}'
+            tdSql.query(sql)
+            if col_type != "timestamp":
+                tdSql.checkData(0, 0, "None")
+            else:
+                val = tdSql.getData(0, 0)
+                if val is not None:
+                    tdLog.exit(f" check {sql} expected None, got {val}.")
+
+        # concat
+        sql = f'select sf_concat_var(col12, t12), concat(col12, t12) from {self.stbname} limit 1000'
+        self.verify_same_value(sql)
+        sql = f'select sf_concat_nch(col13, t13), concat(col13, t13) from {self.stbname} limit 1000'
+        self.verify_same_value(sql)
+
+    # create aggregate functions
+    def create_aggr_udfpy(self):
+        bufsize = 200 * 1024
+        # all type check null
+        for col_name, col_type in self.column_dict.items():
+            self.create_udf_af(f"af_null_{col_name}", "af_null.py", col_type, bufsize)
+
+        # min
+        file_name = "af_min.py"
+        fun_name = "af_min_float"
+        self.create_udf_af(fun_name, file_name, "float", bufsize)
+        fun_name = "af_min_int"
+        self.create_udf_af(fun_name, file_name, "int", bufsize)
+
+        # sum
+        file_name = "af_sum.py"
+        fun_name = "af_sum_float"
+        self.create_udf_af(fun_name, file_name, "float", bufsize)
+        fun_name = "af_sum_int"
+        self.create_udf_af(fun_name, file_name, "int", bufsize)
+        fun_name = "af_sum_bigint"
+        self.create_udf_af(fun_name, file_name, "bigint", bufsize)
+
+        # count
+        file_name = "af_count.py"
+        fun_name = "af_count_float"
+        self.create_udf_af(fun_name, file_name, "float", bufsize)
+        fun_name = "af_count_int"
+        self.create_udf_af(fun_name, file_name, "int", bufsize)
+        fun_name = "af_count_bigint"
+        self.create_udf_af(fun_name, file_name, "bigint", bufsize)
+
+    # query aggregate functions
+    def query_aggr_udfpy(self):
+        # all type check null
+        for col_name, col_type in self.column_dict.items():
+            fun_name = f"af_null_{col_name}"
+            sql = f'select {fun_name}({col_name}) from {self.stbname}'
+            tdSql.query(sql)
+            if col_type != "timestamp":
+                tdSql.checkData(0, 0, "None")
+            else:
+                val = tdSql.getData(0, 0)
+                if val is not None:
+                    tdLog.exit(f" check {sql} expected None, got {val}.")
+
+        # min
+        sql = f'select min(col3), af_min_int(col3) from {self.stbname}'
+        self.verify_same_value(sql)
+        sql = f'select min(col7), af_min_int(col7) from {self.stbname}'
+        self.verify_same_value(sql)
+        sql = f'select min(col9), af_min_float(col9) from {self.stbname}'
+        self.verify_same_value(sql)
+
+        # sum
+        sql = f'select sum(col1), af_sum_int(col1) from {self.tbname}0'
+        self.verify_same_value(sql)
+        sql = f'select sum(col3), af_sum_bigint(col3) from {self.stbname}'
+        self.verify_same_value(sql)
+        sql = f'select sum(col9), af_sum_float(col9) from {self.stbname}'
+        self.verify_same_value(sql)
+
+        # count
+        sql = f'select count(col1), af_count_int(col1) from {self.stbname}'
+        self.verify_same_value(sql)
+        sql = f'select count(col7), af_count_bigint(col7) from {self.stbname}'
+        self.verify_same_value(sql)
+        sql = f'select count(col8), af_count_float(col8) from {self.stbname}'
+        self.verify_same_value(sql)
+
+        # nest query
+        sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname})'
+        self.verify_same_value(sql)
+        # group by
+        sql = f'select a+1000,b+1000 from (select count(col8) as a, af_count_float(col8) as b from {self.stbname} group by tbname)'
+        self.verify_same_value(sql)
+        # two-field expression
+        sql = f'select sum(col1+col2),af_sum_float(col1+col2) from {self.stbname};'
+        self.verify_same_value(sql)
+        # interval
+        sql = f'select af_sum_float(col2+col3),sum(col3+col2) from {self.stbname} interval(1s)'
+        self.verify_same_value(sql)
+
+    # insert data into the child tables
+    def insert_data(self, tbname, rows):
+        ts = 1670000000000
+        values = ""
+        batch_size = 500
+        for i in range(self.child_count):
+            for j in range(rows):
+                tj = j % 128
+                cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}涛思数据codepage is utf_32_le"'
+                value = f'({ts+j},{cols})'
+                if values == "":
+                    values = value
+                else:
+                    values += f",{value}"
+                if j % batch_size == 0 or j + 1 == rows:
+                    sql = f'insert into {tbname}{i} values {values};'
+                    tdSql.execute(sql)
+                    tdLog.info(f" child table={i} rows={j} inserted.")
+                    values = ""
+
+        # partial columns update
+        sql = f'insert into {tbname}0(ts, col1, col9, col11) values(now, 100, 200, 0)'
+        tdSql.execute(sql)
+        sql = f'insert into {tbname}0(ts, col2, col5, col8) values(now, 100, 200, 300)'
+        tdSql.execute(sql)
+        sql = f'insert into {tbname}0(ts, col3, col7, col13) values(now, null, null, null)'
+        tdSql.execute(sql)
+        sql = f'insert into {tbname}0(ts) values(now)'
+        tdSql.execute(sql)
+        tdLog.info(f" inserted {rows} rows into each of {self.child_count} child tables.")
+
+    # create stream
+    def create_stream(self):
+        sql = f"create stream ma into sta subtable(concat('sta_',tbname)) \
+               as select _wstart,count(col1),af_count_bigint(col1) from {self.stbname} partition by tbname interval(1s);"
+        tdSql.execute(sql)
+        tdLog.info(sql)
+
+    # verify stream output
+    def verify_stream(self):
+        sql = "select * from sta limit 10"
+        self.verify_same_value(sql, 1)
+
+    # create tmq topic
+    def create_tmq(self):
+        sql = f"create topic topa as select concat(col12,t12),sf_concat_var(col12,t12) from {self.stbname};"
+        tdSql.execute(sql)
+        tdLog.info(sql)
+
+    def install_taospy(self):
+        tdLog.info("install taospyudf...")
+        packs = ["taospyudf"]
+        for pack in packs:
+            subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-i', 'https://pypi.org/simple', '-U', pack])
+        tdLog.info("call ldconfig...")
+        os.system("ldconfig")
+        tdLog.info("install taospyudf successfully.")
+
+    # run
+    def run(self):
+        self.install_taospy()
+
+        # var
+        stable = "meters"
+        tbname = "d"
+        count = 10
+        rows = 5000
+        # do
+        self.create_table(stable, tbname, count)
+
+        # create functions
+        self.create_scalar_udfpy()
+        self.create_aggr_udfpy()
+
+        # create stream
+        self.create_stream()
+
+        # create tmq
+        self.create_tmq()
+
+        # insert data
+        self.insert_data(tbname, rows)
+
+        # query
+        self.query_scalar_udfpy()
+        self.query_aggr_udfpy()
+
+        # show performance
+
+    def stop(self):
+        tdSql.close()
+        tdLog.success("%s successfully executed" % __file__)
+
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
diff --git a/tests/system-test/7-tmq/tmqDnodeRestart.py b/tests/system-test/7-tmq/tmqDnodeRestart.py
index afd54c9d02e864d124025d1983e0ecb2ad1ef2d3..3ad7d7692d101b23c819253218acbd73b67c15ab 100644
--- a/tests/system-test/7-tmq/tmqDnodeRestart.py
+++ b/tests/system-test/7-tmq/tmqDnodeRestart.py
@@ -146,7 +146,7 @@ class TDTestCase:
         for i in range(expectRows):
             totalConsumeRows += resultList[i]
 
-        tdSql.query(queryString)
+        tdSql.query(queryString, None, 50)
         totalRowsFromQury = tdSql.getRows()
 
         tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
@@ -236,7 +236,7 @@ class TDTestCase:
         for i in range(expectRows):
             totalConsumeRows += resultList[i]
 
-        tdSql.query(queryString)
+        tdSql.query(queryString, None, 50)
         totalRowsFromQuery = tdSql.getRows()
 
         tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery))
diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c
index a8986351b7325802ba6a121db56a1cbc5dfe1295..140720af815cf317bf7665c40750b2d0fb2ebd37 100644
--- a/tools/shell/src/shellAuto.c
+++ b/tools/shell/src/shellAuto.c
@@ -84,15 +84,15 @@ SWords shellCommands[] = {
     {"create table <anyword> using <stb_name> tags(", 0, 0, NULL},
     {"create database "
      " "
-     " ;",
-     0, 0, NULL},
+     " ;", 0, 0, NULL},
    {"create dnode ", 0, 0, NULL},
    {"create index <anyword> on <stb_name> ()", 0, 0, NULL},
    {"create mnode on dnode <dnode_id> ;", 0, 0, NULL},
    {"create qnode on dnode <dnode_id> ;", 0, 0, NULL},
    {"create stream <anyword> into <anyword> as select", 0, 0, NULL},  // 26 append sub sql
    {"create topic <anyword> as select", 0, 0, NULL},  // 27 append sub sql
-   {"create function ", 0, 0, NULL},
+   {"create function <anyword> as <anyword> outputtype <data_types> language <udf_language> ", 0, 0, NULL},
+   {"create aggregate function <anyword> as <anyword> outputtype <data_types> bufsize <anyword> language <udf_language> ", 0, 0, NULL},
    {"create user <anyword> pass <anyword> sysinfo 0;", 0, 0, NULL},
    {"create user <anyword> pass <anyword> sysinfo 1;", 0, 0, NULL},
    {"describe <all_table>", 0, 0, NULL},
@@ -105,7 +105,7 @@ SWords shellCommands[] = {
    {"drop qnode on dnode <dnode_id> ;", 0, 0, NULL},
    {"drop user <user_name> ;", 0, 0, NULL},
    // 40
-   {"drop function", 0, 0, NULL},
+   {"drop function <udf_name> ;", 0, 0, NULL},
    {"drop consumer group <anyword> on <topic_name>", 0, 0, NULL},
    {"drop topic <topic_name> ;", 0, 0, NULL},
    {"drop stream <stream_name> ;", 0, 0, NULL},
@@ -272,6 +272,8 @@ char* key_systable[] = {
     "ins_subscriptions", "ins_streams", "ins_stream_tasks", "ins_vnodes", "ins_user_privileges", "perf_connections",
     "perf_queries", "perf_consumers", "perf_trans", "perf_apps"};
 
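+// SQL literals offered for the <udf_language> placeholder when completing
+// the LANGUAGE clause of CREATE [AGGREGATE] FUNCTION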
+char* udf_language[] = {"\'Python\'", "\'C\'"};
+
 //
 //  ------- global variant define ---------
 //
@@ -291,25 +293,29 @@ bool waitAutoFill = false;
 #define WT_VAR_USERNAME 4
 #define WT_VAR_TOPIC 5
 #define WT_VAR_STREAM 6
-#define WT_VAR_ALLTABLE 7
-#define WT_VAR_FUNC 8
-#define WT_VAR_KEYWORD 9
-#define WT_VAR_TBACTION 10
-#define WT_VAR_DBOPTION 11
-#define WT_VAR_ALTER_DBOPTION 12
-#define WT_VAR_DATATYPE 13
-#define WT_VAR_KEYTAGS 14
-#define WT_VAR_ANYWORD 15
-#define WT_VAR_TBOPTION 16
-#define WT_VAR_USERACTION 17
-#define WT_VAR_KEYSELECT 18
-#define WT_VAR_SYSTABLE 19
-
-#define WT_VAR_CNT 20
-
-#define WT_FROM_DB_MAX 6  // max get content from db
+#define WT_VAR_UDFNAME 7
+
+#define WT_FROM_DB_MAX 7  // max get content from db
 #define WT_FROM_DB_CNT (WT_FROM_DB_MAX + 1)
+
+#define WT_VAR_ALLTABLE 8
+#define WT_VAR_FUNC 9
+#define WT_VAR_KEYWORD 10
+#define WT_VAR_TBACTION 11
+#define WT_VAR_DBOPTION 12
+#define WT_VAR_ALTER_DBOPTION 13
+#define WT_VAR_DATATYPE 14
+#define WT_VAR_KEYTAGS 15
+#define WT_VAR_ANYWORD 16
+#define WT_VAR_TBOPTION 17
+#define WT_VAR_USERACTION 18
+#define WT_VAR_KEYSELECT 19
+#define WT_VAR_SYSTABLE 20
+#define WT_VAR_LANGUAGE 21
+
+#define WT_VAR_CNT 22
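+
+// vars up to WT_VAR_UDFNAME (WT_FROM_DB_CNT of them) are refreshed from the
+// server with the matching "show ..." statement in varSqls; the remaining
+// vars are generated locally from the static word tables in this file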
"", "", "", - "", "", "", "", "", - "", "", "", "", "", - "", "", "", ""}; +char varTypes[WT_VAR_CNT][64] = { + "", "", "", "", "", "", "", + "", "", "", "", "", "", "", + "", "", "", "", "", "", "", ""}; char varSqls[WT_FROM_DB_CNT][64] = {"show databases;", "show stables;", "show tables;", "show dnodes;", - "show users;", "show topics;", "show streams;"}; + "show users;", "show topics;", "show streams;", "show functions;"}; // var words current cursor, if user press any one key except tab, cursorVar can be reset to -1 int cursorVar = -1; @@ -390,7 +396,8 @@ void showHelp() { create qnode on dnode ;\n\ create stream into as select ...\n\ create topic as select ...\n\ - create function ...\n\ + create function as outputtype language \'C\' | \'Python\' ;\n\ + create aggregate function as outputtype bufsize language \'C\' | \'Python\';\n\ create user pass ...\n\ ----- D ----- \n\ describe \n\ @@ -401,7 +408,7 @@ void showHelp() { drop mnode on dnode ;\n\ drop qnode on dnode ;\n\ drop user ;\n\ - drop function ;\n\ + drop function ;\n\ drop consumer group ... \n\ drop topic ;\n\ drop stream ;\n\ @@ -643,6 +650,7 @@ bool shellAutoInit() { GenerateVarType(WT_VAR_USERACTION, user_actions, sizeof(user_actions) / sizeof(char*)); GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*)); GenerateVarType(WT_VAR_SYSTABLE, key_systable, sizeof(key_systable) / sizeof(char*)); + GenerateVarType(WT_VAR_LANGUAGE, udf_language, sizeof(udf_language) / sizeof(char*)); return true; }