diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 03b66a199681186d80006639ec523f349ed99dbf..0972e5fe2dbbb45195dfc6709258fbe6b6b42785 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -46,8 +46,9 @@ int (*tscProcessMsgRsp[TSDB_SQL_MAX])(SSqlObj *pSql); void (*tscUpdateVnodeMsg[TSDB_SQL_MAX])(SSqlObj *pSql, char *buf); void tscProcessActivityTimer(void *handle, void *tmrId); int tscKeepConn[TSDB_SQL_MAX] = {0}; -TSKEY tscGetSubscriptionProgress(SSqlObj* pSql, int64_t uid); -void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts); +TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid); +void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); +void tscSaveSubscriptionProgress(void* sub); static int32_t minMsgSize() { return tsRpcHeadSize + sizeof(STaosDigest); } @@ -1533,7 +1534,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn SMeterSidExtInfo *pMeterInfo = (SMeterSidExtInfo *)pMsg; pMeterInfo->sid = htonl(pMeterMeta->sid); pMeterInfo->uid = htobe64(pMeterMeta->uid); - pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql, pMeterMeta->uid)); + pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pMeterMeta->uid)); pMsg += sizeof(SMeterSidExtInfo); } else { SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); @@ -1544,7 +1545,7 @@ static char* doSerializeTableInfo(SSqlObj* pSql, int32_t numOfMeters, int32_t vn pMeterInfo->sid = htonl(pQueryMeterInfo->sid); pMeterInfo->uid = htobe64(pQueryMeterInfo->uid); - pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql, pQueryMeterInfo->uid)); + pMeterInfo->key = htobe64(tscGetSubscriptionProgress(pSql->pSubscription, pQueryMeterInfo->uid)); pMsg += sizeof(SMeterSidExtInfo); @@ -3555,8 +3556,9 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { p += sizeof(int64_t); TSKEY key = htobe64(*(TSKEY*)p); p += sizeof(TSKEY); - tscUpdateSubscriptionProgress(pSql, uid, key); + tscUpdateSubscriptionProgress(pSql->pSubscription, uid, key); } + tscSaveSubscriptionProgress(pSql->pSubscription); } pRes->row = 0; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 079c7441ce216e553dfd5108870e72b7f35a7d93..910f9412f295c60f24979abb3f0709c59e035df1 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -32,6 +32,7 @@ typedef struct SSubscriptionProgress { } SSubscriptionProgress; typedef struct SSub { + char topic[32]; int64_t lastSyncTime; void * signature; TAOS * taos; @@ -49,11 +50,11 @@ static int tscCompareSubscriptionProgress(const void* a, const void* b) { return ((const SSubscriptionProgress*)a)->uid - ((const SSubscriptionProgress*)b)->uid; } -TSKEY tscGetSubscriptionProgress(SSqlObj* pSql, int64_t uid) { - if( pSql == NULL || pSql->pSubscription == NULL) +TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid) { + if (sub == NULL) return 0; - SSub* pSub = (SSub*)pSql->pSubscription; + SSub* pSub = (SSub*)sub; for (int s = 0, e = pSub->numOfMeters; s < e;) { int m = (s + e) / 2; SSubscriptionProgress* p = pSub->progress + m; @@ -68,11 +69,11 @@ TSKEY tscGetSubscriptionProgress(SSqlObj* pSql, int64_t uid) { return 0; } -void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts) { - if( pSql == NULL || pSql->pSubscription == NULL) +void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { + if( sub == NULL) return; - SSub* pSub = (SSub*)pSql->pSubscription; + SSub* pSub = (SSub*)sub; for (int s = 0, e = pSub->numOfMeters; s < e;) { int m = (s + e) / 2; SSubscriptionProgress* p = pSub->progress + m; @@ -88,7 +89,7 @@ void tscUpdateSubscriptionProgress(SSqlObj* pSql, int64_t uid, TSKEY ts) { } -static SSub* tscCreateSubscription(STscObj* pObj, const char* sql) { +static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* sql) { SSub* pSub = calloc(1, sizeof(SSub)); if (pSub == NULL) { globalCode = TSDB_CODE_CLI_OUT_OF_MEMORY; @@ -125,6 +126,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* sql) { pSql->pSubscription = pSub; pSub->pSql = pSql; pSub->signature = pSub; + strncpy(pSub->topic, topic, sizeof(pSub->topic)); return pSub; failed: @@ -139,7 +141,7 @@ failed: } -static void tscProcessSubscribeTimer(void *handle, void *tmrId) { +static void tscProcessSubscriptionTimer(void *handle, void *tmrId) { SSub *pSub = (SSub *)handle; if (pSub == NULL || pSub->pTimer != tmrId) return; @@ -149,7 +151,7 @@ static void tscProcessSubscribeTimer(void *handle, void *tmrId) { // TODO: memory leak } - taosTmrReset(tscProcessSubscribeTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); + taosTmrReset(tscProcessSubscriptionTimer, pSub->interval, pSub, tscTmr, &pSub->pTimer); } @@ -172,7 +174,7 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) { progress = calloc(1, sizeof(SSubscriptionProgress)); int64_t uid = pMeterMetaInfo->pMeterMeta->uid; progress[0].uid = uid; - progress[0].key = tscGetSubscriptionProgress(pSub->pSql, uid); + progress[0].key = tscGetSubscriptionProgress(pSub, uid); } else { SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); @@ -182,7 +184,7 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) { SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); int64_t uid = pMeterInfo->uid; progress[i].uid = uid; - progress[i].key = tscGetSubscriptionProgress(pSub->pSql, uid); + progress[i].key = tscGetSubscriptionProgress(pSub, uid); } qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); } @@ -204,7 +206,93 @@ bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) { } -TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { +static void tscLoadSubscriptionProgress(SSub* pSub) { + char buf[TSDB_MAX_SQL_LEN]; + sprintf(buf, "%s/subscribe/%s", dataDir, pSub->topic); + + FILE* fp = fopen(buf, "r"); + if (fp == NULL) { + tscTrace("subscription progress file does not exist: %s", pSub->topic); + return true; + } + + if (fgets(buf, sizeof(buf), fp) == NULL) { + tscTrace("invalid subscription progress file: %s", pSub->topic); + fclose(fp); + return false; + } + + for (int i = 0; i < sizeof(buf); i++) { + if (buf[i] == 0) + break; + if (buf[i] == '\r' || buf[i] == '\n') { + buf[i] = 0; + break; + } + } + if (strcmp(buf, pSub->pSql->sqlstr) != 0) { + tscTrace("subscription sql statement mismatch: %s", pSub->topic); + fclose(fp); + return false; + } + + if (fgets(buf, sizeof(buf), fp) == NULL || atoi(buf) < 0) { + tscTrace("invalid subscription progress file: %s", pSub->topic); + fclose(fp); + return false; + } + + int numOfMeters = atoi(buf); + SSubscriptionProgress* progress = calloc(numOfMeters, sizeof(SSubscriptionProgress)); + for (int i = 0; i < numOfMeters; i++) { + if (fgets(buf, sizeof(buf), fp) == NULL) { + fclose(fp); + free(progress); + return false; + } + int64_t uid, key; + sscanf(buf, "uid=%" SCNd64 ",progress=%" SCNd64, &uid, &key); + progress[i].uid = uid; + progress[i].key = key; + } + + fclose(fp); + + qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); + pSub->numOfMeters = numOfMeters; + pSub->progress = progress; + return true; +} + +void tscSaveSubscriptionProgress(void* sub) { + SSub* pSub = (SSub*)sub; + + char path[256]; + sprintf(path, "%s/subscribe", dataDir); + if (access(path, 0) != 0) { + mkdir(path, 0777); + } + + sprintf(path, "%s/subscribe/%s", dataDir, pSub->topic); + FILE* fp = fopen(path, "w+"); + if (fp == NULL) { + tscError("failed to create progress file for subscription: %s", pSub->topic); + return; + } + + fputs(pSub->pSql->sqlstr, fp); + fprintf(fp, "\n%d\n", pSub->numOfMeters); + for (int i = 0; i < pSub->numOfMeters; i++) { + int64_t uid = pSub->progress[i].uid; + TSKEY key = pSub->progress[i].key; + fprintf(fp, "uid=%" PRId64 ",progress=%" PRId64 "\n", uid, key); + } + + fclose(fp); +} + + +TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { STscObj* pObj = (STscObj*)taos; if (pObj == NULL || pObj->signature != pObj) { globalCode = TSDB_CODE_DISCONNECTED; @@ -212,12 +300,18 @@ TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp return NULL; } - SSub* pSub = tscCreateSubscription(pObj, sql); + SSub* pSub = tscCreateSubscription(pObj, topic, sql); if (pSub == NULL) { return NULL; } pSub->taos = taos; + if (restart) { + tscTrace("restart subscription: %s", topic); + } else { + tscLoadSubscriptionProgress(pSub); + } + if (!tscUpdateSubscription(pObj, pSub)) { return NULL; } @@ -226,7 +320,7 @@ TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp pSub->fp = fp; pSub->interval = interval; pSub->param = param; - taosTmrReset(tscProcessSubscribeTimer, 0, pSub, tscTmr, &pSub->pTimer); + taosTmrReset(tscProcessSubscriptionTimer, 0, pSub, tscTmr, &pSub->pTimer); } return pSub; @@ -236,7 +330,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; if (pSub == NULL) return NULL; - if (taosGetTimestampMs() - pSub->lastSyncTime > 30 * 10 * 1000) { + if (taosGetTimestampMs() - pSub->lastSyncTime > 30 * 60 * 1000) { taos_query(pSub->taos, "reset query cache;"); // TODO: clear memory if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; diff --git a/src/inc/taos.h b/src/inc/taos.h index aa2ed02f9b17c9270230761fa3f728d98422f40a..bdba644bb69b2204d43db3edcdbbaa9a022d58f2 100644 --- a/src/inc/taos.h +++ b/src/inc/taos.h @@ -117,7 +117,7 @@ DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RE DLL_EXPORT void taos_fetch_row_a(TAOS_RES *res, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), void *param); typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code); -DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); +DLL_EXPORT TAOS_SUB *taos_subscribe(const char* topic, int restart, TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval); DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub); diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index ae73ba52492e61e36e10614c4f076748bf8e28f3..cc1733a747280435946199536abe33a538df6098 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -682,18 +682,14 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; -pQInfo->killed = 0; -/* if (isQueryKilled(pQuery)) { setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK); return; } -*/ TSKEY skey = pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[k]->key; if (skey > 0) { pQuery->skey = skey; - // pQuery->lastKey = ???; } bool dataInDisk = true; diff --git a/tests/examples/c/subscribe.c b/tests/examples/c/subscribe.c index b35dca772fcc940f184f6fa5839c728d34121884..7c76e1088cd9a72ccfcf3f103c036f5e1cf1a618 100644 --- a/tests/examples/c/subscribe.c +++ b/tests/examples/c/subscribe.c @@ -28,7 +28,7 @@ int main(int argc, char *argv[]) { const char* host = "127.0.0.1"; const char* user = "root"; const char* passwd = "taosdata"; - int async = 1; + int async = 1, restart = 0; TAOS_SUB* tsub = NULL; for (int i = 1; i < argc; i++) { @@ -44,8 +44,12 @@ int main(int argc, char *argv[]) { passwd = argv[i] + 3; continue; } - if (strncmp(argv[i], "-m=", 3) == 0) { - async = strcmp(argv[i] + 3, "sync"); + if (strcmp(argv[i], "-sync") == 0) { + async = 0; + continue; + } + if (strcmp(argv[i], "-restart") == 0) { + restart = 1; continue; } } @@ -60,9 +64,9 @@ int main(int argc, char *argv[]) { } if (async) { - tsub = taos_subscribe(taos, "select * from meters;", subscribe_callback, NULL, 1000); + tsub = taos_subscribe("test", restart, taos, "select * from meters;", subscribe_callback, NULL, 1000); } else { - tsub = taos_subscribe(taos, "select * from meters;", NULL, NULL, 0); + tsub = taos_subscribe("test", restart, taos, "select * from meters;", NULL, NULL, 0); } if (tsub == NULL) {