diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 2d85c026afb17bd7b6bbdd47c65bf5f7301b49c4..69d8fc447efe9af55baed462260275fc35aeab95 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -362,6 +362,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { } for (int retry = 0; retry < 3; retry++) { + tscRemoveFromSqlList(pSql); + if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { tscTrace("begin meter synchronization"); char* sqlstr = pSql->sqlstr; @@ -380,6 +382,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSql->thandle = NULL; pSql->cmd.command = TSDB_SQL_SELECT; pSql->cmd.type = type; + + tscGetMeterMetaInfo(&pSql->cmd, 0)->vnodeIndex = 0; } tscDoQuery(pSql); diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index b168648702b21456a3b94affdff0418fc6549785..88ff223d7fdb7d58daaa5be02d0517ef63d0a50a 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -49,7 +49,7 @@ void check_row_count(int line, TAOS_RES* res, int expected) { void run_test(TAOS* taos) { - taos_query(taos, "drop database test;"); + taos_query(taos, "drop database if exists test;"); usleep(100000); taos_query(taos, "create database test tables 5;"); @@ -86,21 +86,26 @@ void run_test(TAOS* taos) { res = taos_consume(tsub); check_row_count(__LINE__, res, 0); - taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.001', 0, 'UK');"); - taos_query(taos, "insert into t1 using meters tags('london', 0) values('2020-01-01 00:03:00.001', 0, 'UK');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:03:00.000', 0, 'china');"); + taos_query(taos, "insert into t8 values('2020-01-01 00:01:03.000', 0, 'china');"); res = taos_consume(tsub); check_row_count(__LINE__, res, 2); - taos_query(taos, "insert into t1 using meters tags('shanghai', 0) values('2020-01-01 00:03:00.002', 0, 'china');"); + taos_query(taos, "insert into t2 values('2020-01-01 00:01:02.001', 0, 'UK');"); + taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.001', 0, 'UK');"); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 2); + + taos_query(taos, "insert into t1 values('2020-01-01 00:03:00.002', 0, 'china');"); res = taos_consume(tsub); check_row_count(__LINE__, res, 1); // keep progress information and restart subscription taos_unsubscribe(tsub, 1); - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.000', 0, 'china');"); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); res = taos_consume(tsub); - check_row_count(__LINE__, res, 22); + check_row_count(__LINE__, res, 24); // keep progress information and continue previous subscription taos_unsubscribe(tsub, 1); @@ -112,27 +117,22 @@ void run_test(TAOS* taos) { taos_unsubscribe(tsub, 0); tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); res = taos_consume(tsub); - check_row_count(__LINE__, res, 22); + check_row_count(__LINE__, res, 24); // single meter subscription taos_unsubscribe(tsub, 0); tsub = taos_subscribe(taos, 0, "test", "select * from t0;", NULL, NULL, 0); res = taos_consume(tsub); - check_row_count(__LINE__, res, 4); + check_row_count(__LINE__, res, 5); res = taos_consume(tsub); check_row_count(__LINE__, res, 0); - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.001', 0, 'china');"); + taos_query(taos, "insert into t0 values('2020-01-01 00:04:00.001', 0, 'china');"); res = taos_consume(tsub); check_row_count(__LINE__, res, 1); - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.002', 0, 'china');"); - taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:04:00.000', 0, 'china');"); - res = taos_consume(tsub); - check_row_count(__LINE__, res, 2); - taos_unsubscribe(tsub, 0); }