From dbd22da833284b240dda7fe32935c1fb9a5009d0 Mon Sep 17 00:00:00 2001 From: Bomin Zhang Date: Fri, 17 Jul 2020 18:28:33 +0800 Subject: [PATCH] td-909: fix stream start time issue fix crash in taos_load_table_info --- src/client/src/tscSql.c | 2 + src/client/src/tscStream.c | 6 ++- tests/examples/c/apitest.c | 82 +++++++++++++++++++++++++++++++++++++- 3 files changed, 88 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 49e4e027a0..b1c85dd662 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -880,6 +880,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { } SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); + pSql->pTscObj = taos; + pSql->signature = pSql; SSqlRes *pRes = &pSql->res; pRes->numOfTotal = 0; // the number of getting table meta from server diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 7c188ec969..b07627c87b 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -122,7 +122,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { pQueryInfo->window.ekey = pStream->etime; } } else { - pQueryInfo->window.skey = pStream->stime - pStream->interval; + pQueryInfo->window.skey = pStream->stime; int64_t etime = taosGetTimestamp(pStream->precision); // delay to wait all data in last time window if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { @@ -232,6 +232,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf (*pStream->fp)(pStream->param, res, row); } + if (!pStream->isProject) { + pStream->stime += pStream->slidingTime; + } // actually only one row is returned. this following is not necessary taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); } else { // numOfRows == 0, all data has been retrieved @@ -432,6 +435,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in } else { // timewindow based aggregation stream if (stime == 0) { // no data in meter till now stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; + stime -= pStream->interval; tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); } else { int64_t newStime = (stime / pStream->interval) * pStream->interval; diff --git a/tests/examples/c/apitest.c b/tests/examples/c/apitest.c index 0be1f01bbe..bc628fb58f 100644 --- a/tests/examples/c/apitest.c +++ b/tests/examples/c/apitest.c @@ -98,7 +98,12 @@ static void check_row_count(int line, TAOS_RES* res, int expected) { static void verify_query(TAOS* taos) { prepare_data(taos); - int code = taos_validate_sql(taos, "select * from nonexisttable"); + int code = taos_load_table_info(taos, "t0,t1,t2,t3,t4,t5,t6,t7,t8,t9"); + if (code != 0) { + printf("\033[31mfailed to load table info: 0x%08x\033[0m\n", code); + } + + code = taos_validate_sql(taos, "select * from nonexisttable"); if (code == 0) { printf("\033[31mimpossible, the table does not exists\033[0m\n"); } @@ -127,9 +132,17 @@ static void verify_query(TAOS* taos) { code = taos_errno(res); printf("code=%d, error msg=%s\n", code, taos_errstr(res)); taos_free_result(res); + + res = taos_query(taos, "select * from meters"); + taos_stop_query(res); } +void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { + int rows = print_result(res, *(int*)param); + printf("%d rows consumed in subscribe_callback\n", rows); +} + static void verify_subscribe(TAOS* taos) { prepare_data(taos); @@ -188,6 +201,13 @@ static void verify_subscribe(TAOS* taos) { check_row_count(__LINE__, res, 1); taos_unsubscribe(tsub, 0); + + int blockFetch = 0; + tsub = taos_subscribe(taos, 1, "test", "select * from meters;", subscribe_callback, &blockFetch, 1000); + usleep(2000000); + taos_query(taos, "insert into t0 values('2020-01-01 00:05:00.001', 0);"); + usleep(2000000); + taos_unsubscribe(tsub, 0); } @@ -358,7 +378,60 @@ void verify_prepare(TAOS* taos) { taos_stmt_close(stmt); } +void retrieve_callback(void *param, TAOS_RES *tres, int numOfRows) +{ + if (numOfRows > 0) { + printf("%d rows async retrieved\n", numOfRows); + taos_fetch_rows_a(tres, retrieve_callback, param); + } else { + if (numOfRows < 0) { + printf("\033[31masync retrieve failed, code: %d\033[0m\n", numOfRows); + } else { + printf("async retrieve completed\n"); + } + taos_free_result(tres); + } +} + +void select_callback(void *param, TAOS_RES *tres, int code) +{ + if (code == 0 && tres) { + taos_fetch_rows_a(tres, retrieve_callback, param); + } else { + printf("\033[31masync select failed, code: %d\033[0m\n", code); + } +} + +void verify_async(TAOS* taos) { + prepare_data(taos); + taos_query_a(taos, "select * from meters", select_callback, NULL); + usleep(1000000); +} + +void stream_callback(void *param, TAOS_RES *res, TAOS_ROW row) { + int num_fields = taos_num_fields(res); + TAOS_FIELD* fields = taos_fetch_fields(res); + + printf("got one row from stream_callback\n"); + char temp[256]; + taos_print_row(temp, row, fields, num_fields); + puts(temp); +} + void verify_stream(TAOS* taos) { + prepare_data(taos); + TAOS_STREAM* strm = taos_open_stream( + taos, + "select count(*) from meters interval(1m)", + stream_callback, + 0, + NULL, + NULL); + printf("waiting for stream data\n"); + usleep(100000); + taos_query(taos, "insert into t0 values(now, 0)(now+5s,1)(now+10s, 2);"); + usleep(200000000); + taos_close_stream(strm); } int main(int argc, char *argv[]) { @@ -382,12 +455,19 @@ int main(int argc, char *argv[]) { printf("************ verify query *************\n"); verify_query(taos); + + printf("********* verify async query **********\n"); + verify_async(taos); + printf("*********** verify subscribe ************\n"); verify_subscribe(taos); + printf("************ verify prepare *************\n"); verify_prepare(taos); + printf("************ verify stream *************\n"); verify_stream(taos); + printf("done\n"); taos_close(taos); taos_cleanup(); -- GitLab