From b451e6859de7d18fa6a05179ecc8baadf3c3f68e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Dec 2020 14:34:34 +0800 Subject: [PATCH] [TD-2587]: fix bugs introduced by refactor. --- src/client/src/tscSub.c | 6 +++++- src/query/src/qExecutor.c | 11 +++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 52b74f7502..7f0b174ad3 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -69,14 +69,17 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt) { } void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts) { - if( sub == NULL) + if( sub == NULL) { return; + } + SSub* pSub = (SSub*)sub; SSubscriptionProgress target = {.uid = uid, .key = ts}; SSubscriptionProgress* p = taosArraySearch(pSub->progress, &target, tscCompareSubscriptionProgress); if (p != NULL) { p->key = ts; + tscDebug("subscribe:%s, uid:%"PRIu64" update sub start ts:%"PRId64, pSub->topic, p->uid, p->key); } } @@ -502,6 +505,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key; + tscDebug("subscribe:%s set subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey); } if (pSub->pTimer == NULL) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index fb6f5f2044..fa9fc6d1f3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1722,6 +1722,10 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pResultRowInfo, searchFn, pDataBlock); } + // update the lastkey of current table for projection/aggregation query + TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey; + pTableQueryInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + // interval query with limit applied int32_t numOfRes = 0; if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { @@ -4299,7 +4303,9 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); + int32_t total = 0; STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL); + while(item) { STableIdInfo* pDst = (STableIdInfo*)data; pDst->uid = htobe64(item->uid); @@ -4307,9 +4313,14 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data pDst->key = htobe64(item->key); data += sizeof(STableIdInfo); + total++; + + qDebug("QInfo:%p set subscribe info, tid:%d, uid:%"PRIu64", skey:%"PRId64, pQInfo, item->tid, item->uid, item->key); item = taosHashIterate(pQInfo->arrTableIdInfo, item); } + qDebug("QInfo:%p set %d subscribe info", pQInfo, total); + // Check if query is completed or not for stable query or normal table query respectively. if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { if (pQInfo->runtimeEnv.stableQuery) { -- GitLab