diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index a6d09b5d8d9c4771efbc00aca9bfbc63c60442f9..e4f67ba0def6cc16b9a1946e52e1ba11459e9f8d 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -282,6 +282,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) { SArray* tables = getTableList(pSql); if (tables == NULL) { + pSub->lastSyncTime = 0; //force to get table list next time return 0; } size_t numOfTables = taosArrayGetSize(tables); @@ -488,7 +489,15 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; if (pSub == NULL) return NULL; - if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { + if (pSub->pTimer == NULL) { + int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime; + if (duration < (int64_t)(pSub->interval)) { + tscDebug("subscription consume too frequently, blocking..."); + taosMsleep(pSub->interval - (int32_t)duration); + } + } + + if (pSub->pSql->cmd.command == TSDB_SQL_RETRIEVE_EMPTY_RESULT) { //may reach here when retireve stable vgroup failed SSqlObj* pSql = recreateSqlObj(pSub); if (pSql == NULL) { return NULL; @@ -500,6 +509,11 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { } pSub->pSql = pSql; pSql->pSubscription = pSub; + + // no table list now, force to update it + tscDebug("begin table synchronization"); + if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; + tscDebug("table synchronization completed"); } tscSaveSubscriptionProgress(pSub); @@ -524,14 +538,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey); } - if (pSub->pTimer == NULL) { - int64_t duration = taosGetTimestampMs() - pSub->lastConsumeTime; - if (duration < (int64_t)(pSub->interval)) { - tscDebug("subscription consume too frequently, blocking..."); - taosMsleep(pSub->interval - (int32_t)duration); - } - } - size_t size = taosArrayGetSize(pSub->progress) * sizeof(STableIdInfo); size += sizeof(SQueryTableMsg) + 4096; int code = tscAllocPayload(&pSql->cmd, (int)size);