diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index f3d7ef28c05fc3128622f8217341d52327d79ec0..32257f5a7c723d04ec0f8200c889a68d78cf7824 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -503,9 +503,19 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { SSqlCmd *pCmd = &pSql->cmd; STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); - if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single tabel subscription - pQueryInfo->window.skey = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, 0))->key; - tscDebug("subscribe:%s set subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey); + if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription + + size_t size = taosArrayGetSize(pSub->progress); + TSKEY s = INT64_MAX; + for(int32_t i = 0; i < size; ++i) { + TSKEY k = ((SSubscriptionProgress*)taosArrayGet(pSub->progress, i))->key; + if (s > k) { + s = k; + } + } + + pQueryInfo->window.skey = s; + tscDebug("subscribe:%s set next round subscribe skey:%"PRId64, pSub->topic, pQueryInfo->window.skey); } if (pSub->pTimer == NULL) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ffb6a316a60c1dbdab8f7502507cdcfb73d94b28..dca0e1f46413eec50e8d13350ec4725701f5cb52 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3976,7 +3976,10 @@ static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo) { return tidInfo; } -static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SHashObj* pTableIdInfo) { +static void updateTableIdInfo(STableQueryInfo* pTableQueryInfo, SSDataBlock* pBlock, SHashObj* pTableIdInfo, int32_t order) { + int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); + pTableQueryInfo->lastKey = ((order == TSDB_ORDER_ASC)? pBlock->info.window.ekey:pBlock->info.window.skey) + step; + STableIdInfo tidInfo = createTableIdInfo(pTableQueryInfo); STableIdInfo *idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid)); if (idinfo != NULL) { @@ -4408,7 +4411,10 @@ static SSDataBlock* doArithmeticOperation(void* param) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SOptrBasicInfo *pInfo = &pArithInfo->binfo; - pInfo->pRes->info.rows = 0; + SSDataBlock* pRes = pInfo->pRes; + int32_t order = pRuntimeEnv->pQuery->order.order; + + pRes->info.rows = 0; while(1) { SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); @@ -4417,17 +4423,20 @@ static SSDataBlock* doArithmeticOperation(void* param) { break; } - setTagValue(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + STableQueryInfo* pTableQueryInfo = pRuntimeEnv->pQuery->current; + + // todo dynamic set tags + setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->pCtx, pBlock, pRuntimeEnv->pQuery->order.order); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); updateOutputBuf(pArithInfo, pBlock->info.rows); - arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - pInfo->pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - updateTableIdInfo(pRuntimeEnv->pQuery->current, pRuntimeEnv->pTableRetrieveTsMap); + arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); + updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order); - if (pInfo->pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { + pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); + if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { break; } }