# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
# 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
...
@@ -1735,7 +1735,7 @@ class TaskCreateStream(StateTransitionTask):
...
@@ -1735,7 +1735,7 @@ class TaskCreateStream(StateTransitionTask):
ifsub_tables:# if not empty
ifsub_tables:# if not empty
sub_tbname=sub_tables[0]
sub_tbname=sub_tables[0]
# create stream with query above sub_table
# create stream with query above sub_table
stream_sql='create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(sub_stream_name,dbname,sub_stream_tb_name,aggExpr,dbname,sub_tbname)
stream_sql='create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} where ts <now and ts >now -1h PARTITION BY tbname INTERVAL(5s) SLIDING(3s) FILL (prev) '.format(sub_stream_name,dbname,sub_stream_tb_name,aggExpr,dbname,sub_tbname)
try:
try:
self.execWtSql(wt,stream_sql)
self.execWtSql(wt,stream_sql)
Logging.debug("[OPS] stream is creating at {}".format(time.time()))
Logging.debug("[OPS] stream is creating at {}".format(time.time()))
...
@@ -1749,7 +1749,7 @@ class TaskCreateStream(StateTransitionTask):
...
@@ -1749,7 +1749,7 @@ class TaskCreateStream(StateTransitionTask):
pass
pass
else:
else:
stream_sql='create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} PARTITION BY tbname INTERVAL(5s) SLIDING(3s) '.format(super_stream_name,dbname,super_stream_tb_name,aggExpr,dbname,stbname)
stream_sql='create stream {} into {}.{} as select {}, avg(speed) FROM {}.{} where ts <now and ts >now -1h PARTITION BY tbname INTERVAL(5s) SLIDING(3s) FILL (prev) '.format(super_stream_name,dbname,super_stream_tb_name,aggExpr,dbname,stbname)