From 76ade609947a2b6e2d6ed7e95c6efa03c65a52a5 Mon Sep 17 00:00:00 2001 From: localvar Date: Wed, 8 Jan 2020 10:22:40 +0800 Subject: [PATCH] tbase-916 --- src/client/src/tscSub.c | 2 +- tests/examples/c/subscribe.c | 104 ++++++++++++++++++++++++++++++++++- 2 files changed, 103 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 8293378abf..225d3f0e6f 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -33,10 +33,10 @@ typedef struct SSubscriptionProgress { } SSubscriptionProgress; typedef struct SSub { + void * signature; char topic[32]; int64_t lastSyncTime; int64_t lastConsumeTime; - void * signature; TAOS * taos; void * pTimer; SSqlObj * pSql; diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index a36de8e010..078929a9b8 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -5,7 +5,7 @@ #include #include #include // include TDengine header file - +#include void print_result(TAOS_RES* res) { TAOS_ROW row; @@ -24,13 +24,102 @@ void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { } +void check_row_count(int line, TAOS_RES* res, int expected) { + int actual = 0; + TAOS_ROW row; + while ((row = taos_fetch_row(res))) { + actual++; + } + if (actual != expected) { + printf("line %d: row count mismatch, expected: %d, actual: %d\n", line, expected, actual); + } else { + printf("line %d: %d rows consumed as expected\n", line, actual); + } +} + +void run_test(TAOS* taos) { + taos_query(taos, "drop database test;"); + + usleep(100000); + taos_query(taos, "create database test;"); + usleep(100000); + taos_query(taos, "use test;"); + usleep(100000); + taos_query(taos, "create table meters(ts timestamp, a int, b binary(20)) tags(loc binary(20), area int);"); + + taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); + taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); + taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); + taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:00:00.000', 0, 'china');"); + taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:01:00.000', 0, 'china');"); + taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:02:00.000', 0, 'china');"); + taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); + taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:00:00.000', 0, 'UK');"); + taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');"); + taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');"); + taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');"); + + // super tables subscription + + TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); + TAOS_RES* res = taos_consume(tsub); + check_row_count(__LINE__, res, 11); + + res = taos_consume(tsub); + check_row_count(__LINE__, res, 0); + + taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.001', 0, 'UK');"); + taos_query(taos, "insert into t1 using meters tags('london', 0) values('2020-01-01 00:03:00.001', 0, 'UK');"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 2); + + taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.002', 0, 'china');"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 1); + + // keep progress information and continue previous subscription + taos_unsubscribe(tsub, 1); + taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); + tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 15); + + // don't keep progress information and continue previous subscription + taos_unsubscribe(tsub, 0); + tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 15); + + // single meter subscription + + taos_unsubscribe(tsub, 0); + tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 4); + + res = taos_consume(tsub); + check_row_count(__LINE__, res, 0); + + taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.001', 0, 'china');"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 1); + + taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.002', 0, 'china');"); + taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:04:00.000', 0, 'china');"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 2); + + taos_unsubscribe(tsub, 0); +} + + int main(int argc, char *argv[]) { const char* host = "127.0.0.1"; const char* user = "root"; const char* passwd = "taosdata"; const char* sql = "select * from meters;"; const char* topic = "test-multiple"; - int async = 1, restart = 0, keep = 1; + int async = 1, restart = 0, keep = 1, test = 0; TAOS_SUB* tsub = NULL; for (int i = 1; i < argc; i++) { @@ -68,6 +157,10 @@ int main(int argc, char *argv[]) { topic = "test-custom"; continue; } + if (strcmp(argv[i], "-test") == 0) { + test = 1; + continue; + } } // init TAOS @@ -79,6 +172,12 @@ int main(int argc, char *argv[]) { exit(1); } + if (test) { + run_test(taos); + taos_close(taos); + exit(0); + } + if (async) { tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000); } else { @@ -99,6 +198,7 @@ int main(int argc, char *argv[]) { } taos_unsubscribe(tsub, keep); + taos_close(taos); return 0; } -- GitLab