diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 5df2dc4abae048167b776acaa9e57c4dff7cdeb7..b46829aedadd43ae42fb2d15c43fbf1ce0ced824 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -443,7 +443,7 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { pSubQueryInfo->tsBuf = NULL; // free result for async object will also free sqlObj - assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one resutl columns + assert(tscSqlExprNumOfExprs(pSubQueryInfo) == 1); // ts_comp query only requires one result columns taos_free_result(pPrevSub); SSqlObj *pNew = createSubqueryObj(pSql, (int16_t) i, tscJoinQueryCallback, pSupporter, TSDB_SQL_SELECT, NULL); @@ -834,6 +834,8 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow SSqlRes* pRes = &pSql->res; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + // todo, the type may not include TSDB_QUERY_TYPE_TAG_FILTER_QUERY assert(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_TAG_FILTER_QUERY)); if (pParentSql->res.code != TSDB_CODE_SUCCESS) { @@ -2605,12 +2607,17 @@ static SSqlObj *tscCreateSTableSubquery(SSqlObj *pSql, SRetrieveSupport *trsuppo SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pQueryInfo->type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; + + // clear the limit/offset info, since it should not be sent to vnode to be executed. + pQueryInfo->limit.limit = -1; + pQueryInfo->limit.offset = 0; + assert(pQueryInfo->numOfTables == 1 && pNew->cmd.numOfClause == 1 && trsupport->subqueryIndex < pSql->subState.numOfSub); // launch subquery for each vnode, so the subquery index equals to the vgroupIndex. STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, table_index); pTableMetaInfo->vgroupIndex = trsupport->subqueryIndex; - + pSql->pSubs[trsupport->subqueryIndex] = pNew; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 95cf28ec49346e500d00c1bcac9dc6f4055a0eb4..be3bf071975c796aec23b1721f043b69db80a05d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1722,10 +1722,15 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { pQueryInfo->fieldsInfo.internalField = taosArrayInit(4, sizeof(SInternalField)); assert(pQueryInfo->exprList == NULL); - pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); - pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); - pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; - pQueryInfo->resColumnId= -1000; + pQueryInfo->exprList = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES); + pQueryInfo->udColumnId = TSDB_UD_COLUMN_INDEX; + pQueryInfo->resColumnId = -1000; + pQueryInfo->limit.limit = -1; + pQueryInfo->limit.offset = 0; + + pQueryInfo->slimit.limit = -1; + pQueryInfo->slimit.offset = 0; } int32_t tscAddSubqueryInfo(SSqlCmd* pCmd) { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 6c3e175856e299f4405fd60ad2ed28c6f432f537..cf3345c0be6da5ddf9acde2473c30393e9669d59 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -228,8 +228,6 @@ typedef struct SQuery { uint32_t status; // query status STableQueryInfo* current; - int32_t numOfCheckedBlocks; // number of check data blocks - void* tsdb; SMemRef memRef; STableGroupInfo tableGroupInfo; // table list SArray diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2567216a5813cd325eb2e67cd4dc8369b57915bb..843cf388aa93af0f026eba02bb736a8d4dc559e1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1121,8 +1121,8 @@ static void setArithParams(SArithmeticSupport* sas, SExprInfo *pExprInfo, SSData static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - pCtx[i].size = pBlock->info.rows; pCtx[i].order = order; + pCtx[i].size = pBlock->info.rows; pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag; setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); @@ -1148,8 +1148,8 @@ static void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SS static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - pCtx[i].size = pBlock->info.rows; pCtx[i].order = order; + pCtx[i].size = pBlock->info.rows; pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag; setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo); @@ -1163,7 +1163,6 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); pCtx[i].pInput = p->pData; - assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type && pCtx[i].inputBytes == p->info.bytes); uint32_t status = aAggs[pCtx[i].functionId].status; @@ -1254,8 +1253,7 @@ void doRowwiseTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* } static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, - int32_t pos, int32_t numOfRows, - SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) { + int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) { SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; @@ -2376,22 +2374,18 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); -// if (isTsCompQuery(pQuery)) { -// pRuntimeEnv->proot = createSeqTableBlockScanOperator(pRuntimeEnv->pTableScanner, pRuntimeEnv); - /*} else*/ if (!onlyQueryTags(pQuery)) { + if (!onlyQueryTags(pQuery)) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, pRuntimeEnv->pTableScanner, pQuery->pExpr1, pQuery->numOfOutput); setTableScanFilterOperatorInfo(pRuntimeEnv->pTableScanner->info, pRuntimeEnv->proot); } } - if (!pQuery->stableQuery || isProjQuery(pQuery)) { // TODO this problem should be handed at the client side - if (pQuery->limit.offset > 0) { - pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); - } + if (pQuery->limit.offset > 0) { + pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); + } - if (pQuery->limit.limit > 0) { - pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); - } + if (pQuery->limit.limit > 0) { + pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); } return TSDB_CODE_SUCCESS; @@ -3460,6 +3454,7 @@ void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pC // set tag value, by which the results are aggregated. int32_t offset = 0; memset(pRuntimeEnv->tagVal, 0, pQuery->tagLen); + for (int32_t idx = 0; idx < numOfOutput; ++idx) { SExprInfo* pLocalExprInfo = &pExpr[idx]; @@ -3481,6 +3476,36 @@ void setTagVal_rv(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pC offset += pLocalExprInfo->bytes; } + if (pQuery->stableQuery && pQuery->stabledev && (pRuntimeEnv->prevResult != NULL)) { + //todo : use index to avoid iterator all possible output columns + for(int32_t i = 0; i < numOfOutput; ++i) { + if(pExpr[i].base.functionId != TSDB_FUNC_STDDEV_DST) { + continue; + } + + SSqlFuncMsg* pFuncMsg = &pExpr[i].base; + + pCtx[i].param[0].arr = NULL; + pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int + + // TODO use hash to speedup this loop + int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult); + for(int32_t j = 0; j < numOfGroup; ++j) { + SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j); + if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) { + int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult); + for(int32_t k = 0; k < numOfCols; ++k) { + SStddevInterResult* pres = taosArrayGet(p->pResult, k); + if (pres->colId == pFuncMsg->colInfo.colId) { + pCtx[i].param[0].arr = pres->pResult; + break; + } + } + } + } + } + } + // set the join tag for first column SSqlFuncMsg* pFuncMsg = &pExprInfo->base; if (pQuery->stableQuery && @@ -4545,44 +4570,45 @@ void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, S // return 0; //} -//int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) { -// SQuery* pQuery = pRuntimeEnv->pQuery; -// -// if (pRuntimeEnv->prevResult == NULL || pQuery->groupbyColumn) { -// return TSDB_CODE_SUCCESS; -// } -// -// int32_t numOfExprs = pQuery->numOfOutput; -// for(int32_t i = 0; i < numOfExprs; ++i) { -// SExprInfo* pExprInfo = &(pQuery->pExpr1[i]); -// if(pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) { -// continue; -// } -// -// SSqlFuncMsg* pFuncMsg = &pExprInfo->base; -// -// pRuntimeEnv->pCtx[i].param[0].arr = NULL; -// pRuntimeEnv->pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int -// -// int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult); -// for(int32_t j = 0; j < numOfGroup; ++j) { -// SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j); -// if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) { -// -// int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult); -// for(int32_t k = 0; k < numOfCols; ++k) { -// SStddevInterResult* pres = taosArrayGet(p->pResult, k); -// if (pres->colId == pFuncMsg->colInfo.colId) { -// pRuntimeEnv->pCtx[i].param[0].arr = pres->pResult; -// break; -// } -// } -// } -// } -// } -// -// return 0; -//} +int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, + SExprInfo* pExpr) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + if (pRuntimeEnv->prevResult == NULL || pQuery->groupbyColumn) { + return TSDB_CODE_SUCCESS; + } + + int32_t numOfExprs = pQuery->numOfOutput; + for(int32_t i = 0; i < numOfExprs; ++i) { + SExprInfo* pExprInfo = &(pExpr[i]); + if(pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) { + continue; + } + + SSqlFuncMsg* pFuncMsg = &pExprInfo->base; + + pCtx[i].param[0].arr = NULL; + pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int + + // TODO use hash to speedup this loop + int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult); + for(int32_t j = 0; j < numOfGroup; ++j) { + SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j); + if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) { + int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult); + for(int32_t k = 0; k < numOfCols; ++k) { + SStddevInterResult* pres = taosArrayGet(p->pResult, k); + if (pres->colId == pFuncMsg->colInfo.colId) { + pCtx[i].param[0].arr = pres->pResult; + break; + } + } + } + } + } + + return 0; +} /* * There are two cases to handle: @@ -5447,6 +5473,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->topBotQuery = isTopBottomQuery(pQuery); pQuery->hasTagResults = hasTagValOutput(pQuery); pQuery->timeWindowInterpo = timeWindowInterpoRequired(pQuery); + pQuery->stabledev = isStabledev(pQuery); pRuntimeEnv->prevResult = prevResult; pRuntimeEnv->qinfo = pQInfo;