From 44536fdcf746a9d32ce79fa104d645e3da3e1e98 Mon Sep 17 00:00:00 2001 From: localvar Date: Tue, 31 Dec 2019 17:53:16 +0800 Subject: [PATCH] TBASE-1427 --- src/client/src/tscServer.c | 26 ++++++----- src/client/src/tscSub.c | 74 +++++++++++++++++++++--------- src/system/detail/src/vnodeRead.c | 1 + src/system/detail/src/vnodeShell.c | 20 ++++---- 4 files changed, 79 insertions(+), 42 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ca96103593..03b66a1996 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -3543,18 +3543,20 @@ int tscProcessRetrieveRspFromVnode(SSqlObj *pSql) { tscSetResultPointer(pCmd, pRes); - TAOS_FIELD *pField = tscFieldInfoGetField(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1); - int16_t offset = tscFieldInfoGetOffset(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1); - char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; - - int32_t numOfMeters = htonl(*(int32_t*)p); - p += sizeof(int32_t); - for (int i = 0; i < numOfMeters; i++) { - int64_t uid = htobe64(*(int64_t*)p); - p += sizeof(int64_t); - TSKEY key = htobe64(*(TSKEY*)p); - p += sizeof(TSKEY); - tscUpdateSubscriptionProgress(pSql, uid, key); + if (pSql->pSubscription != NULL) { + TAOS_FIELD *pField = tscFieldInfoGetField(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1); + int16_t offset = tscFieldInfoGetOffset(pCmd, pCmd->fieldsInfo.numOfOutputCols - 1); + char* p = pRes->data + (pField->bytes + offset) * pRes->numOfRows; + + int32_t numOfMeters = htonl(*(int32_t*)p); + p += sizeof(int32_t); + for (int i = 0; i < numOfMeters; i++) { + int64_t uid = htobe64(*(int64_t*)p); + p += sizeof(int64_t); + TSKEY key = htobe64(*(TSKEY*)p); + p += sizeof(TSKEY); + tscUpdateSubscriptionProgress(pSql, uid, key); + } } pRes->row = 0; diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 5a2ae4ee4e..079c7441ce 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -32,6 +32,7 @@ typedef struct SSubscriptionProgress { } SSubscriptionProgress; typedef struct SSub { + int64_t lastSyncTime; void * signature; TAOS * taos; void * pTimer; @@ -152,45 +153,44 @@ static void tscProcessSubscribeTimer(void *handle, void *tmrId) { } -TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { - STscObj* pObj = (STscObj*)taos; - if (pObj == NULL || pObj->signature != pObj) { - globalCode = TSDB_CODE_DISCONNECTED; - tscError("connection disconnected"); - return NULL; - } - - SSub* pSub = tscCreateSubscription(pObj, sql); - if (pSub == NULL) { - return NULL; - } - +bool tscUpdateSubscription(STscObj* pObj, SSub* pSub) { int code = (uint8_t)tsParseSql(pSub->pSql, pObj->acctId, pObj->db, false); if (code != TSDB_CODE_SUCCESS) { taos_unsubscribe(pSub); - return NULL; + return false; } + int numOfMeters = 0; + SSubscriptionProgress* progress = NULL; + // ??? if there's more than one vnode SSqlCmd* pCmd = &pSub->pSql->cmd; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(pCmd, 0); if (UTIL_METER_IS_NOMRAL_METER(pMeterMetaInfo)) { - pSub->numOfMeters = 1; - pSub->progress = calloc(1, sizeof(SSubscriptionProgress)); - pSub->progress[0].uid = pMeterMetaInfo->pMeterMeta->uid; + numOfMeters = 1; + progress = calloc(1, sizeof(SSubscriptionProgress)); + int64_t uid = pMeterMetaInfo->pMeterMeta->uid; + progress[0].uid = uid; + progress[0].key = tscGetSubscriptionProgress(pSub->pSql, uid); } else { SMetricMeta* pMetricMeta = pMeterMetaInfo->pMetricMeta; SVnodeSidList *pVnodeSidList = tscGetVnodeSidList(pMetricMeta, pMeterMetaInfo->vnodeIndex); - pSub->numOfMeters = pVnodeSidList->numOfSids; - pSub->progress = calloc(pSub->numOfMeters, sizeof(SSubscriptionProgress)); - for (int32_t i = 0; i < pSub->numOfMeters; ++i) { + numOfMeters = pVnodeSidList->numOfSids; + progress = calloc(numOfMeters, sizeof(SSubscriptionProgress)); + for (int32_t i = 0; i < numOfMeters; ++i) { SMeterSidExtInfo *pMeterInfo = tscGetMeterSidInfo(pVnodeSidList, i); - pSub->progress[i].uid = pMeterInfo->uid; + int64_t uid = pMeterInfo->uid; + progress[i].uid = uid; + progress[i].key = tscGetSubscriptionProgress(pSub->pSql, uid); } - qsort(pSub->progress, pSub->numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); + qsort(progress, numOfMeters, sizeof(SSubscriptionProgress), tscCompareSubscriptionProgress); } + free(pSub->progress); + pSub->numOfMeters = numOfMeters; + pSub->progress = progress; + // timestamp must in the output column SFieldInfo* pFieldInfo = &pCmd->fieldsInfo; tscFieldInfoSetValue(pFieldInfo, pFieldInfo->numOfOutputCols, TSDB_DATA_TYPE_TIMESTAMP, "_c0", TSDB_KEYSIZE); @@ -198,6 +198,30 @@ TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp tscFieldInfoUpdateVisible(pFieldInfo, pFieldInfo->numOfOutputCols - 1, false); tscFieldInfoCalOffset(pCmd); + pSub->lastSyncTime = taosGetTimestampMs(); + + return true; +} + + +TAOS_SUB *taos_subscribe(TAOS *taos, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { + STscObj* pObj = (STscObj*)taos; + if (pObj == NULL || pObj->signature != pObj) { + globalCode = TSDB_CODE_DISCONNECTED; + tscError("connection disconnected"); + return NULL; + } + + SSub* pSub = tscCreateSubscription(pObj, sql); + if (pSub == NULL) { + return NULL; + } + pSub->taos = taos; + + if (!tscUpdateSubscription(pObj, pSub)) { + return NULL; + } + if (fp != NULL) { pSub->fp = fp; pSub->interval = interval; @@ -212,6 +236,12 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSub *pSub = (SSub *)tsub; if (pSub == NULL) return NULL; + if (taosGetTimestampMs() - pSub->lastSyncTime > 30 * 10 * 1000) { + taos_query(pSub->taos, "reset query cache;"); + // TODO: clear memory + if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; + } + SSqlObj* pSql = pSub->pSql; SSqlRes *pRes = &pSql->res; diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 2a913665cd..9612cc6eb6 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -1105,6 +1105,7 @@ int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQueryMsg) { pSids[0] = (SMeterSidExtInfo *)pMsg; pSids[0]->sid = htonl(pSids[0]->sid); pSids[0]->uid = htobe64(pSids[0]->uid); + pSids[0]->key = htobe64(pSids[0]->key); for (int32_t j = 1; j < pQueryMsg->numOfSids; ++j) { pSids[j] = (SMeterSidExtInfo *)((char *)pSids[j - 1] + sizeof(SMeterSidExtInfo) + pQueryMsg->tagLength); diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 2c8fe35cca..e527164df1 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -441,7 +441,9 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { // buffer size for progress information, including meter count, // and for each meter, including 'uid' and 'TSKEY'. - int progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t); + int progressSize = 0; + if (pQInfo->pMeterQuerySupporter != NULL) + progressSize = pQInfo->pMeterQuerySupporter->numOfMeters * (sizeof(int64_t) + sizeof(TSKEY)) + sizeof(int32_t); pStart = taosBuildRspMsgWithSize(pObj->thandle, TSDB_MSG_TYPE_RETRIEVE_RSP, progressSize + size + 100); if (pStart == NULL) { @@ -476,13 +478,15 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { // write the progress information of each meter to response // this is required by subscriptions - *((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters); - pMsg += sizeof(int32_t); - for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) { - *((int64_t*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid); - pMsg += sizeof(int64_t); - *((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key); - pMsg += sizeof(TSKEY); + if (progressSize > 0) { + *((int32_t*)pMsg) = htonl(pQInfo->pMeterQuerySupporter->numOfMeters); + pMsg += sizeof(int32_t); + for (int32_t i = 0; i < pQInfo->pMeterQuerySupporter->numOfMeters; i++) { + *((int64_t*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->uid); + pMsg += sizeof(int64_t); + *((TSKEY*)pMsg) = htobe64(pQInfo->pMeterQuerySupporter->pMeterSidExtInfo[i]->key); + pMsg += sizeof(TSKEY); + } } msgLen = pMsg - pStart; -- GitLab