diff --git a/tests/system-test/0-others/udfpy/af_count.py b/tests/system-test/0-others/udfpy/af_count.py index 226e02235ffebae116f245af79a57cc977853641..ce29abca13866f9f311d1e868a5df68c71c5f735 100644 --- a/tests/system-test/0-others/udfpy/af_count.py +++ b/tests/system-test/0-others/udfpy/af_count.py @@ -1,3 +1,5 @@ +import pickle + def init(): pass diff --git a/tests/system-test/0-others/udfpy/af_sum.py b/tests/system-test/0-others/udfpy/af_sum.py index ac7aa1692424eba8aac4f85c1190de17b01ba8ef..8b88aba56ca08f64249f85a4df596d9fd204b05a 100644 --- a/tests/system-test/0-others/udfpy/af_sum.py +++ b/tests/system-test/0-others/udfpy/af_sum.py @@ -7,30 +7,20 @@ def destroy(): pass def start(): - return pickle.dumps([]) + return pickle.dumps(None) def finish(buf): - sums = pickle.loads(buf) - all = None - for sum in sums: - if all is None: - all = sum - else: - all += sum - return all + sum = pickle.loads(buf) + return sum def reduce(datablock, buf): (rows, cols) = datablock.shape() - sums = pickle.loads(buf) - sum = None + sum = pickle.loads(buf) for i in range(rows): val = datablock.data(i, 0) if val is not None: if sum is None: sum = val else: - sum += val - - if sum is not None: - sums.append(sum) - return pickle.dumps(sums) + sum += val + return pickle.dumps(sum) diff --git a/tests/system-test/0-others/udfpy_main.py b/tests/system-test/0-others/udfpy_main.py index eaadfbdbd68cadef3c35a883f702cffa0c612228..e76795ac28004efa19d4c3997c748cd3fde896f4 100644 --- a/tests/system-test/0-others/udfpy_main.py +++ b/tests/system-test/0-others/udfpy_main.py @@ -209,12 +209,12 @@ class TDTestCase: tdSql.checkData(i, j, result1[i][j]) # same value like select col1, udf_fun1(col1) from st - def verify_same_value(self, sql): + def verify_same_value(self, sql, col=0): tdSql.query(sql) nrows = tdSql.getRows() for i in range(nrows): - val = tdSql.getData(i, 0) - tdSql.checkData(i, 1, val) + val = tdSql.getData(i, col) + tdSql.checkData(i, col + 1, val) # verify multi values def verify_same_multi_values(self, sql): @@ -395,6 +395,24 @@ class TDTestCase: tdSql.execute(sql) tdLog.info(f" insert {rows} to child table {self.child_count} .") + + # create stream + def create_stream(self): + sql = f"create stream ma into sta subtable(concat('sta_',tbname)) \ + as select _wstart,count(col1),af_count_bigint(col1) from {self.stbname} partition by tbname interval(1s);" + tdSql.execute(sql) + tdLog.info(sql) + + # query stream + def verify_stream(self): + sql = f"select * from sta limit 10" + self.verify_same_value(sql, 1) + + # create tmq + def create_tmq(self): + sql = f"create topic topa as select concat(col12,t12),sf_concat_var(col12,t12) from {self.stbname};" + tdSql.execute(sql) + tdLog.info(sql) # run def run(self): @@ -402,14 +420,23 @@ class TDTestCase: stable = "meters" tbname = "d" count = 10 - rows = 50000 + rows = 5000 # do self.create_table(stable, tbname, count) - self.insert_data(tbname, rows) # create self.create_scalar_udfpy() self.create_aggr_udfpy() + + # create stream + self.create_stream() + + # create tmq + self.create_tmq() + + # insert data + self.insert_data(tbname, rows) + # query self.query_scalar_udfpy() self.query_aggr_udfpy()