diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 309d354849505163fb5e582fedb06a8f90f065ff..0c9ad78cd92ab00f5592757cb0712a52e6e7f646 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5311,9 +5311,10 @@ int32_t parseLimitClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t clauseIn if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { /* - * the limitation/offset value should be removed during retrieve data from virtual node, - * since the global order are done in client side, so the limitation should also - * be done at the client side. + * the offset value should be removed during retrieve data from virtual node, since the + * global order are done in client side, so the offset is applied at the client side + * However, note that the maximum allowed number of result for each table should be less + * than or equal to the value of limit. */ if (pQueryInfo->limit.limit > 0) { pQueryInfo->limit.limit = -1; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 71fc01923ab87104f6d77a5f920b9046fb36ee7b..4ec6ed30babb3d22c2cb1d050edb2c217c41b8eb 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -190,6 +190,7 @@ typedef struct SQueryRuntimeEnv { bool groupbyNormalCol; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not bool timeWindowInterpo;// if the time window start/end required interpolation + bool queryWindowIdentical; // all query time windows are identical for all tables in one group int32_t interBufSize; // intermediate buffer sizse int32_t prevGroupId; // previous executed group id SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file @@ -217,7 +218,8 @@ typedef struct SQInfo { STableGroupInfo tableGroupInfo; // table list SArray STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure SQueryRuntimeEnv runtimeEnv; - SArray* arrTableIdInfo; +// SArray* arrTableIdInfo; + SHashObj* arrTableIdInfo; int32_t groupIndex; /* diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6de57b32fdf769e3ff567fcd00da4de91e6d83ef..7e6950a735ca50cd2a9949483e614d1f9c204d66 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -197,6 +197,8 @@ static int32_t checkForQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); static void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type); +static STsdbQueryCond createTsdbQueryCond(SQuery* pQuery); +static STableIdInfo createTableIdInfo(SQuery* pQuery); bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { @@ -2781,6 +2783,7 @@ static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capa pQuery->rec.capacity = capacity; } +// TODO merge with enuserOutputBufferSimple static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block SQuery* pQuery = pRuntimeEnv->pQuery; @@ -3916,13 +3919,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI SET_REVERSE_SCAN_FLAG(pRuntimeEnv); - STsdbQueryCond cond = { - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); setQueryStatus(pQuery, QUERY_NOT_COMPLETED); switchCtxOrder(pRuntimeEnv); @@ -4005,18 +4002,11 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { break; } - STsdbQueryCond cond = { - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, qstatus.curWindow); - if (pRuntimeEnv->pSecQueryHandle != NULL) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } + STsdbQueryCond cond = createTsdbQueryCond(pQuery); restoreTimeWindow(&pQInfo->tableGroupInfo, &cond); pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); if (pRuntimeEnv->pSecQueryHandle == NULL) { @@ -4541,16 +4531,19 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } - int32_t numOfTables = (int32_t)taosArrayGetSize(pQInfo->arrTableIdInfo); + int32_t numOfTables = taosHashGetSize(pQInfo->arrTableIdInfo); *(int32_t*)data = htonl(numOfTables); data += sizeof(int32_t); - for(int32_t i = 0; i < numOfTables; i++) { - STableIdInfo* pSrc = taosArrayGet(pQInfo->arrTableIdInfo, i); + + STableIdInfo* item = taosHashIterate(pQInfo->arrTableIdInfo, NULL); + while(item) { STableIdInfo* pDst = (STableIdInfo*)data; - pDst->uid = htobe64(pSrc->uid); - pDst->tid = htonl(pSrc->tid); - pDst->key = htobe64(pSrc->key); + pDst->uid = htobe64(item->uid); + pDst->tid = htonl(item->tid); + pDst->key = htobe64(item->key); + data += sizeof(STableIdInfo); + item = taosHashIterate(pQInfo->arrTableIdInfo, item); } // Check if query is completed or not for stable query or normal table query respectively. @@ -4877,13 +4870,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) return TSDB_CODE_SUCCESS; } - STsdbQueryCond cond = { - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); if (!isSTableQuery && (pQInfo->tableqinfoGroupInfo.numOfTables == 1) @@ -5276,6 +5263,41 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { return true; } +STsdbQueryCond createTsdbQueryCond(SQuery* pQuery) { + STsdbQueryCond cond = { + .colList = pQuery->colList, + .order = pQuery->order.order, + .numOfCols = pQuery->numOfCols, + }; + + TIME_WINDOW_COPY(cond.twindow, pQuery->window); + return cond; +} + +static STableIdInfo createTableIdInfo(SQuery* pQuery) { + assert(pQuery != NULL && pQuery->current != NULL); + + STableIdInfo tidInfo; + STableId* id = TSDB_TABLEID(pQuery->current->pTable); + + tidInfo.uid = id->uid; + tidInfo.tid = id->tid; + tidInfo.key = pQuery->current->lastKey; + + return tidInfo; +} + +static void updateTableIdInfo(SQuery* pQuery, SHashObj* pTableIdInfo) { + STableIdInfo tidInfo = createTableIdInfo(pQuery); + STableIdInfo* idinfo = taosHashGet(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid)); + if (idinfo != NULL) { + assert(idinfo->tid == tidInfo.tid && idinfo->uid == tidInfo.uid); + idinfo->key = tidInfo.key; + } else { + taosHashPut(pTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo)); + } +} + /** * super table query handler * 1. super table projection query, group-by on normal columns query, ts-comp query @@ -5295,18 +5317,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); - - qDebug("QInfo:%p last_row query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, pQInfo->groupIndex, - numOfGroups, group); - - STsdbQueryCond cond = { - .colList = pQuery->colList, - .order = pQuery->order.order, - .numOfCols = pQuery->numOfCols, - }; + SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + qDebug("QInfo:%p point interpolation query on group:%d, total group:%" PRIzu ", current group:%p", pQInfo, + pQInfo->groupIndex, numOfGroups, group); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *tx = taosArrayClone(group); @@ -5330,14 +5345,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { initCtxOutputBuf(pRuntimeEnv); - SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); + SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); assert(taosArrayGetSize(s) >= 1); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); taosArrayDestroy(s); // here we simply set the first table as current table - SArray* first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); + SArray *first = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex); pQuery->current = taosArrayGetP(first, 0); scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); @@ -5359,19 +5374,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { break; } } - } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query + } else if (pRuntimeEnv->groupbyNormalCol) { // group-by on normal columns query while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); + SArray *group = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, pQInfo->groupIndex); - qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex, numOfGroups); + qDebug("QInfo:%p group by normal columns group:%d, total group:%" PRIzu "", pQInfo, pQInfo->groupIndex, + numOfGroups); - STsdbQueryCond cond = { - .colList = pQuery->colList, - .order = pQuery->order.order, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *tx = taosArrayClone(group); @@ -5394,7 +5404,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { longjmp(pRuntimeEnv->env, terrno); } - SArray* s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); + SArray *s = tsdbGetQueriedTableList(pRuntimeEnv->pQueryHandle); assert(taosArrayGetSize(s) >= 1); setTagVal(pRuntimeEnv, taosArrayGetP(s, 0), pQInfo->tsdb); @@ -5405,24 +5415,24 @@ static void sequentialTableProcess(SQInfo *pQInfo) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - // no results generated for current group, continue to try the next group + // no results generated for current group, continue to try the next group taosArrayDestroy(s); if (pWindowResInfo->size <= 0) { continue; } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns + pWindowResInfo->pResult[i]->closed = true; // enable return all results for group by normal columns SResultRow *pResult = pWindowResInfo->pResult[i]; for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - SResultRowCellInfo* pCell = getResultCell(pRuntimeEnv, pResult, j); + SResultRowCellInfo *pCell = getResultCell(pRuntimeEnv, pResult, j); pResult->numOfRows = (uint16_t)(MAX(pResult->numOfRows, pCell->numOfRes)); } } qDebug("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size, - pQInfo->groupIndex); + pQInfo->groupIndex); int32_t currentGroupIndex = pQInfo->groupIndex; pQuery->rec.rows = 0; @@ -5431,16 +5441,119 @@ static void sequentialTableProcess(SQInfo *pQInfo) { ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size); copyFromWindowResToSData(pQInfo, pWindowResInfo); - pQInfo->groupIndex = currentGroupIndex; //restore the group index + pQInfo->groupIndex = currentGroupIndex; // restore the group index assert(pQuery->rec.rows == pWindowResInfo->size); clearClosedTimeWindow(pRuntimeEnv); break; } + } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL) { + //super table projection query with identical query time range for all tables. + SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; + + resetDefaultResInfoOutputBuf(pRuntimeEnv); + + SArray *group = GET_TABLEGROUP(pQInfo, 0); + assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && + 1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList)); + + void *pQueryHandle = pRuntimeEnv->pQueryHandle; + if (pQueryHandle == NULL) { + STsdbQueryCond con = createTsdbQueryCond(pQuery); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &con, &pQInfo->tableGroupInfo, pQInfo, &pQInfo->memRef); + pQueryHandle = pRuntimeEnv->pQueryHandle; + } + + // skip blocks without load the actual data block from file if no filter condition present + // skipBlocks(&pQInfo->runtimeEnv); + // if (pQuery->limit.offset > 0 && pQuery->numOfFilterCols == 0) { + // setQueryStatus(pQuery, QUERY_COMPLETED); + // return; + // } + + bool hasMoreBlock = true; + SQueryCostInfo *summary = &pRuntimeEnv->summary; + while ((hasMoreBlock = tsdbNextDataBlock(pQueryHandle)) == true) { + summary->totalBlocks += 1; + + if (IS_QUERY_KILLED(pQInfo)) { + longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo); + STableQueryInfo **pTableQueryInfo = + (STableQueryInfo **)taosHashGet(pQInfo->tableqinfoGroupInfo.map, &blockInfo.tid, sizeof(blockInfo.tid)); + if (pTableQueryInfo == NULL) { + break; + } + + pQuery->current = *pTableQueryInfo; + if (QUERY_IS_ASC_QUERY(pQuery)) { + assert(((*pTableQueryInfo)->win.skey <= (*pTableQueryInfo)->win.ekey) && + ((*pTableQueryInfo)->lastKey >= (*pTableQueryInfo)->win.skey) && + ((*pTableQueryInfo)->win.skey >= pQuery->window.skey && + (*pTableQueryInfo)->win.ekey <= pQuery->window.ekey)); + } else { + assert(((*pTableQueryInfo)->win.skey >= (*pTableQueryInfo)->win.ekey) && + ((*pTableQueryInfo)->lastKey <= (*pTableQueryInfo)->win.skey) && + ((*pTableQueryInfo)->win.skey <= pQuery->window.skey && + (*pTableQueryInfo)->win.ekey >= pQuery->window.ekey)); + } + + if (pRuntimeEnv->hasTagResults) { + setTagVal(pRuntimeEnv, pQuery->current->pTable, pQInfo->tsdb); + } + + uint32_t status = 0; + SDataStatis *pStatis = NULL; + SArray * pDataBlock = NULL; + + int32_t ret = loadDataBlockOnDemand(pRuntimeEnv, &pQuery->current->windowResInfo, pQueryHandle, &blockInfo, + &pStatis, &pDataBlock, &status); + if (ret != TSDB_CODE_SUCCESS) { + break; + } + + assert(status != BLK_DATA_DISCARD); + ensureOutputBuffer(pRuntimeEnv, &blockInfo); + + pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); + + summary->totalRows += blockInfo.rows; + qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%" PRId64, + GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes, + pQuery->current->lastKey); + + pQuery->rec.rows = getNumOfResult(pRuntimeEnv); + + // the flag may be set by tableApplyFunctionsOnBlock, clear it here + CLEAR_QUERY_STATUS(pQuery, QUERY_COMPLETED); + + updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); + skipResults(pRuntimeEnv); + + // the limitation of output result is reached, set the query completed + if (limitResults(pRuntimeEnv)) { + SET_STABLE_QUERY_OVER(pQInfo); + break; + } + + // while the output buffer is full or limit/offset is applied, query may be paused here + if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL|QUERY_COMPLETED)) { + break; + } + } + + if (!hasMoreBlock) { + setQueryStatus(pQuery, QUERY_COMPLETED); + SET_STABLE_QUERY_OVER(pQInfo); + } } else { /* - * 1. super table projection query, 2. ts-comp query - * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. + * the following two cases handled here. + * 1. ts-comp query, and 2. the super table projection query with different query time range for each table. + * If the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ if (pQInfo->groupIndex > 0) { @@ -5503,14 +5616,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * to ensure that, we can reset the query range once query on a meter is completed. */ pQInfo->tableIndex++; - - STableIdInfo tidInfo = {0}; - - STableId* id = TSDB_TABLEID(pQuery->current->pTable); - tidInfo.uid = id->uid; - tidInfo.tid = id->tid; - tidInfo.key = pQuery->current->lastKey; - taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); + updateTableIdInfo(pQuery, pQInfo->arrTableIdInfo); // if the buffer is full or group by each table, we need to jump out of the loop if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { @@ -5537,31 +5643,31 @@ static void sequentialTableProcess(SQInfo *pQInfo) { if (pQInfo->tableIndex >= pQInfo->tableqinfoGroupInfo.numOfTables) { setQueryStatus(pQuery, QUERY_COMPLETED); } - } - /* - * 1. super table projection query, group-by on normal columns query, ts-comp query - * 2. point interpolation query, last row query - * - * group-by on normal columns query and last_row query do NOT invoke the finalizer here, - * since the finalize stage will be done at the client side. - * - * projection query, point interpolation query do not need the finalizer. - * - * Only the ts-comp query requires the finalizer function to be executed here. - */ - if (isTSCompQuery(pQuery)) { - finalizeQueryResult(pRuntimeEnv); - } + /* + * 1. super table projection query, group-by on normal columns query, ts-comp query + * 2. point interpolation query, last row query + * + * group-by on normal columns query and last_row query do NOT invoke the finalizer here, + * since the finalize stage will be done at the client side. + * + * projection query, point interpolation query do not need the finalizer. + * + * Only the ts-comp query requires the finalizer function to be executed here. + */ + if (isTSCompQuery(pQuery)) { + finalizeQueryResult(pRuntimeEnv); + } - if (pRuntimeEnv->pTSBuf != NULL) { - pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; - } + if (pRuntimeEnv->pTSBuf != NULL) { + pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; + } - qDebug( - "QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 " points returned, total:%" PRId64 ", offset:%" PRId64, - pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, - pQuery->limit.offset); + qDebug("QInfo %p numOfTables:%" PRIu64 ", index:%d, numOfGroups:%" PRIzu ", %" PRId64 + " points returned, total:%" PRId64 ", offset:%" PRId64, + pQInfo, (uint64_t)pQInfo->tableqinfoGroupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, + pQuery->rec.total, pQuery->limit.offset); + } } static void doSaveContext(SQInfo *pQInfo) { @@ -5576,13 +5682,7 @@ static void doSaveContext(SQInfo *pQInfo) { SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); } - STsdbQueryCond cond = { - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - TIME_WINDOW_COPY(cond.twindow, pQuery->window); + STsdbQueryCond cond = createTsdbQueryCond(pQuery); // clean unused handle if (pRuntimeEnv->pSecQueryHandle != NULL) { @@ -5855,13 +5955,8 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) qDebug("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->current->lastKey, pQuery->window.ekey); } else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - STableIdInfo tidInfo; - STableId* id = TSDB_TABLEID(pQuery->current->pTable); - - tidInfo.uid = id->uid; - tidInfo.tid = id->tid; - tidInfo.key = pQuery->current->lastKey; - taosArrayPush(pQInfo->arrTableIdInfo, &tidInfo); + STableIdInfo tidInfo = createTableIdInfo(pQuery); + taosHashPut(pQInfo->arrTableIdInfo, &tidInfo.tid, sizeof(tidInfo.tid), &tidInfo, sizeof(STableIdInfo)); } if (!isTSCompQuery(pQuery)) { @@ -6859,8 +6954,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); } - int tableIndex = 0; - pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery); pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo)); @@ -6882,7 +6975,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou } // NOTE: pTableCheckInfo need to update the query time range and the lastKey info - pQInfo->arrTableIdInfo = taosArrayInit(tableIndex, sizeof(STableIdInfo)); + pQInfo->arrTableIdInfo = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); pQInfo->dataReady = QUERY_RESULT_NOT_READY; pQInfo->rspContext = NULL; pthread_mutex_init(&pQInfo->lock, NULL); @@ -6892,10 +6985,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQuery->window = pQueryMsg->window; changeExecuteScanOrder(pQInfo, pQueryMsg, stableQuery); + pQInfo->runtimeEnv.queryWindowIdentical = true; STimeWindow window = pQuery->window; int32_t index = 0; - for(int32_t i = 0; i < numOfGroups; ++i) { SArray* pa = taosArrayGetP(pQInfo->tableGroupInfo.pGroupList, i); @@ -6910,9 +7003,12 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou for(int32_t j = 0; j < s; ++j) { STableKeyInfo* info = taosArrayGet(pa, j); - void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); - window.skey = info->lastKey; + if (info->lastKey != pQuery->window.skey) { + pQInfo->runtimeEnv.queryWindowIdentical = false; + } + + void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo); STableQueryInfo* item = createTableQueryInfo(&pQInfo->runtimeEnv, info->pTable, window, buf); if (item == NULL) { goto _cleanup; @@ -7126,7 +7222,7 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo->pBuf); tsdbDestroyTableGroup(&pQInfo->tableGroupInfo); - taosArrayDestroy(pQInfo->arrTableIdInfo); + taosHashCleanup(pQInfo->arrTableIdInfo); pQInfo->signature = 0; @@ -7506,7 +7602,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co size_t size = getResultSize(pQInfo, &pQuery->rec.rows); size += sizeof(int32_t); - size += sizeof(STableIdInfo) * taosArrayGetSize(pQInfo->arrTableIdInfo); + size += sizeof(STableIdInfo) * taosHashGetSize(pQInfo->arrTableIdInfo); *contLen = (int32_t)(size + sizeof(SRetrieveTableRsp)); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 2c41d9bfc5c508731cd914936397e7e60513e8f0..f5f2da9f2533dbc98391450db00125afc1c7bcc3 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -83,8 +83,6 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR return; } -// assert(pWindowResInfo->size == 1); - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SResultRow *pWindowRes = pWindowResInfo->pResult[i]; clearResultRow(pRuntimeEnv, pWindowRes, pWindowResInfo->type); diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 3ac54eedd87c11a68c6fb73fda0e0ac6c5056a51..0e3e0d3e2437f4595179c880a37642e047eb4331 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -151,8 +151,9 @@ static void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode); */ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) { - if (capacity == 0 || fn == NULL) { - return NULL; + assert(fn != NULL); + if (capacity == 0) { + capacity = 4; } SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj));