diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 2528720d7e9e757b1eb85491f3a4d5199954b3e2..9425b88bc3afc31deecc01217f79017be63b85fd 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -74,14 +74,14 @@ static bool allSubqueryDone(SSqlObj *pParentSql) { SSubqueryState *subState = &pParentSql->subState; //lock in caller - + tscDebug("%p total subqueries: %d", pParentSql, subState->numOfSub); for (int i = 0; i < subState->numOfSub; i++) { if (0 == subState->states[i]) { - tscDebug("%p subquery:%p,%d is NOT finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub); + tscDebug("%p subquery:%p, index: %d NOT finished, abort query completion check", pParentSql, pParentSql->pSubs[i], i); done = false; break; } else { - tscDebug("%p subquery:%p,%d is finished, total:%d", pParentSql, pParentSql->pSubs[i], i, subState->numOfSub); + tscDebug("%p subquery:%p, index: %d finished", pParentSql, pParentSql->pSubs[i], i); } } diff --git a/src/query/inc/qTsbuf.h b/src/query/inc/qTsbuf.h index 5d055782c9b82a1444c97a62d429cc2ba9a53986..00cc4e897f130348b81a7d96419c1b292cacca8c 100644 --- a/src/query/inc/qTsbuf.h +++ b/src/query/inc/qTsbuf.h @@ -112,13 +112,11 @@ STSBuf* tsBufClone(STSBuf* pTSBuf); STSGroupBlockInfo* tsBufGetGroupBlockInfo(STSBuf* pTSBuf, int32_t id); -void tsBufFlush(STSBuf* pTSBuf); - +void tsBufFlush(STSBuf* pTSBuf); void tsBufResetPos(STSBuf* pTSBuf); -STSElem tsBufGetElem(STSBuf* pTSBuf); - bool tsBufNextPos(STSBuf* pTSBuf); +STSElem tsBufGetElem(STSBuf* pTSBuf); STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t id, tVariant* tag); STSCursor tsBufGetCursor(STSBuf* pTSBuf); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 204730c73ec7c6bec9c7bfdaadca88ad5ada190d..db6a6311348808c13741e3cda56e12f672ad1730 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -144,7 +144,7 @@ static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, S static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static bool hasMainOutput(SQuery *pQuery); -//static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo); +static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, STableQueryInfo *pTableQueryInfo); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win); @@ -186,6 +186,9 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOp static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); static bool isPointInterpoQuery(SQuery *pQuery); +static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); +static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); +static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); // setup the output buffer for each operator static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { @@ -574,7 +577,6 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow // not assign result buffer yet, add new result buffer if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->intermediateResultRowSize); if (ret != TSDB_CODE_SUCCESS) { return -1; @@ -1201,7 +1203,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - // goto _end; + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } int32_t forwardStep = 0; @@ -1223,7 +1225,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow w = pRes->win; ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &w, masterScan, &pResult, groupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); - assert(ret == TSDB_CODE_SUCCESS && !resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); @@ -1238,7 +1244,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // restore current time window ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); - assert(ret == TSDB_CODE_SUCCESS); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } } // window start key interpolation @@ -1258,7 +1266,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &nextWin, masterScan, &pResult, groupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - break; + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } ekey = reviseWindowEkey(pQuery, &nextWin); @@ -1313,7 +1321,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn } for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { - pInfo->binfo.pCtx[k].size = 1; // TODO refactor: extract from here + pInfo->binfo.pCtx[k].size = 1; int32_t functionId = pInfo->binfo.pCtx[k].functionId; if (functionNeedToExecute(pRuntimeEnv, &pInfo->binfo.pCtx[k], functionId)) { aAggs[functionId].xFunctionF(&pInfo->binfo.pCtx[k], j); @@ -1609,8 +1617,6 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { return NULL; } -static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); - static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables) { qDebug("QInfo:%p setup runtime env", pRuntimeEnv->qinfo); SQuery *pQuery = pRuntimeEnv->pQuery; @@ -1618,10 +1624,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->pQuery = pQuery; - pQuery->interBufSize = getOutputInterResultBufSize(pQuery); - - calResultBufSize(pQuery, &pRuntimeEnv->resultInfo); - pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pRuntimeEnv->keyBuf = malloc(pQuery->maxSrcColumnSize + sizeof(int64_t)); pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv)); @@ -2122,7 +2124,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) -static bool doFilterOnBlockStatistics(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { +static bool doFilterByBlockStatistics(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { SQuery* pQuery = pRuntimeEnv->pQuery; if (pDataStatis == NULL || pQuery->numOfFilterCols == 0) { @@ -2390,7 +2392,43 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId); static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes); -//TODO refactor +static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { + SQLFunctionCtx* pCtx = pTableScanInfo->pCtx; + uint32_t status = BLK_DATA_NO_NEEDED; + + int32_t numOfOutput = pTableScanInfo->numOfOutput; + for (int32_t i = 0; i < numOfOutput; ++i) { + int32_t functionId = pCtx[i].functionId; + int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId; + + // group by + first/last should not apply the first/last block filter + status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); + if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { + return status; + } + } + + return status; +} + +static void doSetFilterColumnInfo(SQuery* pQuery, SSDataBlock* pBlock) { + if (pQuery->numOfFilterCols > 0 && pQuery->pFilterInfo[0].pData != NULL) { + return; + } + + // set the initial static data value filter expression + for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { + for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j); + + if (pQuery->pFilterInfo[i].info.colId == pColInfo->info.colId) { + pQuery->pFilterInfo[i].pData = pColInfo->pData; + break; + } + } + } +} + int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { *status = BLK_DATA_NO_NEEDED; @@ -2399,14 +2437,15 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa SQuery* pQuery = pRuntimeEnv->pQuery; int64_t groupId = pQuery->current->groupIndex; + bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); SQInfo* pQInfo = pRuntimeEnv->qinfo; SQueryCostInfo* pCost = &pQInfo->summary; if (pRuntimeEnv->pTsBuf != NULL) { - *status = BLK_DATA_ALL_NEEDED; + (*status) = BLK_DATA_ALL_NEEDED; - if (pQuery->stableQuery) { + if (pQuery->stableQuery) { // todo refactor SExprInfo* pExprInfo = &pTableScanInfo->pExpr[0]; int16_t tagId = (int16_t)pExprInfo->base.arg->argValue.i64; SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagId); @@ -2414,6 +2453,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa // compare tag first tVariant t = {0}; doSetTagValueInParam(pQuery->current->pTable, tagId, &t, pColInfo->type, pColInfo->bytes); + setTimestampListJoinInfo(pRuntimeEnv, &t, pQuery->current); STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); if (!tsBufIsValidElem(&elem) || (tsBufIsValidElem(&elem) && (tVariantCompare(&t, elem.tag) != 0))) { @@ -2423,58 +2463,35 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa } } - if (pQuery->numOfFilterCols > 0) { - *status = BLK_DATA_ALL_NEEDED; - } else { // check if this data block is required to load - // Calculate all time windows that are overlapping or contain current data block. - // If current data block is contained by all possible time window, do not load current data block. - if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info)) { - *status = BLK_DATA_ALL_NEEDED; - } - - if ((*status) != BLK_DATA_ALL_NEEDED) { - // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, - // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer - if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - SResultRow* pResult = NULL; + // Calculate all time windows that are overlapping or contain current data block. + // If current data block is contained by all possible time window, do not load current data block. + if (pQuery->numOfFilterCols > 0 || pQuery->groupbyColumn || + (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, &pBlock->info))) { + (*status) = BLK_DATA_ALL_NEEDED; + } - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); - TSKEY k = QUERY_IS_ASC_QUERY(pQuery) ? pBlock->info.window.skey : pBlock->info.window.ekey; - STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, - pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { - // todo handle error in set result for timewindow - } - } + // check if this data block is required to load + if ((*status) != BLK_DATA_ALL_NEEDED) { + // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, + // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + SResultRow* pResult = NULL; - int32_t numOfOutput = pTableScanInfo->numOfOutput; - SQLFunctionCtx* pCtx = pTableScanInfo->pCtx; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; - if (pQuery->groupbyColumn) { - (*status) = BLK_DATA_ALL_NEEDED; - } else { - for (int32_t i = 0; i < numOfOutput; ++i) { - int32_t functionId = pCtx[i].functionId; - int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId; - - // group by + first/last should not apply the first/last block filter - if (functionId != TSDB_FUNC_FIRST_DST && functionId != TSDB_FUNC_LAST_DST) { - (*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId); - if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) { - break; - } - } else { - (*status) |= BLK_DATA_ALL_NEEDED; - break; - } - } + STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery); + if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, + pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, + pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } + + (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); } SDataBlockInfo* pBlockInfo = &pBlock->info; - if ((*status) == BLK_DATA_NO_NEEDED) { qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); @@ -2502,8 +2519,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockStatis[i].min), (char*)&(pBlock->pBlockStatis[i].max)); - if (!load) { - // current block has been discard due to filter applied + if (!load) { // current block has been discard due to filter applied pCost->discardBlocks += 1; qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); @@ -2515,7 +2531,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa } // current block has been discard due to filter applied - if (!doFilterOnBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { + if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->discardBlocks += 1; qDebug("QInfo:%p data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); @@ -2530,23 +2546,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa return terrno; } - if (pQuery->numOfFilterCols > 0 && pQuery->pFilterInfo[0].pData == NULL) { - // set the initial static data value filter expression - for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { - for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, j); - - if (pQuery->pFilterInfo[i].info.colId == pColInfo->info.colId) { - pQuery->pFilterInfo[i].pData = pColInfo->pData; - break; - } - } - } - } - + doSetFilterColumnInfo(pQuery, pBlock); if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL) { filterRowsInDataBlock(pRuntimeEnv, pQuery->pFilterInfo, pQuery->numOfFilterCols, pBlock, pRuntimeEnv->pTsBuf, - QUERY_IS_ASC_QUERY(pQuery)); + ascQuery); } } @@ -2623,33 +2626,23 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes) { tVariantDestroy(tag); + char* val = NULL; if (tagColId == TSDB_TBNAME_COLUMN_INDEX) { - char* val = tsdbGetTableName(pTable); + val = tsdbGetTableName(pTable); assert(val != NULL); - - tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), TSDB_DATA_TYPE_BINARY); } else { - char* val = tsdbGetTableTagVal(pTable, tagColId, type, bytes); - if (val == NULL) { - tag->nType = TSDB_DATA_TYPE_NULL; - return; - } - - if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - if (isNull(val, type)) { - tag->nType = TSDB_DATA_TYPE_NULL; - return; - } + val = tsdbGetTableTagVal(pTable, tagColId, type, bytes); + } - tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type); - } else { - if (isNull(val, type)) { - tag->nType = TSDB_DATA_TYPE_NULL; - return; - } + if (val == NULL || isNull(val, type)) { + tag->nType = TSDB_DATA_TYPE_NULL; + return; + } - tVariantCreateFromBinary(tag, val, bytes, type); - } + if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { + tVariantCreateFromBinary(tag, varDataVal(val), varDataLen(val), type); + } else { + tVariantCreateFromBinary(tag, val, bytes, type); } } @@ -2679,6 +2672,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); + return; } else { // set tag value, by which the results are aggregated. int32_t offset = 0; @@ -2705,58 +2699,15 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt offset += pLocalExprInfo->bytes; } + //todo : use index to avoid iterator all possible output columns if (pQuery->stableQuery && pQuery->stabledev && (pRuntimeEnv->prevResult != NULL)) { - //todo : use index to avoid iterator all possible output columns - for(int32_t i = 0; i < numOfOutput; ++i) { - if(pExpr[i].base.functionId != TSDB_FUNC_STDDEV_DST) { - continue; - } - - SSqlFuncMsg* pFuncMsg = &pExpr[i].base; - - pCtx[i].param[0].arr = NULL; - pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int - - // TODO use hash to speedup this loop - int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult); - for(int32_t j = 0; j < numOfGroup; ++j) { - SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j); - if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) { - int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult); - for(int32_t k = 0; k < numOfCols; ++k) { - SStddevInterResult* pres = taosArrayGet(p->pResult, k); - if (pres->colId == pFuncMsg->colInfo.colId) { - pCtx[i].param[0].arr = pres->pResult; - break; - } - } - } - } - } + setParamForStableStddev(pRuntimeEnv, pCtx, numOfOutput, pExprInfo); } + } - // set the join tag for first column - SSqlFuncMsg* pFuncMsg = &pExprInfo->base; - if (pQuery->stableQuery && - (pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && - (pRuntimeEnv->pTsBuf != NULL) && - pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { - assert(pFuncMsg->numOfParams == 1); - - int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; - SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); - - doSetTagValueInParam(pTable, tagColId, &pCtx[0].tag, pColInfo->type, pColInfo->bytes); - - int16_t tagType = pCtx[0].tag.nType; - if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) { - qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pRuntimeEnv->qinfo, - pExprInfo->base.arg->argValue.i64, pCtx[0].tag.pz); - } else { - qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pRuntimeEnv->qinfo, - pExprInfo->base.arg->argValue.i64, pCtx[0].tag.i64); - } - } + // set the tsBuf start position before check each data block + if (pRuntimeEnv->pTsBuf != NULL) { + setCtxTagForJoin(pRuntimeEnv, &pCtx[0], pExprInfo, pTable); } } @@ -3224,78 +3175,95 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF } } -//int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { -// SQuery* pQuery = pRuntimeEnv->pQuery; -// -// assert(pRuntimeEnv->pTsBuf != NULL); -// -// // both the master and supplement scan needs to set the correct ts comp start position -// tVariant* pTag = &pRuntimeEnv->pCtx[0].tag; -// -// if (pTableQueryInfo->cur.vgroupIndex == -1) { -// tVariantAssign(&pTableQueryInfo->tag, pTag); -// -// STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, &pTableQueryInfo->tag); -// -// // failed to find data with the specified tag value and vnodeId -// if (!tsBufIsValidElem(&elem)) { -// if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { -// qError("QInfo:%p failed to find tag:%s in ts_comp", pRuntimeEnv->qinfo, pTag->pz); -// } else { -// qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pRuntimeEnv->qinfo, pTag->i64); -// } -// -// return false; -// } -// -// // keep the cursor info of current meter -// pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); -// if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { -// qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); -// } else { -// qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); -// } -// -// } else { -// tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur); -// -// if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { -// qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); -// } else { -// qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); -// } -// } -// -// return 0; -//} +void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { + SQuery* pQuery = pRuntimeEnv->pQuery; -int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, - SExprInfo* pExpr) { + SSqlFuncMsg* pFuncMsg = &pExprInfo->base; + if (pQuery->stableQuery && (pRuntimeEnv->pTsBuf != NULL) && + (pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && + (pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX)) { + assert(pFuncMsg->numOfParams == 1); + + int16_t tagColId = (int16_t)pExprInfo->base.arg->argValue.i64; + SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId); + + doSetTagValueInParam(pTable, tagColId, &pCtx->tag, pColInfo->type, pColInfo->bytes); + + int16_t tagType = pCtx[0].tag.nType; + if (tagType == TSDB_DATA_TYPE_BINARY || tagType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%s", pRuntimeEnv->qinfo, + pExprInfo->base.arg->argValue.i64, pCtx[0].tag.pz); + } else { + qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%" PRId64, pRuntimeEnv->qinfo, + pExprInfo->base.arg->argValue.i64, pCtx[0].tag.i64); + } + } +} + +int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, tVariant* pTag, STableQueryInfo *pTableQueryInfo) { SQuery* pQuery = pRuntimeEnv->pQuery; - if (pRuntimeEnv->prevResult == NULL || pQuery->groupbyColumn) { - return TSDB_CODE_SUCCESS; + assert(pRuntimeEnv->pTsBuf != NULL); + + // both the master and supplement scan needs to set the correct ts comp start position + if (pTableQueryInfo->cur.vgroupIndex == -1) { + tVariantAssign(&pTableQueryInfo->tag, pTag); + + STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTsBuf, pQuery->vgId, &pTableQueryInfo->tag); + + // failed to find data with the specified tag value and vnodeId + if (!tsBufIsValidElem(&elem)) { + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qError("QInfo:%p failed to find tag:%s in ts_comp", pRuntimeEnv->qinfo, pTag->pz); + } else { + qError("QInfo:%p failed to find tag:%" PRId64 " in ts_comp", pRuntimeEnv->qinfo, pTag->i64); + } + + return -1; + } + + // Keep the cursor info of current table + pTableQueryInfo->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + } else { + qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + } + + } else { + tsBufSetCursor(pRuntimeEnv->pTsBuf, &pTableQueryInfo->cur); + if (pTag->nType == TSDB_DATA_TYPE_BINARY || pTag->nType == TSDB_DATA_TYPE_NCHAR) { + qDebug("QInfo:%p find tag:%s start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->pz, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + } else { + qDebug("QInfo:%p find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", pRuntimeEnv->qinfo, pTag->i64, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); + } } + return 0; +} + +void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr) { + SQuery* pQuery = pRuntimeEnv->pQuery; + int32_t numOfExprs = pQuery->numOfOutput; for(int32_t i = 0; i < numOfExprs; ++i) { SExprInfo* pExprInfo = &(pExpr[i]); - if(pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) { + if (pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) { continue; } SSqlFuncMsg* pFuncMsg = &pExprInfo->base; pCtx[i].param[0].arr = NULL; - pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int + pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int // TODO use hash to speedup this loop - int32_t numOfGroup = (int32_t) taosArrayGetSize(pRuntimeEnv->prevResult); - for(int32_t j = 0; j < numOfGroup; ++j) { - SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, j); + int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult); + for (int32_t j = 0; j < numOfGroup; ++j) { + SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j); if (pQuery->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQuery->tagLen) == 0) { - int32_t numOfCols = (int32_t) taosArrayGetSize(p->pResult); - for(int32_t k = 0; k < numOfCols; ++k) { + int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult); + for (int32_t k = 0; k < numOfCols; ++k) { SStddevInterResult* pres = taosArrayGet(p->pResult, k); if (pres->colId == pFuncMsg->colInfo.colId) { pCtx[i].param[0].arr = pres->pResult; @@ -3306,7 +3274,6 @@ int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32 } } - return 0; } /* @@ -3895,15 +3862,16 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->vgId = vgId; pQuery->stableQuery = isSTableQuery; pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); + pQuery->interBufSize = getOutputInterResultBufSize(pQuery); pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (isSTableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0); pRuntimeEnv->pQuery = pQuery; pRuntimeEnv->pTsBuf = pTsBuf; pRuntimeEnv->cur.vgroupIndex = -1; + setResultBufSize(pQuery, &pRuntimeEnv->resultInfo); if (onlyQueryTags(pQuery)) { - // TODO refactor. pRuntimeEnv->resultInfo.capacity = 4096; pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQuery->pExpr1, pQuery->numOfOutput); } else if (pQuery->queryBlockDist) { @@ -5888,23 +5856,28 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) { } } -static void calResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) { - const int32_t RESULT_MSG_MIN_SIZE = 1024 * (1024 + 512); // bytes - const int32_t RESULT_MSG_MIN_ROWS = 8192; - const float RESULT_THRESHOLD_RATIO = 0.85f; +void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) { + const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); + + // the minimum number of rows for projection query + const int32_t MIN_ROWS_FOR_PRJ_QUERY = 8192; + const int32_t DEFAULT_MIN_ROWS = 4096; + + const float THRESHOLD_RATIO = 0.85f; if (isProjQuery(pQuery)) { - int32_t numOfRes = RESULT_MSG_MIN_SIZE / pQuery->resultRowSize; - if (numOfRes < RESULT_MSG_MIN_ROWS) { - numOfRes = RESULT_MSG_MIN_ROWS; + int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQuery->resultRowSize; + if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) { + numOfRes = MIN_ROWS_FOR_PRJ_QUERY; } pResultInfo->capacity = numOfRes; - pResultInfo->threshold = (int32_t)(numOfRes * RESULT_THRESHOLD_RATIO); } else { // in case of non-prj query, a smaller output buffer will be used. - pResultInfo->capacity = 4096; - pResultInfo->threshold = (int32_t)(pResultInfo->capacity * RESULT_THRESHOLD_RATIO); + pResultInfo->capacity = DEFAULT_MIN_ROWS; } + + pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO); + pResultInfo->total = 0; } SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs,