diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0972e5fe2dbbb45195dfc6709258fbe6b6b42785..af914f92663ffb841b7fe268a7d32945bd4867eb 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -3558,7 +3558,6 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { p += sizeof(TSKEY); tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key); } - tscSaveSubscriptionProgress(pSql->pSubscription); } pRes->row = 0; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 17a5031fa673f831cec9824d78742166ecdfb369..2d85c026afb17bd7b6bbdd47c65bf5f7301b49c4 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -49,7 +49,11 @@ typedef struct SSub { static int tscCompareSubscriptionProgress(const void* a, const void* b) { - return ((const SSubscriptionProgress*)a)->uid - ((const SSubscriptionProgress*)b)->uid; + const SSubscriptionProgress* x = (const SSubscriptionProgress*)a; + const SSubscriptionProgress* y = (const SSubscriptionProgress*)b; + if (x->uid > y->uid) return 1; + if (x->uid < y->uid) return -1; + return 0; } TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { @@ -175,7 +179,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { if (!UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); numOfMeters += pVnodeSidList->numOfSids; } } @@ -195,7 +199,7 @@ int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; numOfMeters = 0; for (int32_t i = 0; i < pMetricMeta->numOfVnodes; i++) { - SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); + SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, i); for (int32_t j = 0; j < pVnodeSidList->numOfSids; j++) { SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, j); int64_t uid = pMeterInfo->uid; @@ -344,6 +348,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; if (pSub == NULL) return NULL; + tscSaveSubscriptionProgress(pSub); + SSqlObj* pSql = pSub->pSql; SSqlRes *pRes = &pSql->res; @@ -355,27 +361,36 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { } } - if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { - tscTrace("begin meter synchronization"); - char* sqlstr = pSql->sqlstr; - pSql->sqlstr = NULL; - taos_free_result_imp(pSql, 0); - pSql->sqlstr = sqlstr; - taosClearDataCache(tscCacheHandle); - if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; - tscTrace("meter synchronization completed"); - } else { - uint16_t type = pSql->cmd.type; - taos_free_result_imp(pSql, 1); - pRes->numOfRows = 1; - pRes->numOfTotal = 0; - pRes->qhandle = 0; - pSql->thandle = NULL; - pSql->cmd.command = TSDB_SQL_SELECT; - pSql->cmd.type = type; + for (int retry = 0; retry < 3; retry++) { + if (taosGetTimestampMs() - pSub->lastSyncTime > 10 * 60 * 1000) { + tscTrace("begin meter synchronization"); + char* sqlstr = pSql->sqlstr; + pSql->sqlstr = NULL; + taos_free_result_imp(pSql, 0); + pSql->sqlstr = sqlstr; + taosClearDataCache(tscCacheHandle); + if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; + tscTrace("meter synchronization completed"); + } else { + uint16_t type = pSql->cmd.type; + taos_free_result_imp(pSql, 1); + pRes->numOfRows = 1; + pRes->numOfTotal = 0; + pRes->qhandle = 0; + pSql->thandle = NULL; + pSql->cmd.command = TSDB_SQL_SELECT; + pSql->cmd.type = type; + } + + tscDoQuery(pSql); + if (pRes->code != TSDB_CODE_NOT_ACTIVE_TABLE) { + break; + } + // meter was removed, make sync time zero, so that next retry will + // do synchronization first + pSub->lastSyncTime = 0; } - tscDoQuery(pSql); if (pRes->code != TSDB_CODE_SUCCESS) { tscError("failed to query data, error code=%d", pRes->code); tscRemoveFromSqlList(pSql); @@ -394,7 +409,9 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { taosTmrStop(pSub->pTimer); } - if (!keepProgress) { + if (keepProgress) { + tscSaveSubscriptionProgress(pSub); + } else { char path[256]; sprintf(path, "%s/subscribe/%s", dataDir, pSub->topic); remove(path); diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index 04d000ba370c1ccf557fec6e5648ac29ff42e584..b168648702b21456a3b94affdff0418fc6549785 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -7,33 +7,30 @@ #include // include TDengine header file #include -void print_result(TAOS_RES* res) { +void print_result(TAOS_RES* res, int blockFetch) { TAOS_ROW row = NULL; int num_fields = taos_num_fields(res); TAOS_FIELD* fields = taos_fetch_fields(res); -#if 0 - - int nRows = taos_fetch_block(res, &row); - for (int i = 0; i < nRows; i++) { - char temp[256]; - taos_print_row(temp, row + i, fields, num_fields); - puts(temp); - } - -#else - - while ((row = taos_fetch_row(res))) { - char temp[256]; - taos_print_row(temp, row, fields, num_fields); - puts(temp); + if (blockFetch) { + int nRows = taos_fetch_block(res, &row); + for (int i = 0; i < nRows; i++) { + char temp[256]; + taos_print_row(temp, row + i, fields, num_fields); + puts(temp); + } + } else { + while ((row = taos_fetch_row(res))) { + char temp[256]; + taos_print_row(temp, row, fields, num_fields); + puts(temp); + } } - -#endif } + void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { - print_result(res); + print_result(res, *(int*)param); } @@ -50,11 +47,12 @@ void check_row_count(int line, TAOS_RES* res, int expected) { } } + void run_test(TAOS* taos) { taos_query(taos, "drop database test;"); usleep(100000); - taos_query(taos, "create database test;"); + taos_query(taos, "create database test tables 5;"); usleep(100000); taos_query(taos, "use test;"); usleep(100000); @@ -71,12 +69,19 @@ void run_test(TAOS* taos) { taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:00.000', 0, 'UK');"); taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:01.000', 0, 'UK');"); taos_query(taos, "insert into t2 using meters tags('london', 0) values('2020-01-01 00:01:02.000', 0, 'UK');"); + taos_query(taos, "insert into t3 using meters tags('tianjin', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); + taos_query(taos, "insert into t4 using meters tags('wuhan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); + taos_query(taos, "insert into t5 using meters tags('jinan', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); + taos_query(taos, "insert into t6 using meters tags('haikou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); + taos_query(taos, "insert into t7 using meters tags('nanjing', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); + taos_query(taos, "insert into t8 using meters tags('lanzhou', 0) values('2020-01-01 00:01:02.000', 0, 'china');"); + taos_query(taos, "insert into t9 using meters tags('tokyo', 0) values('2020-01-01 00:01:02.000', 0, 'japan');"); // super tables subscription TAOS_SUB* tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); TAOS_RES* res = taos_consume(tsub); - check_row_count(__LINE__, res, 11); + check_row_count(__LINE__, res, 18); res = taos_consume(tsub); check_row_count(__LINE__, res, 0); @@ -90,18 +95,24 @@ void run_test(TAOS* taos) { res = taos_consume(tsub); check_row_count(__LINE__, res, 1); - // keep progress information and continue previous subscription + // keep progress information and restart subscription taos_unsubscribe(tsub, 1); taos_query(taos, "insert into t0 using meters tags('beijing', 0) values('2020-01-01 00:03:00.000', 0, 'china');"); tsub = taos_subscribe(taos, 1, "test", "select * from meters;", NULL, NULL, 0); res = taos_consume(tsub); - check_row_count(__LINE__, res, 15); + check_row_count(__LINE__, res, 22); + + // keep progress information and continue previous subscription + taos_unsubscribe(tsub, 1); + tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); + res = taos_consume(tsub); + check_row_count(__LINE__, res, 0); // don't keep progress information and continue previous subscription taos_unsubscribe(tsub, 0); tsub = taos_subscribe(taos, 0, "test", "select * from meters;", NULL, NULL, 0); res = taos_consume(tsub); - check_row_count(__LINE__, res, 15); + check_row_count(__LINE__, res, 22); // single meter subscription @@ -132,8 +143,7 @@ int main(int argc, char *argv[]) { const char* passwd = "taosdata"; const char* sql = "select * from meters;"; const char* topic = "test-multiple"; - int async = 1, restart = 0, keep = 1, test = 0; - TAOS_SUB* tsub = NULL; + int async = 1, restart = 0, keep = 1, test = 0, blockFetch = 0; for (int i = 1; i < argc; i++) { if (strncmp(argv[i], "-h=", 3) == 0) { @@ -174,6 +184,10 @@ int main(int argc, char *argv[]) { test = 1; continue; } + if (strcmp(argv[i], "-block-fetch") == 0) { + blockFetch = 1; + continue; + } } // init TAOS @@ -191,9 +205,12 @@ int main(int argc, char *argv[]) { exit(0); } + TAOS_SUB* tsub = NULL; if (async) { - tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, 1000); + // create an asynchronized subscription, the callback function will be called every 1s + tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000); } else { + // create an synchronized subscription, need to call 'taos_consume' manually tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0); } @@ -206,8 +223,13 @@ int main(int argc, char *argv[]) { getchar(); } else while(1) { TAOS_RES* res = taos_consume(tsub); - print_result(res); - getchar(); + if (res == NULL) { + printf("failed to consume data."); + break; + } else { + print_result(res, blockFetch); + getchar(); + } } taos_unsubscribe(tsub, keep); @@ -215,4 +237,3 @@ int main(int argc, char *argv[]) { return 0; } -