diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index c9495f8b3c0f892054c92f7314f3fa6b1894bbcb..74a9ee4533c5f1e0affe020377e831578d6cfffb 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -364,6 +364,12 @@ typedef struct STableScanInfo { int64_t elapsedTime; } STableScanInfo; +typedef struct STagScanInfo { + SQueryRuntimeEnv *pRuntimeEnv; + SColumnInfo* pCols; + SSDataBlock* pRes; +} STagScanInfo; + typedef struct SAggOperatorInfo { SResultRowInfo resultRowInfo; STableQueryInfo *pTableQueryInfo; diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index b1ad96d21ddc2a4072e57aef6524ea9c1cd62924..49f4aa0872e9f4ed7a603732632d68a2bdd4d117 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -1841,7 +1841,7 @@ static void first_dist_function(SQLFunctionCtx *pCtx) { * 1. data block that are not loaded * 2. scan data files in desc order */ - if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) { + if (pCtx->order == TSDB_ORDER_DESC/* || pCtx->preAggVals.dataBlockLoaded == false*/) { return; } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 48a9795bae29e262066a7f3f93633273be7b3e9c..02bc9c8573f9428b6ce71ed90c7d357290eeee51 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -152,6 +152,7 @@ static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { } tw->ekey -= 1; } +static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); static void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult); static void setResultOutputBuf_rv(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols); @@ -163,7 +164,7 @@ static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, SDataStatis *pStatis, SExprInfo* pExprInfo); static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); - static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx); +static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pSQLCtx); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); static void resetDefaultResInfoOutputBuf(SQueryRuntimeEnv *pRuntimeEnv); static bool hasMainOutput(SQuery *pQuery); @@ -191,6 +192,7 @@ static SOperatorInfo* createHashIntervalAggOperatorInfo(STableQueryInfo* pTableQ static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createHashGroupbyAggOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); static SOperatorInfo* createStableIntervalOperatorInfo(STableQueryInfo* pTableQueryInfo, SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv); static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); @@ -215,7 +217,8 @@ static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput) { idata.info.type = pExpr[i].type; idata.info.bytes = pExpr[i].bytes; idata.info.colId = pExpr[i].base.resColId; - idata.pData = calloc(4096, idata.info.bytes * 4096); + idata.pData = calloc(4096, idata.info.bytes); + taosArrayPush(res->pDataBlock, &idata); } @@ -2452,6 +2455,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf qDebug("QInfo:%p init runtime completed", GET_QINFO_ADDR(pRuntimeEnv)); + + // group by normal column, sliding window query, interval query are handled by interval query processor // if (!pQuery->stableQuery) { // interval (down sampling operation) if (QUERY_IS_INTERVAL_QUERY(pQuery)) { @@ -2487,7 +2492,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } } else { // diff/add/multiply/subtract/division assert(pQuery->checkResultBuf == 1); - pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + if (!onlyQueryTags(pQuery)) { + pRuntimeEnv->proot = createArithOperatorInfo(pQuery->current, pRuntimeEnv, pRuntimeEnv->pi); + setTableScanFilterOperatorInfo(pRuntimeEnv->pi->optInfo, pRuntimeEnv->proot); + } } if (pQuery->limit.offset > 0) { @@ -3230,10 +3238,24 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte } start += len; + len = 0; } } } + if (len > 0) { + int32_t cstart = numOfRows - len; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i); + + int16_t bytes = pColumnInfoData->info.bytes; + memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes); + } + + start += len; + len = 0; + } + pBlock->info.rows = start; pBlock->pBlockStatis = NULL; // clean the block statistics info @@ -4068,6 +4090,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR pCtx[i].resultInfo = pCellInfo; pCtx[i].pOutput = pData->pData; + assert(pCtx[i].pOutput != NULL); // set the timestamp output buffer for top/bottom/diff query int32_t functionId = pCtx->functionId; @@ -5668,7 +5691,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts pQuery->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr); - if (needReverseScan(pQuery)) { + if (onlyQueryTags(pQuery)) { + pRuntimeEnv->proot = createTagScanOperator(pRuntimeEnv); + } else if (needReverseScan(pQuery)) { pRuntimeEnv->pi = createBiDirectionTableScanInfo(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery), 1); } else { pRuntimeEnv->pi = createTableScanOperator(pRuntimeEnv->pQueryHandle, pRuntimeEnv, getNumOfScanTimes(pQuery)); @@ -6628,7 +6653,8 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { // todo check for query cancel tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); - if (pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables > 1) { + if (pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables > 1 || + (pQuery->current == NULL && pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1)) { STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet( pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.map, &pBlock->info.tid, sizeof(pBlock->info.tid)); if (pTableQueryInfo == NULL) { @@ -6637,6 +6663,8 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) { pQuery->current = *pTableQueryInfo; doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo); + } else if (pTableScanInfo->pRuntimeEnv->pQuery->current == NULL) { + } // this function never returns error? @@ -6765,6 +6793,11 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf } else if (strcasecmp(name, "STableIntervalAggOp") == 0) { SHashIntervalOperatorInfo *pInfo = pDownstream->optInfo; + pTableScanInfo->pCtx = pInfo->pCtx; + pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; + } else if (strcasecmp(name, "ArithmeticOp") == 0) { + SArithOperatorInfo *pInfo = pDownstream->optInfo; + pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; } else { @@ -6815,10 +6848,8 @@ static SSDataBlock* doAggregation(void* param) { SQuery* pQuery = pRuntimeEnv->pQuery; int32_t order = pQuery->order.order; - SQLFunctionCtx* pCtx = createSQLFunctionCtx(pRuntimeEnv, pOperator->numOfOutput, pOperator->pExpr, order, pQuery->vgId); - SSDataBlock* pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); - setDefaultOutputBuf(pRuntimeEnv, pCtx, &pRuntimeEnv->resultRowInfo, pRes); + setDefaultOutputBuf(pRuntimeEnv, pAggInfo->pCtx, &pRuntimeEnv->resultRowInfo, pRes); SOperatorInfo* upstream = pOperator->upstream; pQuery->pos = 0; @@ -6835,8 +6866,8 @@ static SSDataBlock* doAggregation(void* param) { } // the pDataBlock are always the same one, no need to call this again - setInputSDataBlock(pOperator, pCtx, pBlock, order); - aggApplyFunctions(pRuntimeEnv, pOperator, pCtx, pBlock); + setInputSDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); + aggApplyFunctions(pRuntimeEnv, pOperator, pAggInfo->pCtx, pBlock); } pOperator->completed = true; @@ -6846,8 +6877,8 @@ static SSDataBlock* doAggregation(void* param) { finalizeQueryResult(pRuntimeEnv); } - pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pCtx, pOperator->numOfOutput); - destroySQLFunctionCtx(pCtx, pRes->info.numOfCols); + pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput); + destroySQLFunctionCtx(pAggInfo->pCtx, pRes->info.numOfCols); return pRes; } @@ -7369,7 +7400,156 @@ static SOperatorInfo* createFillOperatorInfo(STableQueryInfo* pTableQueryInfo, S return pOperator; } +static SSDataBlock* doTagScan(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + + STagScanInfo *pTagScanInfo = pOperator->optInfo; + SQueryRuntimeEnv *pRuntimeEnv = pTagScanInfo->pRuntimeEnv; + SQuery* pQuery = pRuntimeEnv->pQuery; + + size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); + assert(numOfGroup == 0 || numOfGroup == 1); + + if (numOfGroup == 0) { + return NULL; + } + + SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); + + size_t num = taosArrayGetSize(pa); + assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables); + + if (pTagScanInfo->pRes == NULL) { + pTagScanInfo->pRes = createOutputBuf(pOperator->pExpr, pOperator->numOfOutput); + } + + int32_t count = 0; +// int32_t functionId = pOperator->pExpr[0].base.functionId; + /*if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id + assert(pQuery->numOfOutput == 1); + + SExprInfo* pExprInfo = &pOperator->pExpr[0]; + int32_t rsize = pExprInfo->bytes; + count = 0; + + int16_t bytes = pExprInfo->bytes; + int16_t type = pExprInfo->type; + + for(int32_t i = 0; i < pQuery->numOfTags; ++i) { + if (pQuery->tagColList[i].colId == pExprInfo->base.colInfo.colId) { + bytes = pQuery->tagColList[i].bytes; + type = pQuery->tagColList[i].type; + break; + } + } + + while(pRuntimeEnv->tableIndex < num && count < pQuery->rec.capacity) { + int32_t i = pRuntimeEnv->tableIndex++; + STableQueryInfo *item = taosArrayGetP(pa, i); + + char *output = pQuery->sdata[0]->data + count * rsize; + varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); + + output = varDataVal(output); + STableId* id = TSDB_TABLEID(item->pTable); + + *(int16_t *)output = 0; + output += sizeof(int16_t); + + *(int64_t *)output = id->uid; // memory align problem, todo serialize + output += sizeof(id->uid); + + *(int32_t *)output = id->tid; + output += sizeof(id->tid); + + *(int32_t *)output = pQuery->vgId; + output += sizeof(pQuery->vgId); + + if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { + char* data = tsdbGetTableName(item->pTable); + memcpy(output, data, varDataTLen(data)); + } else { + char* data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes); + doSetTagValueToResultBuf(output, data, type, bytes); + } + + count += 1; + } + + qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pRuntimeEnv->qinfo, count); + + } else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query + *(int64_t*) pQuery->sdata[0]->data = num; + + count = 1; + SET_STABLE_QUERY_OVER(pRuntimeEnv); + qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count); + } else*/ { // return only the tags|table name etc. + count = 0; + SSchema* tbnameSchema = tGetTbnameColumnSchema(); + + int32_t maxNumOfTables = (int32_t)pQuery->rec.capacity; +// if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pQuery->rec.capacity) { +// maxNumOfTables = (int32_t)pQuery->limit.limit; +// } + + while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) { + int32_t i = pRuntimeEnv->tableIndex++; + + SExprInfo* pExprInfo = pOperator->pExpr; + STableQueryInfo* item = taosArrayGetP(pa, i); + + char *data = NULL, *dst = NULL; + int16_t type = 0, bytes = 0; + for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { + // not assign value in case of user defined constant output column + if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.colInfo.flag)) { + continue; + } + + SColumnInfoData* pColInfo = taosArrayGet(pTagScanInfo->pRes->pDataBlock, j); + + if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { + bytes = tbnameSchema->bytes; + type = tbnameSchema->type; + + data = tsdbGetTableName(item->pTable); + dst = pColInfo->pData + count * tbnameSchema->bytes; + } else { + type = pExprInfo[j].type; + bytes = pExprInfo[j].bytes; + + data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes); + dst = pColInfo->pData + count * pExprInfo[j].bytes; + } + + doSetTagValueToResultBuf(dst, data, type, bytes); + } + count += 1; + } + + pTagScanInfo->pRes->info.rows = count; + qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count); + } + + return pTagScanInfo->pRes; +} + +static SOperatorInfo* createTagScanOperator(SQueryRuntimeEnv* pRuntimeEnv) { + STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); + pInfo->pRuntimeEnv = pRuntimeEnv; + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "SeqTagScanOp"; + pOperator->blockingOptr = false; + pOperator->completed = false; + pOperator->optInfo = pInfo; + pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfOutput; + pOperator->exec = doTagScan; + pOperator->pExpr = pRuntimeEnv->pQuery->pExpr1; + return pOperator; +} /* * in each query, this function will be called only once, no retry for further result. @@ -8909,6 +9089,10 @@ void buildTagQueryResult(SQInfo* pQInfo) { return; } + pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); + pQuery->rec.rows = pRuntimeEnv->outputBuf->info.rows; + return; + SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); size_t num = taosArrayGetSize(pa);