From f87322f6802ffa453ce9a4f12d14f1888929b4c3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Jun 2020 11:06:59 +0800 Subject: [PATCH] [td-225] fix bugs in group by normal columns --- src/client/src/tscFunctionImpl.c | 13 +- src/client/src/tscSQLParser.c | 27 ++- src/client/src/tscServer.c | 4 +- src/query/src/qExecutor.c | 224 +++++++++++++----- src/query/src/qUtil.c | 12 +- tests/script/general/parser/groupby.sim | 2 +- .../script/general/parser/import_commit3.sim | 2 +- 7 files changed, 213 insertions(+), 71 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index a0deaa519a..457e187971 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -699,7 +699,7 @@ static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, } static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { - if (pCtx->order == TSDB_ORDER_ASC) { + if (pCtx->order != pCtx->param[0].i64Key) { return BLK_DATA_NO_NEEDED; } @@ -727,7 +727,7 @@ static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY } static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) { - if (pCtx->order == TSDB_ORDER_ASC) { + if (pCtx->order != pCtx->param[0].i64Key) { return BLK_DATA_NO_NEEDED; } @@ -1593,7 +1593,7 @@ static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } - + if (pCtx->order == TSDB_ORDER_DESC) { return; } @@ -1652,7 +1652,7 @@ static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) { * least one data in this block that is not null.(TODO opt for this case) */ static void last_function(SQLFunctionCtx *pCtx) { - if (pCtx->order == TSDB_ORDER_ASC) { + if (pCtx->order != pCtx->param[0].i64Key) { return; } @@ -1681,7 +1681,6 @@ static void last_function(SQLFunctionCtx *pCtx) { } static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) { - assert(pCtx->order != TSDB_ORDER_ASC); void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; @@ -1725,7 +1724,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) { * 1. for scan data in asc order, no need to check data * 2. for data blocks that are not loaded, no need to check data */ - if (pCtx->order == TSDB_ORDER_ASC) { + if (pCtx->order != pCtx->param[0].i64Key) { return; } @@ -1763,7 +1762,7 @@ static void last_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) { * 1. for scan data in asc order, no need to check data * 2. for data blocks that are not loaded, no need to check data */ - if (pCtx->order == TSDB_ORDER_ASC) { + if (pCtx->order != pCtx->param[0].i64Key) { return; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 60415a8d74..9db81054a9 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1452,6 +1452,13 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema, SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false); tstrncpy(pExpr->aliasName, columnName, sizeof(pExpr->aliasName)); + + // set reverse order scan data blocks for last query + if (functionID == TSDB_FUNC_LAST) { + pExpr->numOfParams = 1; + pExpr->param[0].i64Key = TSDB_ORDER_DESC; + pExpr->param[0].nType = TSDB_DATA_TYPE_INT; + } // for all queries, the timestamp column needs to be loaded SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; @@ -1724,6 +1731,22 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr if (setExprInfoForFunctions(pQueryInfo, pSchema, functionID, pItem->aliasName, colIndex + i, &index) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } + + if (optr == TK_LAST) { // todo refactor + SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr; + if (pGroupBy->numOfGroupCols > 0) { + for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) { + SColIndex* pIndex = taosArrayGet(pGroupBy->columnInfo, k); + if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { // group by normal columns + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, colIndex + i); + pExpr->numOfParams = 1; + pExpr->param->i64Key = TSDB_ORDER_ASC; + + break; + } + } + } + } } } @@ -2586,9 +2609,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd* tscColumnListInsert(pQueryInfo->colList, &index); - SColIndex colIndex = { - .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId, - }; + SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId }; taosArrayPush(pGroupExpr->columnInfo, &colIndex); pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index cb6a736121..ce174c2473 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -430,7 +430,7 @@ void tscKillSTableQuery(SSqlObj *pSql) { /* * 1. if the subqueries are not launched or partially launched, we need to waiting the launched * query return to successfully free allocated resources. - * 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage, + * 2. if no any subqueries are launched yet, which means the super table query only in parse sql stage, * set the res.code, and return. */ const int64_t MAX_WAITING_TIME = 10000; // 10 Sec. @@ -2200,7 +2200,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { * The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache * instead. */ - tscTrace("%p force release metermeta after drop table:%s", pSql, pTableMetaInfo->name); + tscTrace("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name); taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true); if (pTableMetaInfo->pTableMeta) { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index aa602ed661..13b4511177 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -354,7 +354,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin int16_t bytes) { SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t *p1 = (int32_t *)taosHashGet(pWindowResInfo->hashList, pData, bytes); + int32_t *p1 = (int32_t *) taosHashGet(pWindowResInfo->hashList, pData, bytes); if (p1 != NULL) { pWindowResInfo->curIndex = *p1; } else { // more than the capacity, reallocate the resources @@ -919,12 +919,25 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; + int64_t v = -1; + // not assign result buffer yet, add new result buffer + switch(type) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: v = GET_INT8_VAL(pData); break; + case TSDB_DATA_TYPE_SMALLINT: v = GET_INT16_VAL(pData); break; + case TSDB_DATA_TYPE_INT: v = GET_INT32_VAL(pData); break; + case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break; + } + +// assert(pRuntimeEnv->windowResInfo.hashList->size <= 2); SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes); if (pWindowRes == NULL) { return -1; } - // not assign result buffer yet, add new result buffer + pWindowRes->window.skey = v; + pWindowRes->window.ekey = v; + if (pWindowRes->pos.pageId == -1) { int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage); if (ret != 0) { @@ -1022,12 +1035,16 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx return false; } - if (functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST) { - return !QUERY_IS_ASC_QUERY(pQuery); - } else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST) { + if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST) { return QUERY_IS_ASC_QUERY(pQuery); } - + + // todo add comments + if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) { + return pCtx->param[0].i64Key == pQuery->order.order; +// return !QUERY_IS_ASC_QUERY(pQuery); + } + // in the supplementary scan, only the following functions need to be executed if (IS_REVERSE_SCAN(pRuntimeEnv)) { return false; @@ -1079,7 +1096,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS int32_t j = 0; int32_t offset = -1; - + for (j = 0; j < pDataBlockInfo->rows; ++j) { offset = GET_COL_DATA_POS(pQuery, j, step); @@ -1478,6 +1495,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { } static bool isQueryKilled(SQInfo *pQInfo) { + return false; return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED); #if 0 /* @@ -1574,10 +1592,14 @@ static bool needReverseScan(SQuery *pQuery) { continue; } - if (((functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) && QUERY_IS_ASC_QUERY(pQuery)) || - ((functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_FIRST_DST) && !QUERY_IS_ASC_QUERY(pQuery))) { + if ((functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_FIRST_DST) && !QUERY_IS_ASC_QUERY(pQuery)) { return true; } + + if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) { + int32_t order = pQuery->pSelectExpr[i].base.arg->argValue.i64; + return order != pQuery->order.order; + } } return false; @@ -2030,6 +2052,34 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { return midPos; } +static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capacity) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + if (capacity < pQuery->rec.capacity) { + return; + } + + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t bytes = pQuery->pSelectExpr[i].bytes; + assert(bytes > 0 && capacity > 0); + + char *tmp = realloc(pQuery->sdata[i], bytes * capacity + sizeof(tFilePage)); + if (tmp == NULL) { // todo handle the oom + assert(0); + } else { + pQuery->sdata[i] = (tFilePage *)tmp; + } + + // set the pCtx output buffer position + pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data; + } + + qTrace("QInfo:%p realloc output buffer to inc output buffer from: %d rows to:%d rows", GET_QINFO_ADDR(pRuntimeEnv), + pQuery->rec.capacity, capacity); + + pQuery->rec.capacity = capacity; +} + static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) { // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block SQuery* pQuery = pRuntimeEnv->pQuery; @@ -2916,8 +2966,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) { pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf; } } - - + updateNumOfResult(pRuntimeEnv, pQuery->rec.rows); } } @@ -3054,7 +3103,7 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus pQuery->window = pTableQueryInfo->win; } -void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { +void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) { SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; STableQueryInfo *pTableQueryInfo = pQuery->current; @@ -3496,18 +3545,32 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { assert(pQuery->rec.rows <= pQuery->rec.capacity); } -static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { +static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; // update the number of result for each, only update the number of rows for the corresponding window result. if (pQuery->intervalTime == 0) { - int32_t g = pTableQueryInfo->groupIndex; - assert(pRuntimeEnv->windowResInfo.size > 0); - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g)); - if (pWindowRes->numOfRows == 0) { - pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv); + for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) { + SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i]; + + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + int32_t functionId = pRuntimeEnv->pCtx[j].functionId; + if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) { + continue; + } + + pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes); + } } + +// int32_t g = pTableQueryInfo->groupIndex; +// assert(pRuntimeEnv->windowResInfo.size > 0); +// +// SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g)); +// if (pWindowRes->numOfRows == 0) { +// pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv); +// } } } @@ -4081,21 +4144,22 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) { SDataStatis *pStatis = NULL; SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); - if (!isIntervalQuery(pQuery)) { - int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; - setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step); - } else { // interval query - TSKEY nextKey = blockInfo.window.skey; - setIntervalQueryRange(pQInfo, nextKey); - /*int32_t ret = */setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo); + if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (!isIntervalQuery(pQuery)) { + int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1; + setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step); + } else { // interval query + TSKEY nextKey = blockInfo.window.skey; + setIntervalQueryRange(pQInfo, nextKey); + /*int32_t ret = */setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo); + } } summary->totalRows += blockInfo.rows; stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey); qTrace("QInfo:%p check data block, uid:%"PRId64", tid:%d, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, lastKey:%" PRId64, - GET_QINFO_ADDR(pRuntimeEnv), blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, - blockInfo.rows, pQuery->current->lastKey); + pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey); } int64_t et = taosGetTimestampMs(); @@ -4220,7 +4284,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // here we simply set the first table as current table pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info; - scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey); + scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); int64_t numOfRes = getNumOfResult(pRuntimeEnv); if (numOfRes > 0) { @@ -4233,10 +4297,84 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // enable execution for next table, when handling the projection query enableExecutionForNextTable(pRuntimeEnv); + + if (pQuery->rec.rows >= pQuery->rec.capacity) { + setQueryStatus(pQuery, QUERY_RESBUF_FULL); + break; + } + } + } else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query + while (pQInfo->groupIndex < numOfGroups) { + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); + + qTrace("QInfo:%p group by normal columns group:%d, total group:%d", pQInfo, pQInfo->groupIndex, numOfGroups); + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .colList = pQuery->colList, + .order = pQuery->order.order, + .numOfCols = pQuery->numOfCols, + }; + + SArray *g1 = taosArrayInit(1, POINTER_BYTES); + SArray *tx = taosArrayClone(group); + taosArrayPush(g1, &tx); + + STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1}; + + // include only current table + if (pRuntimeEnv->pQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); + pRuntimeEnv->pQueryHandle = NULL; + } + + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo); + + SArray* s = tsdbGetQueriedTableIdList(pRuntimeEnv->pQueryHandle); + assert(taosArrayGetSize(s) >= 1); + + setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(s, 0), pQInfo->tsdb); + + // here we simply set the first table as current table + scanMultiTableDataBlocks(pQInfo); + pQInfo->groupIndex += 1; + + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + + // no results generated for current group, continue to try the next group + if (pWindowResInfo->size <= 0) { + continue; + } + + for (int32_t i = 0; i < pWindowResInfo->size; ++i) { + SWindowStatus *pStatus = &pWindowResInfo->pResult[i].status; + pStatus->closed = true; // enable return all results for group by normal columns + + SWindowResult *pResult = &pWindowResInfo->pResult[i]; + for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { + pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes); + } + } + + qTrace("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size, + pQInfo->groupIndex); + int32_t currentGroupIndex = pQInfo->groupIndex; + + pQuery->rec.rows = 0; + pQInfo->groupIndex = 0; + + ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size); + copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); + + pQInfo->groupIndex = currentGroupIndex; //restore the group index + assert(pQuery->rec.rows == pWindowResInfo->size); + + clearClosedTimeWindow(pRuntimeEnv); + break; } } else { /* - * 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query + * 1. super table projection query, 2. ts-comp query * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ @@ -4283,7 +4421,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey); + scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); skipResults(pRuntimeEnv); // the limitation of output result is reached, set the query completed @@ -4349,25 +4487,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur; } - // todo refactor - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowStatus *pStatus = &pWindowResInfo->pResult[i].status; - pStatus->closed = true; // enable return all results for group by normal columns - - SWindowResult *pResult = &pWindowResInfo->pResult[i]; - for (int32_t j = 0; j < pQuery->numOfOutput; ++j) { - pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes); - } - } - - pQInfo->groupIndex = 0; - pQuery->rec.rows = 0; - copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); - } - qTrace( "QInfo %p numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%"PRId64", offset:%" PRId64, pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total, @@ -4449,7 +4568,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { */ if (isIntervalQuery(pQuery)) { copyResToQueryResultBuf(pQInfo, pQuery); - #ifdef _DEBUG_VIEW displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num); #endif @@ -4527,7 +4645,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) pQuery->current = pTableInfo; // set current query table info - scanAllDataBlocks(pRuntimeEnv, pTableInfo->lastKey); + scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey); finalizeQueryResult(pRuntimeEnv); if (isQueryKilled(pQInfo)) { @@ -4560,7 +4678,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) } while (1) { - scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey); + scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey); finalizeQueryResult(pRuntimeEnv); if (isQueryKilled(pQInfo)) { @@ -4607,7 +4725,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) SQuery *pQuery = pRuntimeEnv->pQuery; while (1) { - scanAllDataBlocks(pRuntimeEnv, start); + scanOneTableDataBlocks(pRuntimeEnv, start); if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { return; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 143d86d5db..aa5550efcb 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -113,7 +113,9 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { for (int32_t i = 0; i < num; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; if (pResult->status.closed) { // remove the window slot from hash table - taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); + taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, pWindowResInfo->type); + printf("remove ============>%ld, remain size:%ld\n", pResult->window.skey, pWindowResInfo->hashList->size); + } else { break; } @@ -133,14 +135,16 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { } pWindowResInfo->size = remain; - + printf("---------------size:%ld\n", taosHashGetSize(pWindowResInfo->hashList)); for (int32_t k = 0; k < pWindowResInfo->size; ++k) { SWindowResult *pResult = &pWindowResInfo->pResult[k]; - int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); + int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey, + tDataTypeDesc[pWindowResInfo->type].nSize); int32_t v = (*p - num); assert(v >= 0 && v <= pWindowResInfo->size); - taosHashPut(pWindowResInfo->hashList, (char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v, sizeof(int32_t)); + taosHashPut(pWindowResInfo->hashList, (char *)&pResult->window.skey, tDataTypeDesc[pWindowResInfo->type].nSize, + (char *)&v, sizeof(int32_t)); } pWindowResInfo->curIndex = -1; diff --git a/tests/script/general/parser/groupby.sim b/tests/script/general/parser/groupby.sim index 513b3cbbbe..5d785a2fc3 100644 --- a/tests/script/general/parser/groupby.sim +++ b/tests/script/general/parser/groupby.sim @@ -355,7 +355,7 @@ if $data00 != 0 then return -1 endi -if $data01 != 800 then +if $data11 != 800 then return -1 endi diff --git a/tests/script/general/parser/import_commit3.sim b/tests/script/general/parser/import_commit3.sim index 916bf6d05e..99ece98278 100644 --- a/tests/script/general/parser/import_commit3.sim +++ b/tests/script/general/parser/import_commit3.sim @@ -25,7 +25,7 @@ step1: sql create database $db cache 16 print ====== create tables sql use $db - +sql reset query cache $i = 0 $ts = $ts0 $tb = $tbPrefix . $i -- GitLab