From bc39673ca6aa2137111ab556dd903db6e8782a8f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 16 Apr 2021 14:48:41 +0800 Subject: [PATCH] [td-225] refactor code. --- src/client/inc/tscUtil.h | 7 +- src/client/inc/tsclient.h | 6 +- src/client/src/tscAsync.c | 2 +- src/client/src/tscLocalMerge.c | 377 ++++++++++++------ src/client/src/tscSQLParser.c | 128 +++--- src/client/src/tscServer.c | 26 +- src/client/src/tscSubquery.c | 45 +-- src/client/src/tscUtil.c | 73 ++-- src/query/inc/qExecutor.h | 14 +- src/query/inc/qExtbuffer.h | 2 +- src/query/src/qAggMain.c | 22 +- src/query/src/qExecutor.c | 230 ++++++++--- src/query/src/qExtbuffer.c | 7 +- src/query/src/qFill.c | 47 ++- src/query/src/qPlan.c | 11 +- src/query/src/queryMain.c | 3 +- tests/script/general/parser/limit2_query.sim | 3 + .../general/parser/select_with_tags.sim | 46 ++- 18 files changed, 681 insertions(+), 368 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 31ba3db88f..919e05ddd0 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -199,7 +199,7 @@ SExprInfo* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnI SExprInfo* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type, int16_t size); size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo); -void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex); +void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid); SExprInfo* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index); int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy); @@ -207,9 +207,8 @@ void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src); void tscSqlExprInfoDestroy(SArray* pExprInfo); SColumn* tscColumnClone(const SColumn* src); -bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex); -SColumn* tscColumnListInsert(SArray* pColList, SColumnIndex* colIndex, SSchema* pSchema); -SArray* tscColumnListClone(const SArray* src, int16_t tableIndex); +bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid); +SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema); void tscColumnListDestroy(SArray* pColList); void tscDequoteAndTrimToken(SStrToken* pToken); diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 7b66a34c80..2ebba3a492 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -115,8 +115,10 @@ typedef struct SFieldInfo { } SFieldInfo; typedef struct SColumn { - SColumnIndex colIndex; - SColumnInfo info; + uint64_t tableUid; + int32_t columnIndex; +// SColumnIndex colIndex; + SColumnInfo info; } SColumn; typedef struct SCond { diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 129bd425f7..5bf7e280af 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -344,7 +344,7 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM // validate the table columns information for (int32_t i = 0; i < taosArrayGetSize(pQueryInfo->colList); ++i) { SColumn *pCol = taosArrayGetP(pQueryInfo->colList, i); - if (pCol->colIndex.columnIndex >= numOfCols) { + if (pCol->columnIndex >= numOfCols) { return pSql->retryReason; } } diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 198bf797d5..59d1cf7a43 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -30,7 +30,7 @@ typedef struct SCompareParam { int32_t groupOrderType; } SCompareParam; -bool needToMergeRv(SSDataBlock* pBlock, SLocalMerger *pLocalMerge, int32_t index, char **buf); +bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndex, int32_t index, char **buf); int32_t treeComparator(const void *pLeft, const void *pRight, void *param) { int32_t pLeftIdx = *(int32_t *)pLeft; @@ -598,11 +598,19 @@ static int32_t createOrderDescriptor(tOrderDescriptor **pOrderDesc, SSqlCmd *pCm if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { int32_t numOfInternalOutput = (int32_t) tscSqlExprNumOfExprs(pQueryInfo); - int32_t startCols = numOfInternalOutput - pQueryInfo->groupbyExpr.numOfGroupCols; // the last "pQueryInfo->groupbyExpr.numOfGroupCols" columns are order-by columns for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { - orderColIndexList[i] = startCols++; + SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, i); + for(int32_t j = 0; j < numOfInternalOutput; ++j) { + SExprInfo* pExprInfo = tscSqlExprGet(pQueryInfo, j); + + int32_t functionId = pExprInfo->base.functionId; + if (pColIndex->colId == pExprInfo->base.colInfo.colId && (functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TAG)) { + orderColIndexList[i] = j; + break; + } + } } if (pQueryInfo->interval.interval != 0) { @@ -1134,6 +1142,26 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc (*hasPrev) = true; } +static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) { + if (numOfRows <= 1) { + return ; + } + + for (int32_t k = 0; k < numOfOutput; ++k) { + if (pCtx[k].functionId != TSDB_FUNC_TAG) { + continue; + } + + int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result + char* src = pCtx[k].pOutput; + + for (int32_t i = 0; i < inc; ++i) { + pCtx[k].pOutput += pCtx[k].outputBytes; + memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes); + } + } +} + static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) { SMultiwayMergeInfo* pInfo = pOperator->info; SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; @@ -1141,79 +1169,68 @@ static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, S char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES); for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { add[i] = pCtx[i].pInput; + pCtx[i].size = 1; } for(int32_t i = 0; i < pBlock->info.rows; ++i) { if (pInfo->hasPrev) { - if (needToMergeRv(pBlock, pInfo->pMerge, i, pInfo->prevRow)) { + if (needToMergeRv(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { + for (int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; + } + for (int32_t j = 0; j < numOfExpr; ++j) { int32_t functionId = pCtx[j].functionId; if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } - - pCtx[j].size = 1; aAggs[functionId].mergeFunc(&pCtx[j]); } } else { - for(int32_t j = 0; j < numOfExpr; ++j) { + for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor int32_t functionId = pCtx[j].functionId; if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } - - pCtx[j].size = 1; aAggs[functionId].xFinalize(&pCtx[j]); } int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); - pInfo->binfo.pRes->info.rows += numOfRows; - - if (i == 0) { - for(int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pOutput += pCtx[j].outputBytes; - aAggs[pCtx[j].functionId].init(&pCtx[j]); - } + setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - pCtx[j].size = 1; - aAggs[functionId].mergeFunc(&pCtx[j]); - } - } + pInfo->binfo.pRes->info.rows += numOfRows; for(int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pOutput += pCtx[j].outputBytes; - pCtx[j].pInput += pCtx[j].inputBytes; - + pCtx[j].pOutput += (pCtx[j].outputBytes * numOfRows); aAggs[pCtx[j].functionId].init(&pCtx[j]); } + for (int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; + } + for (int32_t j = 0; j < numOfExpr; ++j) { int32_t functionId = pCtx[j].functionId; if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } - - pCtx[j].size = 1; aAggs[functionId].mergeFunc(&pCtx[j]); } } } else { + for (int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; + } + for (int32_t j = 0; j < numOfExpr; ++j) { int32_t functionId = pCtx[j].functionId; if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { continue; } - - pCtx[j].size = 1; aAggs[functionId].mergeFunc(&pCtx[j]); } } + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); } @@ -1330,17 +1347,12 @@ bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *t return (ret == 0); } -bool needToMergeRv(SSDataBlock* pBlock, SLocalMerger *pLocalMerge, int32_t index, char **buf) { +bool needToMergeRv(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { int32_t ret = 0; - tOrderDescriptor *pDesc = pLocalMerge->pDesc; - if (pDesc->orderInfo.numOfCols > 0) { -// if (pDesc->tsOrder == TSDB_ORDER_ASC) { // asc - ret = compare_aRv(pBlock, pDesc->orderInfo.colIndex, pDesc->orderInfo.numOfCols, index, buf, TSDB_ORDER_ASC); -// } else { // desc -// ret = compare_d(pLocalMerge->pDesc, 1, 0, pLocalMerge->prevRowOfInput, 1, 0, tmpBuffer->data); -// } - } - + size_t size = taosArrayGetSize(columnIndexList); + if (size > 0) { + ret = compare_aRv(pBlock, columnIndexList, size, index, buf, TSDB_ORDER_ASC); + } // if ret == 0, means the result belongs to the same group return (ret == 0); } @@ -1429,24 +1441,6 @@ bool genFinalResults(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurren return true; } -bool genFinalResultsRv(SSqlObj *pSql, SLocalMerger *pLocalMerge, bool noMoreCurrentGroupRes) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SQueryInfo * pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex); - tFilePage * pResBuf = pLocalMerge->pResultBuf; - SColumnModel *pModel = pLocalMerge->resColModel; - - pRes->code = TSDB_CODE_SUCCESS; - - tColModelCompact(pModel, pResBuf, pModel->capacity); - - // no interval query, no fill operation - genFinalResWithoutFill(pRes, pLocalMerge, pQueryInfo); - - return true; -} - void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge) {// reset output buffer to the beginning size_t t = tscSqlExprNumOfExprs(pQueryInfo); for (int32_t i = 0; i < t; ++i) { @@ -1844,7 +1838,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, char *buf, SColumnModel pBlock->info.rows += 1; } -SSDataBlock* doMultiwaySort(void* param) { +SSDataBlock* doMultiwaySort(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -1891,6 +1885,7 @@ SSDataBlock* doMultiwaySort(void* param) { continue; } else { sameGroup = false; + *newgroup = true; break; } } @@ -1957,7 +1952,7 @@ static bool isSameGroupRv(SArray* orderColumnList, SSDataBlock* pBlock, char** d return true; } -SSDataBlock* doGlobalAggregate(void* param) { +SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -1966,6 +1961,7 @@ SSDataBlock* doGlobalAggregate(void* param) { SMultiwayMergeInfo *pAggInfo = pOperator->info; SOperatorInfo *upstream = pOperator->upstream; + *newgroup = false; bool handleData = false; pAggInfo->binfo.pRes->info.rows = 0; @@ -1991,19 +1987,23 @@ SSDataBlock* doGlobalAggregate(void* param) { pAggInfo->pExistBlock = NULL; pAggInfo->hasDataBlockForNewGroup = false; handleData = true; + *newgroup = true; } } SSDataBlock* pBlock = NULL; while(1) { - pBlock = upstream->exec(upstream); + bool prev = *newgroup; + pBlock = upstream->exec(upstream, newgroup); if (pBlock == NULL) { + *newgroup = prev; break; } if (pAggInfo->hasGroupColData) { bool sameGroup = isSameGroupRv(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); if (!sameGroup) { + *newgroup = true; pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->pExistBlock = pBlock; savePrevOrderColumns(pAggInfo->prevRow, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasPrev); @@ -2014,7 +2014,7 @@ SSDataBlock* doGlobalAggregate(void* param) { // not belongs to the same group, return the result of current group setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); - // handle the output buffer problem + // todo: it may be overflow handle the output buffer problem updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows); doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pBlock); @@ -2034,12 +2034,82 @@ SSDataBlock* doGlobalAggregate(void* param) { int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); pAggInfo->binfo.pRes->info.rows += numOfRows; + + setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows); + } + + SSDataBlock* pRes = pAggInfo->binfo.pRes; + { + SColumnInfoData* pInfoData = taosArrayGet(pRes->pDataBlock, 0); + + if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pRes->info.rows > 0) { + STimeWindow* w = &pRes->info.window; + w->skey = *(int64_t*)pInfoData->pData; + w->ekey = *(int64_t*)(((char*)pInfoData->pData) + TSDB_KEYSIZE * (pRes->info.rows - 1)); + } + } + + return (pRes->info.rows != 0)? pRes:NULL; +} + +static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { + SSLimitOperatorInfo *pInfo = pOperator->info; + assert(pInfo->currentGroupOffset >= 0); + + SSDataBlock* pBlock = NULL; + if (pInfo->currentGroupOffset == 0) { + pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); + if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + } + + if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { + while ((*newgroup) == false) { // ignore the remain blocks + pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); + if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return NULL; + } + } + } + + return pBlock; } - return (pAggInfo->binfo.pRes->info.rows != 0)? pAggInfo->binfo.pRes:NULL; + pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); + if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + while(1) { + if (*newgroup) { + pInfo->currentGroupOffset -= 1; + *newgroup = false; + } + + while ((*newgroup) == false) { + pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); + if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return NULL; + } + } + + // now we have got the first data block of the next group. + if (pInfo->currentGroupOffset == 0) { + return pBlock; + } + } + + return NULL; } -SSDataBlock* doSLimit(void* param) { +SSDataBlock* doSLimit(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*)param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2049,97 +2119,138 @@ SSDataBlock* doSLimit(void* param) { SSDataBlock* pBlock = NULL; while (1) { - pBlock = pOperator->upstream->exec(pOperator->upstream); + pBlock = skipGroupBlock(pOperator, newgroup); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; return NULL; } - if (!pInfo->hasPrev) { - savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); - } else { - bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow); - if (!sameGroup) { // reset info for new group data - pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group - pInfo->rowsTotal = 0; - } + if (*newgroup) { // a new group arrives + pInfo->groupTotal += 1; + pInfo->rowsTotal = 0; + pInfo->currentOffset = pInfo->limit.offset; } - if (pInfo->currentGroupOffset == 0) { - if (pInfo->currentOffset == 0) { // TODO refactor + assert(pInfo->currentGroupOffset == 0); + + if (pInfo->currentOffset >= pBlock->info.rows) { + pInfo->currentOffset -= pBlock->info.rows; + } else { + if (pInfo->currentOffset == 0) { break; - } else if (pInfo->currentOffset >= pBlock->info.rows) { - pInfo->currentOffset -= pBlock->info.rows; - } else { - int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); - pBlock->info.rows = remain; + } - // move the remain rows of this data block to the front. - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); + pBlock->info.rows = remain; - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); - } + // move the remain rows of this data block to the front. + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - pInfo->currentOffset = 0; - break; + int16_t bytes = pColInfoData->info.bytes; + memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); } - } else { - if (pInfo->hasPrev) { - // Check if current data block belongs to current result group or not - bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow); - if (sameGroup) { - continue; // ignore the data block of the same group and try next - } else { - //update the group column data by using the current group. - savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); - - pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group - pInfo->rowsTotal = 0; - - if ((--pInfo->currentGroupOffset) == 0) { - if (pInfo->currentOffset == 0) { // TODO refactor - break; - } else if (pInfo->currentOffset >= pBlock->info.rows) { - pInfo->currentOffset -= pBlock->info.rows; - } else { - int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); - pBlock->info.rows = remain; - - // move the remain rows of this data block to the front. - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); - } - pInfo->currentOffset = 0; - break; - } - } - } - } else { + pInfo->currentOffset = 0; + break; + } + } + + /* + if (!pInfo->hasPrev) { + pInfo->groupTotal = 1; + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); + } else { + bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow); + if (!sameGroup) { // reset info for new group data + pInfo->rowsTotal = 0; + pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); + } else { // data in current group has reached the limit, ignore the remain data of this group + if (pInfo->limit.limit > 0 && (pInfo->rowsTotal >= pInfo->limit.limit)) { + continue; + } } } - } +*/ +// if (pInfo->currentGroupOffset == 0) { +// if (pInfo->currentOffset == 0) { // TODO refactor +// break; +// } else if (pInfo->currentOffset >= pBlock->info.rows) { +// pInfo->currentOffset -= pBlock->info.rows; +// } else { +// int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); +// pBlock->info.rows = remain; +// +// // move the remain rows of this data block to the front. +// for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { +// SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); +// +// int16_t bytes = pColInfoData->info.bytes; +// memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); +// } +// +// pInfo->currentOffset = 0; +// break; +// } +// } else { +// if (pInfo->hasPrev) { +// // Check if current data block belongs to current result group or not +// bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow); +// if (sameGroup) { +// continue; // ignore the data block of the same group and try next +// } else { +// //update the group column data by using the current group. +// savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); +// +// pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group +// pInfo->rowsTotal = 0; +// +// if ((--pInfo->currentGroupOffset) == 0) { +// if (pInfo->currentOffset == 0) { // TODO refactor +// break; +// } else if (pInfo->currentOffset >= pBlock->info.rows) { +// pInfo->currentOffset -= pBlock->info.rows; +// } else { +// int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); +// pBlock->info.rows = remain; +// +// // move the remain rows of this data block to the front. +// for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { +// SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); +// +// int16_t bytes = pColInfoData->info.bytes; +// memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); +// } +// +// pInfo->currentOffset = 0; +// break; +// } +// } +// } +// } else { +// savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); +// } +// } +// } - if (!pInfo->hasPrev || !isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow)) { - pInfo->groupTotal += 1; - if (pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort +// if (!pInfo->hasPrev || !isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow)) { +// pInfo->groupTotal += 1; + if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort return NULL; } - } +// } if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); pInfo->rowsTotal = pInfo->limit.limit; - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; + if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) { + pOperator->status = OP_EXEC_DONE; + } + +// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); } else { pInfo->rowsTotal += pBlock->info.rows; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index a80a23240a..5aabd94428 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1399,10 +1399,9 @@ int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStr return (totalLen < TSDB_TABLE_FNAME_LEN) ? TSDB_CODE_SUCCESS : TSDB_CODE_TSC_INVALID_SQL; } -void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, SColumnIndex* pIndex) { - SColumnIndex tsCol = {.tableIndex = pIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; +void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t tableUid) { SSchema s = {.type = TSDB_DATA_TYPE_TIMESTAMP, .bytes = TSDB_KEYSIZE, .colId = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscColumnListInsert(pQueryInfo->colList, &tsCol, &s); + tscColumnListInsert(pQueryInfo->colList, PRIMARYKEY_TIMESTAMP_COL_INDEX, tableUid, &s); } static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t exprIndex, tSqlExprItem* pItem) { @@ -1482,7 +1481,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t insertResultField(pQueryInfo, exprIndex, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->base.aliasName, pExpr); // add ts column - tscInsertPrimaryTsSourceColumn(pQueryInfo, &index); + tscInsertPrimaryTsSourceColumn(pQueryInfo, pExpr->base.uid); tbufCloseWriter(&bw); taosArrayDestroy(colList); @@ -1706,15 +1705,16 @@ int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnLi int8_t type, char* fieldName, SExprInfo* pSqlExpr) { for (int32_t i = 0; i < pColList->num; ++i) { int32_t tableIndex = pColList->ids[i].tableIndex; - STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[tableIndex]; - - int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta); + STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[tableIndex]->pTableMeta; + + int32_t numOfCols = tscGetNumOfColumns(pTableMeta); if (pColList->ids[i].columnIndex >= numOfCols) { continue; } - SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); - tscColumnListInsert(pQueryInfo->colList, &pColList->ids[i], &pSchema[pColList->ids[i].columnIndex]); + uint64_t uid = pTableMeta->id.uid; + SSchema* pSchema = tscGetTableSchema(pTableMeta); + tscColumnListInsert(pQueryInfo->colList, pColList->ids[i].columnIndex, uid, &pSchema[pColList->ids[i].columnIndex]); } TAOS_FIELD f = tscCreateField(type, fieldName, bytes); @@ -1736,7 +1736,7 @@ SExprInfo* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int32_t tab if (functionId == TSDB_FUNC_TAGPRJ) { index.columnIndex = colIndex - tscGetNumOfColumns(pTableMeta); - tscColumnListInsert(pTableMetaInfo->tagColList, &index, pSchema); + tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pSchema); } else { index.columnIndex = colIndex; } @@ -1765,7 +1765,7 @@ SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColInd STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, pIndex->tableIndex); if (TSDB_COL_IS_TAG(flag)) { - tscColumnListInsert(pTableMetaInfo->tagColList, pIndex, pColSchema); + tscColumnListInsert(pTableMetaInfo->tagColList, pIndex->columnIndex, pTableMetaInfo->pTableMeta->id.uid, pColSchema); } return pExpr; @@ -1828,8 +1828,9 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t } // add the primary timestamp column even though it is not required by user - if (pQueryInfo->pTableMetaInfo[index.tableIndex]->pTableMeta->tableType != TSDB_TEMP_TABLE) { - tscInsertPrimaryTsSourceColumn(pQueryInfo, &index); + STableMeta* pTableMeta = pQueryInfo->pTableMetaInfo[index.tableIndex]->pTableMeta; + if (pTableMeta->tableType != TSDB_TEMP_TABLE) { + tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMeta->id.uid); } } else if (optr == TK_STRING || optr == TK_INTEGER || optr == TK_FLOAT) { // simple column projection query SColumnIndex index = COLUMN_INDEX_INITIALIZER; @@ -1871,7 +1872,8 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t } // add the primary timestamp column even though it is not required by user - tscInsertPrimaryTsSourceColumn(pQueryInfo, &index); + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); + tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); } else { return TSDB_CODE_TSC_INVALID_SQL; } @@ -1916,16 +1918,15 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS } // for all queries, the timestamp column needs to be loaded - SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; SSchema s = {.colId = PRIMARYKEY_TIMESTAMP_COL_INDEX, .bytes = TSDB_KEYSIZE, .type = TSDB_DATA_TYPE_TIMESTAMP,}; - tscColumnListInsert(pQueryInfo->colList, &index, &s); + tscColumnListInsert(pQueryInfo->colList, PRIMARYKEY_TIMESTAMP_COL_INDEX, pExpr->base.uid, &s); // if it is not in the final result, do not add it SColumnList ids = createColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); if (finalResult) { insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, pExpr->base.aliasName, pExpr); } else { - tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]), pSchema); + tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); } return TSDB_CODE_SUCCESS; @@ -2065,13 +2066,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } else { for (int32_t i = 0; i < list.num; ++i) { SSchema* ps = tscGetTableSchema(pTableMetaInfo->pTableMeta); - tscColumnListInsert(pQueryInfo->colList, &list.ids[i], &ps[list.ids[i].columnIndex]); + tscColumnListInsert(pQueryInfo->colList, list.ids[i].columnIndex, pTableMetaInfo->pTableMeta->id.uid, + &ps[list.ids[i].columnIndex]); } } // the time stamp may be always needed if (index.tableIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { - tscInsertPrimaryTsSourceColumn(pQueryInfo, &index); + tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); } return TSDB_CODE_SUCCESS; @@ -2176,10 +2178,10 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col pExpr->base.aliasName, pExpr); } else { assert(ids.num == 1); - tscColumnListInsert(pQueryInfo->colList, &(ids.ids[0]), pSchema); + tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); } - tscInsertPrimaryTsSourceColumn(pQueryInfo, &index); + tscInsertPrimaryTsSourceColumn(pQueryInfo, pExpr->base.uid); return TSDB_CODE_SUCCESS; } case TSDB_FUNC_FIRST: @@ -2338,7 +2340,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col int8_t resultType = pSchema->type; int16_t resultSize = pSchema->bytes; - char val[8] = {0}; + char val[8] = {0}; SExprInfo* pExpr = NULL; if (functionId == TSDB_FUNC_PERCT || functionId == TSDB_FUNC_APERCT) { @@ -2357,7 +2359,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col * for dp = 0, it is actually min, * for dp = 100, it is max, */ - tscInsertPrimaryTsSourceColumn(pQueryInfo, &index); + tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); colIndex += 1; // the first column is ts pExpr = tscSqlExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pQueryInfo), resultSize, false); @@ -2398,7 +2400,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col insertResultField(pQueryInfo, colIndex, &ids, resultSize, resultType, pExpr->base.aliasName, pExpr); } else { assert(ids.num == 1); - tscColumnListInsert(pQueryInfo->colList, &ids.ids[0], pSchema); + tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); } return TSDB_CODE_SUCCESS; @@ -2448,9 +2450,10 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } - tscColumnListInsert(pTableMetaInfo->tagColList, &index, &pSchema[index.columnIndex]); + tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid, + &pSchema[index.columnIndex]); SSchema* pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); - + SSchema s = {0}; if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) { s = *tGetTbnameColumnSchema(); @@ -3054,14 +3057,14 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd taosArrayPush(pGroupExpr->columnInfo, &colIndex); index.columnIndex = relIndex; - tscColumnListInsert(pTableMetaInfo->tagColList, &index, pSchema); + tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pSchema); } else { // check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8); } - tscColumnListInsert(pQueryInfo->colList, &index, pSchema); + tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema); SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId }; taosArrayPush(pGroupExpr->columnInfo, &colIndex); @@ -3259,7 +3262,7 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC const char* msg2 = "binary column not support this operator"; const char* msg3 = "bool column not support this operator"; - SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex, pSchema); + SColumn* pColumn = tscColumnListInsert(pQueryInfo->colList, pIndex->columnIndex, pTableMeta->id.uid, pSchema); SColumnFilterInfo* pColFilter = NULL; /* @@ -3312,7 +3315,8 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC } } - pColumn->colIndex = *pIndex; + pColumn->columnIndex = pIndex->columnIndex; + pColumn->tableUid = pTableMeta->id.uid; return doExtractColumnFilterInfo(pCmd, pQueryInfo, pColFilter, pIndex, pExpr); } @@ -3412,7 +3416,7 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) { + if (!tscColumnExists(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid)) { // tscColumnListInsert(pTableMetaInfo->tagColList, &index, ); if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); @@ -3441,10 +3445,11 @@ static int32_t checkAndSetJoinCondInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tS (*rightNode)->tagColId = pTagSchema2->colId; if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { - index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); - if (!tscColumnExists(pTableMetaInfo->tagColList, &index)) { + STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; + index.columnIndex = index.columnIndex - tscGetNumOfColumns(pTableMeta); + if (!tscColumnExists(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid)) { - tscColumnListInsert(pTableMetaInfo->tagColList, &index, pTagSchema2); + tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMeta->id.uid, pTagSchema2); if (taosArrayGetSize(pTableMetaInfo->tagColList) > 1) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } @@ -4373,7 +4378,8 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE SColumnIndex index = {.tableIndex = i, .columnIndex = pIndex->colIndex - numOfCols}; SSchema* s = tscGetTableSchema(pTableMetaInfo->pTableMeta); - tscColumnListInsert(pTableMetaInfo->tagColList, &index, &s[pIndex->colIndex]); + tscColumnListInsert(pTableMetaInfo->tagColList, index.columnIndex, pTableMetaInfo->pTableMeta->id.uid, + &s[pIndex->colIndex]); } tsSetSTableQueryCond(&pQueryInfo->tagCond, uid, &bw); @@ -5755,7 +5761,8 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau if (pExpr == NULL || pExpr->base.functionId != TSDB_FUNC_TAG) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pParentQueryInfo, tableIndex); - int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, pTableMetaInfo->pTableMeta->id.uid); + uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; + int16_t colId = tscGetJoinTagColIdByUid(&pQueryInfo->tagCond, uid); SSchema* pTagSchema = tscGetColumnSchemaById(pTableMetaInfo->pTableMeta, colId); int16_t colIndex = tscGetTagColIndexById(pTableMetaInfo->pTableMeta, colId); @@ -5778,8 +5785,7 @@ void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClau SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0); pColIndex->colIndex = relIndex; - index = (SColumnIndex) {.tableIndex = tableIndex, .columnIndex = relIndex}; - tscColumnListInsert(pTableMetaInfo->tagColList, &index, pTagSchema); + tscColumnListInsert(pTableMetaInfo->tagColList, relIndex, uid, pTagSchema); } } } @@ -6035,34 +6041,26 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd) } static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { - const char* msg2 = "interval not allowed in group by normal column"; + const char* msg1 = "interval not allowed in group by normal column"; STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SSchema s = *tGetTbnameColumnSchema(); SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); - int16_t bytes = 0; - int16_t type = 0; - char* name = NULL; + SSchema* tagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); + + SSchema* s = NULL; for (int32_t i = 0; i < pQueryInfo->groupbyExpr.numOfGroupCols; ++i) { SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, i); int16_t colIndex = pColIndex->colIndex; + if (colIndex == TSDB_TBNAME_COLUMN_INDEX) { - type = s.type; - bytes = s.bytes; - name = s.name; + s = tGetTbnameColumnSchema(); } else { if (TSDB_COL_IS_TAG(pColIndex->flag)) { - SSchema* tagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta); - - type = tagSchema[colIndex].type; - bytes = tagSchema[colIndex].bytes; - name = tagSchema[colIndex].name; + s = &tagSchema[colIndex]; } else { - type = pSchema[colIndex].type; - bytes = pSchema[colIndex].bytes; - name = pSchema[colIndex].name; + s = &pSchema[colIndex]; } } @@ -6070,34 +6068,33 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo if (TSDB_COL_IS_TAG(pColIndex->flag)) { SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex}; - SExprInfo* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, type, bytes, getNewResColId(pQueryInfo), bytes, true); - + SExprInfo* pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, s->type, s->bytes, + getNewResColId(pQueryInfo), s->bytes, true); + memset(pExpr->base.aliasName, 0, sizeof(pExpr->base.aliasName)); - tstrncpy(pExpr->base.aliasName, name, sizeof(pExpr->base.aliasName)); - + tstrncpy(pExpr->base.aliasName, s->name, sizeof(pExpr->base.aliasName)); + pExpr->base.colInfo.flag = TSDB_COL_TAG; // NOTE: tag column does not add to source column list SColumnList ids = createColumnList(1, 0, pColIndex->colIndex); - insertResultField(pQueryInfo, (int32_t)size, &ids, bytes, (int8_t)type, name, pExpr); + insertResultField(pQueryInfo, (int32_t)size, &ids, s->bytes, (int8_t)s->type, s->name, pExpr); } else { // if this query is "group by" normal column, time window query is not allowed if (isTimeWindowQuery(pQueryInfo)) { - return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1); } bool hasGroupColumn = false; for (int32_t j = 0; j < size; ++j) { SExprInfo* pExpr = tscSqlExprGet(pQueryInfo, j); - if (pExpr->base.colInfo.colId == pColIndex->colId) { + if ((pExpr->base.functionId == TSDB_FUNC_PRJ) && pExpr->base.colInfo.colId == pColIndex->colId) { + hasGroupColumn = true; break; } } - /* - * if the group by column does not required by user, add this column into the final result set - * but invisible to user - */ + //if the group by column does not required by user, add an invisible column into the final result set. if (!hasGroupColumn) { doAddGroupColumnForSubquery(pQueryInfo, i); } @@ -6154,6 +6151,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) { // check if all the tags prj columns belongs to the group by columns if (onlyTagPrjFunction(pQueryInfo) && allTagPrjInGroupby(pQueryInfo)) { + // It is a groupby aggregate query, the tag project function is not suitable for this case. updateTagPrjFunction(pQueryInfo); return doAddGroupbyColumnsOnDemand(pCmd, pQueryInfo); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index ffa11d9552..24690366c4 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -705,10 +705,11 @@ static int32_t serializeSqlExpr(SSqlExpr* pExpr, STableMetaInfo* pTableMetaInfo, return TSDB_CODE_TSC_INVALID_TABLE_NAME; } - if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { - tscError("%p table schema is not matched with parsed sql", addr); - return TSDB_CODE_TSC_INVALID_SQL; - } + //TODO disable it temporarily +// if (!tscValidateColumnId(pTableMetaInfo, pExpr->colInfo.colId, pExpr->numOfParams)) { +// tscError("%p table schema is not matched with parsed sql", addr); +// return TSDB_CODE_TSC_INVALID_SQL; +// } assert(pExpr->resColId < 0); SSqlExpr* pSqlExpr = (SSqlExpr *)(*pMsg); @@ -777,13 +778,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tableScanOperator = htonl(*tablescanOp); } - if (query.order.order == TSDB_ORDER_ASC) { - pQueryMsg->window.skey = htobe64(query.window.skey); - pQueryMsg->window.ekey = htobe64(query.window.ekey); - } else { - pQueryMsg->window.skey = htobe64(query.window.ekey); - pQueryMsg->window.ekey = htobe64(query.window.skey); - } + pQueryMsg->window.skey = htobe64(query.window.skey); + pQueryMsg->window.ekey = htobe64(query.window.ekey); pQueryMsg->order = htons(query.order.order); pQueryMsg->orderColId = htons(query.order.orderColId); @@ -1594,9 +1590,6 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { } uint64_t localQueryId = 0; -// SMultiwayMergeInfo* pInfo = (SMultiwayMergeInfo*) pQueryInfo->pQInfo->runtimeEnv.proot->info; -// pInfo->pMerge = pRes->pLocalMerger; - qTableQuery(pQueryInfo->pQInfo, &localQueryId); SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; pRes->numOfRows = (p != NULL)? p->info.rows: 0; @@ -2108,10 +2101,11 @@ int tscProcessShowRsp(SSqlObj *pSql) { SColumnIndex index = {0}; pSchema = pMetaMsg->schema; - + + uint64_t uid = pTableMetaInfo->pTableMeta->id.uid; for (int16_t i = 0; i < pMetaMsg->numOfColumns; ++i, ++pSchema) { index.columnIndex = i; - tscColumnListInsert(pQueryInfo->colList, &index, &pSchema[i]); + tscColumnListInsert(pQueryInfo->colList, i, uid, &pSchema[i]); TAOS_FIELD f = tscCreateField(pSchema->type, pSchema->name, pSchema->bytes); SInternalField* pInfo = tscFieldInfoAppend(pFieldInfo, &f); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1ae5041e78..24c87897d6 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -451,25 +451,6 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { free(pSupporter); } -/* - * need the secondary query process - * In case of count(ts)/count(*)/spread(ts) query, that are only applied to - * primary timestamp column , the secondary query is not necessary - * - */ -static UNUSED_FUNC bool needSecondaryQuery(SQueryInfo* pQueryInfo) { - size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); - - for (int32_t i = 0; i < numOfCols; ++i) { - SColumn* base = taosArrayGet(pQueryInfo->colList, i); - if (base->colIndex.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return true; - } - } - - return false; -} - static void filterVgroupTables(SQueryInfo* pQueryInfo, SArray* pVgroupTables) { int32_t num = 0; int32_t* list = NULL; @@ -598,10 +579,12 @@ static int32_t tscLaunchRealSubqueries(SSqlObj* pSql) { tscTagCondCopy(&pQueryInfo->tagCond, &pSupporter->tagCond); - pQueryInfo->colList = pSupporter->colList; - pQueryInfo->exprList = pSupporter->exprList; - pQueryInfo->fieldsInfo = pSupporter->fieldsInfo; + pQueryInfo->colList = pSupporter->colList; + pQueryInfo->exprList = pSupporter->exprList; + pQueryInfo->fieldsInfo = pSupporter->fieldsInfo; pQueryInfo->groupbyExpr = pSupporter->groupInfo; + pQueryInfo->pUpstream = taosArrayInit(4, sizeof(POINTER_BYTES)); + pQueryInfo->pDownstream = taosArrayInit(4, sizeof(POINTER_BYTES)); assert(pNew->subState.numOfSub == 0 && pNew->cmd.numOfClause == 1 && pQueryInfo->numOfTables == 1); @@ -1868,11 +1851,10 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter assert(pNewQueryInfo != NULL); // update the table index - size_t num = taosArrayGetSize(pNewQueryInfo->colList); - for (int32_t i = 0; i < num; ++i) { - SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i); - pCol->colIndex.tableIndex = 0; - } +// size_t num = taosArrayGetSize(pNewQueryInfo->colList); +// for (int32_t i = 0; i < num; ++i) { +// SColumn* pCol = taosArrayGetP(pNewQueryInfo->colList, i); +// } pSupporter->colList = pNewQueryInfo->colList; pNewQueryInfo->colList = NULL; @@ -2402,9 +2384,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { } } - SColumnIndex columnIndex = {.tableIndex = 0, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX}; - tscInsertPrimaryTsSourceColumn(pNewQueryInfo, &columnIndex); - + tscInsertPrimaryTsSourceColumn(pNewQueryInfo, pTableMetaInfo->pTableMeta->id.uid); tscTansformFuncForSTableQuery(pNewQueryInfo); tscDebug( @@ -2791,7 +2771,10 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tscFreeRetrieveSup(pSql); // set the command flag must be after the semaphore been correctly set. - pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) { + pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE; + } + if (pParentSql->res.code == TSDB_CODE_SUCCESS) { (*pParentSql->fp)(pParentSql->param, pParentSql, 0); } else { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 44849171a0..8146ed134b 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -598,7 +598,7 @@ static SColumnInfo* extractColumnInfoFromResult(STableMeta* pTableMeta, SArray* SSchema *pSchema = pTableMeta->schema; for(int32_t i = 0; i < numOfCols; ++i) { SColumn* pCol = taosArrayGetP(pTableCols, i); - int32_t index = pCol->colIndex.columnIndex; + int32_t index = pCol->columnIndex; pColInfo[i].type = pSchema[index].type; pColInfo[i].bytes = pSchema[index].bytes; @@ -613,7 +613,7 @@ typedef struct SDummyInputInfo { SSqlRes *pRes; // refactor: remove it } SDummyInputInfo; -SSDataBlock* doGetDataBlock(void* param) { +SSDataBlock* doGetDataBlock(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo*) param; SDummyInputInfo *pInput = pOperator->info; @@ -634,6 +634,7 @@ SSDataBlock* doGetDataBlock(void* param) { } pInput->pRes->numOfRows = 0; + *newgroup = false; return pBlock; } @@ -1601,19 +1602,18 @@ int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepco return 0; } -bool tscColumnExists(SArray* pColumnList, SColumnIndex* pColIndex) { +bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid) { // ignore the tbname columnIndex to be inserted into source list - if (pColIndex->columnIndex < 0) { + if (columnIndex < 0) { return false; } size_t numOfCols = taosArrayGetSize(pColumnList); - int16_t col = pColIndex->columnIndex; int32_t i = 0; while (i < numOfCols) { SColumn* pCol = taosArrayGetP(pColumnList, i); - if ((pCol->colIndex.columnIndex != col) || (pCol->colIndex.tableIndex != pColIndex->tableIndex)) { + if ((pCol->columnIndex != columnIndex) || (pCol->tableUid != uid)) { ++i; continue; } else { @@ -1640,22 +1640,20 @@ void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src) { } } -// TODO refactor -SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex, SSchema* pSchema) { +SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema) { // ignore the tbname columnIndex to be inserted into source list - if (pColIndex->columnIndex < 0) { + if (columnIndex < 0) { return NULL; } size_t numOfCols = taosArrayGetSize(pColumnList); - int16_t col = pColIndex->columnIndex; int32_t i = 0; while (i < numOfCols) { SColumn* pCol = taosArrayGetP(pColumnList, i); - if (pCol->colIndex.columnIndex < col) { + if (pCol->columnIndex < columnIndex) { i++; - } else if (pCol->colIndex.tableIndex < pColIndex->tableIndex) { + } else if (pCol->tableUid < uid) { i++; } else { break; @@ -1668,22 +1666,24 @@ SColumn* tscColumnListInsert(SArray* pColumnList, SColumnIndex* pColIndex, SSche return NULL; } - b->colIndex = *pColIndex; - b->info.colId = pSchema->colId; - b->info.bytes = pSchema->bytes; - b->info.type = pSchema->type; + b->columnIndex = columnIndex; + b->tableUid = uid; + b->info.colId = pSchema->colId; + b->info.bytes = pSchema->bytes; + b->info.type = pSchema->type; taosArrayInsert(pColumnList, i, &b); } else { SColumn* pCol = taosArrayGetP(pColumnList, i); - if (i < numOfCols && (pCol->colIndex.columnIndex > col || pCol->colIndex.tableIndex != pColIndex->tableIndex)) { + if (i < numOfCols && (pCol->columnIndex > columnIndex || pCol->tableUid != uid)) { SColumn* b = calloc(1, sizeof(SColumn)); if (b == NULL) { return NULL; } - b->colIndex = *pColIndex; + b->columnIndex = columnIndex; + b->tableUid = uid; b->info.colId = pSchema->colId; b->info.bytes = pSchema->bytes; b->info.type = pSchema->type; @@ -1713,7 +1713,8 @@ SColumn* tscColumnClone(const SColumn* src) { return NULL; } - dst->colIndex = src->colIndex; + dst->columnIndex = src->columnIndex; + dst->tableUid = src->tableUid; dst->info.numOfFilters = src->info.numOfFilters; dst->info.filterInfo = tFilterInfoDup(src->info.filterInfo, src->info.numOfFilters); dst->info.type = src->info.type; @@ -1727,14 +1728,14 @@ static void tscColumnDestroy(SColumn* pCol) { free(pCol); } -void tscColumnListCopy(SArray* dst, const SArray* src, int16_t tableIndex) { +void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid) { assert(src != NULL && dst != NULL); size_t num = taosArrayGetSize(src); for (int32_t i = 0; i < num; ++i) { SColumn* pCol = taosArrayGetP(src, i); - if (pCol->colIndex.tableIndex == tableIndex || tableIndex < 0) { + if (pCol->tableUid == tableUid) { SColumn* p = tscColumnClone(pCol); taosArrayPush(dst, &p); } @@ -2223,6 +2224,9 @@ static void freeQueryInfoImpl(SQueryInfo* pQueryInfo) { taosArrayDestroy(pQueryInfo->pUpstream); taosArrayDestroy(pQueryInfo->pDownstream); + + pQueryInfo->pUpstream = NULL; + pQueryInfo->pDownstream = NULL; } void tscClearSubqueryInfo(SSqlCmd* pCmd) { @@ -3339,13 +3343,24 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu SExprInfo* pExpr = &pQueryAttr->pExpr1[i]; SSqlExpr* pse = &pQueryAttr->pExpr3[i].base; - *pse = pExpr->base; + memcpy(pse->aliasName, pExpr->base.aliasName, tListLen(pse->aliasName)); + + pse->uid = pExpr->base.uid; + pse->functionId = pExpr->base.functionId; + pse->resType = pExpr->base.resType; + pse->resBytes = pExpr->base.resBytes; + pse->interBytes = pExpr->base.interBytes; + pse->resColId = pExpr->base.resColId; + pse->offset = pExpr->base.offset; + pse->numOfParams = pExpr->base.numOfParams; + + pse->colInfo = pExpr->base.colInfo; pse->colInfo.colId = pExpr->base.resColId; pse->colInfo.colIndex = i; pse->colType = pExpr->base.resType; pse->colBytes = pExpr->base.resBytes; - pse->colInfo.flag = TSDB_COL_NORMAL; + pse->colInfo.flag = pExpr->base.colInfo.flag; for (int32_t j = 0; j < pExpr->base.numOfParams; ++j) { tVariantAssign(&pse->param[j], &pExpr->base.param[j]); @@ -3404,9 +3419,9 @@ static int32_t createTagColumnInfo(SQueryAttr* pQueryAttr, SQueryInfo* pQueryInf SSchema* pSchema = tscGetTableTagSchema(pTableMeta); for (int32_t i = 0; i < pQueryAttr->numOfTags; ++i) { SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); - SSchema* pColSchema = &pSchema[pCol->colIndex.columnIndex]; + SSchema* pColSchema = &pSchema[pCol->columnIndex]; - if ((pCol->colIndex.columnIndex >= numOfTagColumns || pCol->colIndex.columnIndex < TSDB_TBNAME_COLUMN_INDEX) || + if ((pCol->columnIndex >= numOfTagColumns || pCol->columnIndex < TSDB_TBNAME_COLUMN_INDEX) || (!isValidDataType(pColSchema->type))) { return TSDB_CODE_TSC_INVALID_SQL; } @@ -3447,7 +3462,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->order = pQueryInfo->order; pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo); - pQueryAttr->window = pQueryInfo->window; + + if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor + pQueryAttr->window = pQueryInfo->window; + } else { + pQueryAttr->window.skey = pQueryInfo->window.ekey; + pQueryAttr->window.ekey = pQueryInfo->window.skey; + } memcpy(&pQueryAttr->interval, &pQueryInfo->interval, sizeof(pQueryAttr->interval)); diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ebc751c3c9..9b9a361bf5 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -233,14 +233,14 @@ typedef struct SQueryAttr { int32_t vgId; } SQueryAttr; -typedef SSDataBlock* (*__operator_fn_t)(void* param); +typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); struct SOperatorInfo; typedef struct SQueryRuntimeEnv { jmp_buf env; - SQueryAttr* pQueryAttr; + SQueryAttr* pQueryAttr; uint32_t status; // query status void* qinfo; uint8_t scanFlag; // denotes reversed scan of data or not @@ -411,6 +411,8 @@ typedef struct SArithOperatorInfo { SOptrBasicInfo binfo; int32_t bufCapacity; uint32_t seed; + + SSDataBlock *existDataBlock; } SArithOperatorInfo; typedef struct SLimitOperatorInfo { @@ -438,6 +440,8 @@ typedef struct SFillOperatorInfo { SFillInfo *pFillInfo; SSDataBlock *pRes; int64_t totalInputRows; + + SSDataBlock *existNewGroupBlock; } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { @@ -494,9 +498,9 @@ SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SEx SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); -SSDataBlock* doGlobalAggregate(void* param); -SSDataBlock* doMultiwaySort(void* param); -SSDataBlock* doSLimit(void* param); +SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); +SSDataBlock* doMultiwaySort(void* param, bool* newgroup); +SSDataBlock* doSLimit(void* param, bool* newgroup); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index 8e64810c6c..b851fbb3e0 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -238,7 +238,7 @@ int32_t compare_d(tOrderDescriptor *, int32_t numOfRow1, int32_t s1, char *data1 char *data2); struct SSDataBlock; -int32_t compare_aRv(struct SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order); +int32_t compare_aRv(struct SSDataBlock* pBlock, SArray* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order); int32_t columnValueAscendingComparator(char *f1, char *f2, int32_t type, int32_t bytes); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index cc13d64d2b..f9da39bcd3 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -3006,7 +3006,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { pInput->pHisto = (SHistogramInfo*) ((char *)pInput + sizeof(SAPercentileInfo)); pInput->pHisto->elems = (SHistBin*) ((char *)pInput->pHisto + sizeof(SHistogramInfo)); - + if (pInput->pHisto->numOfElems <= 0) { return; } @@ -3025,7 +3025,7 @@ static void apercentile_func_merge(SQLFunctionCtx *pCtx) { pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo)); tHistogramDestroy(&pRes); } - + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); pResInfo->hasResult = DATA_SET_FLAG; SET_VAL(pCtx, 1, 1); @@ -3036,7 +3036,11 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) { SResultRowCellInfo * pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo *pOutput = GET_ROWCELL_INTERBUF(pResInfo); - + + if (pOutput->pHisto->numOfElems > 1000) { + printf("%d\n", pOutput->pHisto->numOfElems); + } + if (pCtx->currentStage == MERGE_STAGE) { if (pResInfo->hasResult == DATA_SET_FLAG) { // check for null assert(pOutput->pHisto->numOfElems > 0); @@ -3349,9 +3353,15 @@ static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { * @param pCtx * @return */ +static void copy_function(SQLFunctionCtx *pCtx); + static void tag_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, 1, 1); - tVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true); + if (pCtx->currentStage == MERGE_STAGE) { + copy_function(pCtx); + } else { + tVariantDump(&pCtx->tag, pCtx->pOutput, pCtx->outputType, true); + } } static void tag_function_f(SQLFunctionCtx *pCtx, int32_t index) { @@ -5102,7 +5112,7 @@ SAggFunctionInfo aAggs[] = {{ }, { // 17 - "ts", + "ts_dummy", TSDB_FUNC_TS_DUMMY, TSDB_FUNC_TS_DUMMY, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, @@ -5115,7 +5125,7 @@ SAggFunctionInfo aAggs[] = {{ }, { // 18 - "tag", + "tag_dummy", TSDB_FUNC_TAG_DUMMY, TSDB_FUNC_TAG_DUMMY, TSDB_BASE_FUNC_SO, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index fc031e8876..504b1b5169 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -193,7 +193,8 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO idata.info.bytes = pExpr[i].base.resBytes; idata.info.colId = pExpr[i].base.resColId; - idata.pData = calloc(1, MAX(idata.info.bytes * numOfRows, minSize)); // at least to hold a pointer on x64 platform + int32_t size = MAX(idata.info.bytes * numOfRows, minSize); + idata.pData = calloc(1, size); // at least to hold a pointer on x64 platform taosArrayPush(res->pDataBlock, &idata); } @@ -907,7 +908,8 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, setArithParams((SArithmeticSupport*)pCtx[i].param[1].pz, &pOperator->pExpr[i], pBlock); } else { SColIndex* pCol = &pOperator->pExpr[i].base.colInfo; - if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || pCol->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) { + if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || (pCol->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) || + (TSDB_COL_IS_TAG(pCol->flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); @@ -920,6 +922,16 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0); pCtx[i].ptsList = (int64_t*) tsInfo->pData; } + } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) { + SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo; + SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex); + + pCtx[i].pInput = p->pData; + assert(p->info.colId == pColIndex->colId && pCtx[i].inputType == p->info.type); + for(int32_t j = 0; j < pBlock->info.rows; ++j) { + char* dst = p->pData + j * p->info.bytes; + tVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true); + } } } } @@ -1701,7 +1713,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf break; } - case OP_Arithmetic: { + case OP_Arithmetic: { // TODO refactor to remove arith operator. SOperatorInfo* prev = pRuntimeEnv->pTableScanner; if (i == 0) { pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); @@ -1710,6 +1722,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } } else { prev = pRuntimeEnv->proot; + assert(pQueryAttr->pExpr2 != NULL); pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); } @@ -2939,10 +2952,11 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows) { SSDataBlock* pDataBlock = pBInfo->pRes; - int32_t newSize = pDataBlock->info.rows + numOfInputRows; + int32_t newSize = pDataBlock->info.rows + numOfInputRows + 5; // extra output buffer if ((*bufCapacity) < newSize) { for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); + char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes); if (p != NULL) { pColInfo->pData = p; @@ -3431,6 +3445,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti int32_t orderType = (pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC; doCopyToSDataBlock(pRuntimeEnv, pGroupResInfo, orderType, pBlock); + // refactor : extract method SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { @@ -3861,8 +3876,9 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in pFillCol[i].col.bytes = pExprInfo->base.resBytes; pFillCol[i].col.type = (int8_t)pExprInfo->base.resType; pFillCol[i].col.offset = offset; + pFillCol[i].col.colId = pExprInfo->base.resColId; pFillCol[i].tagIndex = -2; - pFillCol[i].flag = TSDB_COL_NORMAL; // always be ta normal column for table query + pFillCol[i].flag = pExprInfo->base.colInfo.flag; // always be the normal column for table query pFillCol[i].functionId = pExprInfo->base.functionId; pFillCol[i].fillVal.i = fillVal[i]; @@ -4011,7 +4027,7 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) { } } -static SSDataBlock* doTableScanImpl(void* param) { +static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; STableScanInfo* pTableScanInfo = pOperator->info; @@ -4020,6 +4036,8 @@ static SSDataBlock* doTableScanImpl(void* param) { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STableGroupInfo *pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo; + *newgroup = false; + while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) { longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -4058,17 +4076,18 @@ static SSDataBlock* doTableScanImpl(void* param) { return NULL; } -static SSDataBlock* doTableScan(void* param) { +static SSDataBlock* doTableScan(void* param, bool *newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; STableScanInfo *pTableScanInfo = pOperator->info; SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; + *newgroup = false; while (pTableScanInfo->current < pTableScanInfo->times) { - SSDataBlock* p = doTableScanImpl(pOperator); + SSDataBlock* p = doTableScanImpl(pOperator, newgroup); if (p != NULL) { return p; } @@ -4102,6 +4121,7 @@ static SSDataBlock* doTableScan(void* param) { GET_QID(pRuntimeEnv), cond.twindow.skey, cond.twindow.ekey); } + SSDataBlock *p = NULL; if (pTableScanInfo->reverseTimes > 0) { setupEnvForReverseScan(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); @@ -4123,22 +4143,20 @@ static SSDataBlock* doTableScan(void* param) { pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey; } - SSDataBlock* p = doTableScanImpl(pOperator); - if (p != NULL) { - return p; - } + p = doTableScanImpl(pOperator, newgroup); } - return NULL; + return p; } -static SSDataBlock* doBlockInfoScan(void* param) { +static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { SOperatorInfo *pOperator = (SOperatorInfo*)param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } STableScanInfo *pTableScanInfo = pOperator->info; + *newgroup = false; STableBlockDist tableBlockDist = {0}; tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables; @@ -4357,12 +4375,23 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) { for(int32_t i = 0; i < numOfCols; ++i) { SColIndex* index = taosArrayGet(pOrderColumns, i); + + bool found = false; for(int32_t j = 0; j < pQuery->numOfOutput; ++j) { - if (index->colId == pQuery->pExpr1[j].base.colInfo.colId) { + SSqlExpr* pExpr = &pQuery->pExpr1[j].base; + + // TSDB_FUNC_TAG_DUMMY function needs to be ignored + if (index->colId == pExpr->colInfo.colId && + ((TSDB_COL_IS_TAG(pExpr->colInfo.flag) && pExpr->functionId == TSDB_FUNC_TAG) || + (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && pExpr->functionId == TSDB_FUNC_PRJ))) { index->colIndex = j; - index->colId = pQuery->pExpr1[j].base.resColId; + index->colId = pExpr->resColId; + found = true; + break; } } + + assert(found && index->colIndex >= 0 && index->colIndex < pQuery->numOfOutput); } return pOrderColumns; @@ -4393,7 +4422,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfOutput; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -4404,7 +4433,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, numOfCols = (pInfo->groupColumnList != NULL)? taosArrayGetSize(pInfo->groupColumnList):0; pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len)); - offset = POINTER_BYTES * numOfOutput; + offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->currentGroupColData[i] = (char*)pInfo->currentGroupColData + offset; @@ -4453,7 +4482,7 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx int32_t numOfCols = (pInfo->orderColumnList != NULL)? taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfOutput; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -4481,7 +4510,7 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { } // this is a blocking operator -static SSDataBlock* doAggregate(void* param) { +static SSDataBlock* doAggregate(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4498,7 +4527,7 @@ static SSDataBlock* doAggregate(void* param) { SOperatorInfo* upstream = pOperator->upstream; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream); + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); if (pBlock == NULL) { break; } @@ -4524,7 +4553,7 @@ static SSDataBlock* doAggregate(void* param) { return pInfo->pRes; } -static SSDataBlock* doSTableAggregate(void* param) { +static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4551,7 +4580,7 @@ static SSDataBlock* doSTableAggregate(void* param) { SOperatorInfo* upstream = pOperator->upstream; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream); + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); if (pBlock == NULL) { break; } @@ -4587,7 +4616,7 @@ static SSDataBlock* doSTableAggregate(void* param) { return pInfo->pRes; } -static SSDataBlock* doArithmeticOperation(void* param) { +static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; SArithOperatorInfo* pArithInfo = pOperator->info; @@ -4599,13 +4628,55 @@ static SSDataBlock* doArithmeticOperation(void* param) { pRes->info.rows = 0; + if (pArithInfo->existDataBlock) { // TODO refactor + STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; + + SSDataBlock* pBlock = pArithInfo->existDataBlock; + pArithInfo->existDataBlock = NULL; + *newgroup = true; + + // todo dynamic set tags + if (pTableQueryInfo != NULL) { + setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + } + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); + updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows); + + arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); + + if (pTableQueryInfo != NULL) { // TODO refactor + updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order); + } + + pRes->info.rows = getNumOfResult(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); + if (pRes->info.rows >= pRuntimeEnv->resultInfo.threshold) { + clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); + return pRes; + } + } + while(1) { - SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); + bool prevVal = *newgroup; + + // The upstream exec may change the value of the newgroup, so use a local variable instead. + SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); if (pBlock == NULL) { + assert(*newgroup == false); + + *newgroup = prevVal; setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); break; } + // Return result of the previous group in the firstly. + if (newgroup && pRes->info.rows > 0) { + pArithInfo->existDataBlock = pBlock; + clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); + return pInfo->pRes; + } + STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // todo dynamic set tags @@ -4633,7 +4704,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } -static SSDataBlock* doLimit(void* param) { +static SSDataBlock* doLimit(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*)param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4644,7 +4715,7 @@ static SSDataBlock* doLimit(void* param) { SSDataBlock* pBlock = NULL; while (1) { - pBlock = pOperator->upstream->exec(pOperator->upstream); + pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -4684,7 +4755,7 @@ static SSDataBlock* doLimit(void* param) { return pBlock; } -static SSDataBlock* doIntervalAgg(void* param) { +static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4710,7 +4781,7 @@ static SSDataBlock* doIntervalAgg(void* param) { SOperatorInfo* upstream = pOperator->upstream; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream); + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); if (pBlock == NULL) { break; } @@ -4739,7 +4810,7 @@ static SSDataBlock* doIntervalAgg(void* param) { return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; } -static SSDataBlock* doSTableIntervalAgg(void* param) { +static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4763,7 +4834,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { SOperatorInfo* upstream = pOperator->upstream; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream); + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); if (pBlock == NULL) { break; } @@ -4791,7 +4862,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { return pIntervalInfo->pRes; } -static SSDataBlock* doSessionWindowAgg(void* param) { +static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4818,7 +4889,7 @@ static SSDataBlock* doSessionWindowAgg(void* param) { SOperatorInfo* upstream = pOperator->upstream; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream); + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); if (pBlock == NULL) { break; } @@ -4847,7 +4918,7 @@ static SSDataBlock* doSessionWindowAgg(void* param) { return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; } -static SSDataBlock* hashGroupbyAggregate(void* param) { +static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4869,7 +4940,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param) { SOperatorInfo* upstream = pOperator->upstream; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream); + SSDataBlock* pBlock = upstream->exec(upstream, newgroup); if (pBlock == NULL) { break; } @@ -4904,40 +4975,89 @@ static SSDataBlock* hashGroupbyAggregate(void* param) { return pInfo->binfo.pRes; } -static SSDataBlock* doFill(void* param) { +static SSDataBlock* doFill(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } SFillOperatorInfo *pInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; if (taosFillHasMoreResults(pInfo->pFillInfo)) { + *newgroup = false; doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); return pInfo->pRes; } + // handle the cached new group data block + if (pInfo->existNewGroupBlock) { + pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; + int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; + taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); + + taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); + taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); + + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); + pInfo->existNewGroupBlock = NULL; + *newgroup = true; + return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; + } + while(1) { - SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); - if (pBlock == NULL) { - if (pInfo->totalInputRows == 0) { - pOperator->status = OP_EXEC_DONE; - return NULL; - } + SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream, newgroup); + if (*newgroup) { + assert(pBlock != NULL); + } + + if (*newgroup && pInfo->totalInputRows > 0) { // there are already processed current group data block + pInfo->existNewGroupBlock = pBlock; + *newgroup = false; + // fill the previous group data block + // before handle a new data block, close the fill operation for previous group data block taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); } else { - pInfo->totalInputRows += pBlock->info.rows; + if (pBlock == NULL) { + if (pInfo->totalInputRows == 0) { + pOperator->status = OP_EXEC_DONE; + return NULL; + } + + taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); + } else { + pInfo->totalInputRows += pBlock->info.rows; - int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pBlock->info.window.ekey; + int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey + : */pBlock->info.window.ekey; - taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey); - taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); + taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey); + taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); + } } doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); - return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; + if (pInfo->pRes->info.rows > 0) { // current group has no more result to return + return pInfo->pRes; + } else if (pInfo->existNewGroupBlock) { // try next group + pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; + int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey + :*/ pInfo->existNewGroupBlock->info.window.ekey; + taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); + + taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); + taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); + + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); + pInfo->existNewGroupBlock = NULL; + *newgroup = true; + + return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; + } else { + return NULL; + } + // return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } } @@ -5219,9 +5339,10 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w); - pInfo->pFillInfo = taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, - pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, (int8_t)pQueryAttr->precision, - pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); + pInfo->pFillInfo = + taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, + pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, + (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -5264,7 +5385,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator int32_t numOfCols = pInfo->orderColumnList != NULL? taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfOutput; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -5288,7 +5409,7 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator } -static SSDataBlock* doTagScan(void* param) { +static SSDataBlock* doTagScan(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -5300,6 +5421,7 @@ static SSDataBlock* doTagScan(void* param) { STagScanInfo *pInfo = pOperator->info; SSDataBlock *pRes = pInfo->pRes; + *newgroup = false; int32_t count = 0; SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index 71aa844b86..654295ecab 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -464,10 +464,13 @@ int32_t compare_a(tOrderDescriptor *pDescriptor, int32_t numOfRows1, int32_t s1, return 0; } -int32_t compare_aRv(SSDataBlock* pBlock, int16_t* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order) { +int32_t compare_aRv(SSDataBlock* pBlock, SArray* colIndex, int32_t numOfCols, int32_t rowIndex, char** buffer, int32_t order) { for (int32_t i = 0; i < numOfCols; ++i) { - int32_t index = colIndex[i]; + SColIndex* pColIndex = taosArrayGet(colIndex, i); + int32_t index = pColIndex->colIndex; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, index); + assert(pColIndex->colId == pColInfo->info.colId); char* data = pColInfo->pData + rowIndex * pColInfo->info.bytes; if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 2de1029396..fa572029fc 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -31,7 +31,7 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) { for(int32_t j = 0; j < pFillInfo->numOfCols; ++j) { SFillColInfo* pCol = &pFillInfo->pFillCol[j]; - if (TSDB_COL_IS_NORMAL_COL(pCol->flag)) { + if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || TSDB_COL_IS_UD_COL(pCol->flag)) { continue; } @@ -126,10 +126,10 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData } else { setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); } - } else { /* fill the default value */ + } else { // fill the default value */ for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)) { + if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { continue; } @@ -210,7 +210,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR // assign rows to dst buffer for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; - if (TSDB_COL_IS_TAG(pCol->flag)) { + if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { continue; } @@ -275,13 +275,16 @@ static int64_t appendFilledResult(SFillInfo* pFillInfo, void** output, int64_t r // there are no duplicated tags in the SFillTagColInfo list static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t capacity) { int32_t rowsize = 0; + int32_t numOfTags = 0; int32_t k = 0; for (int32_t i = 0; i < numOfCols; ++i) { SFillColInfo* pColInfo = &pFillInfo->pFillCol[i]; pFillInfo->pData[i] = NULL; - if (TSDB_COL_IS_TAG(pColInfo->flag)) { + if (TSDB_COL_IS_TAG(pColInfo->flag) || pColInfo->col.type == TSDB_DATA_TYPE_BINARY) { + numOfTags += 1; + bool exists = false; int32_t index = -1; for (int32_t j = 0; j < k; ++j) { @@ -310,6 +313,8 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t rowsize += pColInfo->col.bytes; } + pFillInfo->numOfTags = numOfTags; + assert(k <= pFillInfo->numOfTags); return rowsize; } @@ -347,12 +352,13 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 pFillInfo->interval.slidingUnit = slidingUnit; pFillInfo->pData = malloc(POINTER_BYTES * numOfCols); - if (numOfTags > 0) { - pFillInfo->pTags = calloc(pFillInfo->numOfTags, sizeof(SFillTagColInfo)); - for (int32_t i = 0; i < numOfTags; ++i) { + +// if (numOfTags > 0) { + pFillInfo->pTags = calloc(numOfCols, sizeof(SFillTagColInfo)); + for (int32_t i = 0; i < numOfCols; ++i) { pFillInfo->pTags[i].col.colId = -2; // TODO } - } +// } pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc); assert(pFillInfo->rowSize > 0); @@ -367,6 +373,7 @@ SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int3 void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) { pFillInfo->start = startTimestamp; pFillInfo->currentKey = startTimestamp; + pFillInfo->end = startTimestamp; pFillInfo->index = -1; pFillInfo->numOfRows = 0; pFillInfo->numOfCurrent = 0; @@ -425,6 +432,8 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); // pFillInfo->pData[i] = pColData->pData; if (pInput->info.rows > pFillInfo->alloc) { @@ -436,6 +445,12 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) } memcpy(pFillInfo->pData[i], pColData->pData, pColData->info.bytes * pInput->info.rows); + + if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer + SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; + assert (pTag->col.colId == pCol->col.colId); + memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? + } } } @@ -456,7 +471,7 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* memcpy(pFillInfo->pData[i], data, (size_t)(pCol->col.bytes * pInput->num)); - if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer + if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; assert (pTag->col.colId == pCol->col.colId); memcpy(pTag->tagVal, data, pCol->col.bytes); // TODO not memcpy?? @@ -465,7 +480,17 @@ void taosFillCopyInputDataFromOneFilePage(SFillInfo* pFillInfo, const tFilePage* } bool taosFillHasMoreResults(SFillInfo* pFillInfo) { - return taosNumOfRemainRows(pFillInfo) > 0; + int32_t remain = taosNumOfRemainRows(pFillInfo); + if (remain > 0) { + return true; + } + + if (pFillInfo->numOfTotal > 0 && (((pFillInfo->end > pFillInfo->start) && FILL_IS_ASC_FILL(pFillInfo)) || + (pFillInfo->end < pFillInfo->start && !FILL_IS_ASC_FILL(pFillInfo)))) { + return getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, 4096) > 0; + } + + return false; } int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 3c4a9f9fef..e3486dd89f 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -132,15 +132,11 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { return plan; } - // todo: exchange operator? + // todo: int32_t op = OP_MultiwaySort; taosArrayPush(plan, &op); - // arithmetic operator - if (!pQueryAttr->simpleAgg && pQueryAttr->interval.interval == 0) { - op = OP_Arithmetic; - taosArrayPush(plan, &op); - } else { + if (pQueryAttr->simpleAgg || (pQueryAttr->interval.interval > 0 || pQueryAttr->sw.gap > 0)) { op = OP_GlobalAggregate; taosArrayPush(plan, &op); @@ -157,7 +153,8 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { } // limit/offset operator - if (pQueryAttr->slimit.limit > 0 || pQueryAttr->slimit.offset > 0) { + if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0 || + pQueryAttr->slimit.limit > 0 || pQueryAttr->slimit.offset > 0) { op = OP_SLimit; taosArrayPush(plan, &op); } diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index d7edd15314..542bbd67e6 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -236,7 +236,8 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { qDebug("QInfo:%"PRIu64" query task is launched", pQInfo->qId); - pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); + bool newgroup = false; + pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup); if (isQueryKilled(pQInfo)) { qDebug("QInfo:%"PRIu64" query is killed", pQInfo->qId); diff --git a/tests/script/general/parser/limit2_query.sim b/tests/script/general/parser/limit2_query.sim index 9fe287960d..c35fd369ca 100644 --- a/tests/script/general/parser/limit2_query.sim +++ b/tests/script/general/parser/limit2_query.sim @@ -148,6 +148,9 @@ if $rows != 8200 then return -1 endi +sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000) limit 100000; + + sql select max(c1) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 interval(5m) fill(value, -1000, -2) limit 10 offset 8190; if $rows != 10 then return -1 diff --git a/tests/script/general/parser/select_with_tags.sim b/tests/script/general/parser/select_with_tags.sim index 38a514a51b..7cfb36150b 100644 --- a/tests/script/general/parser/select_with_tags.sim +++ b/tests/script/general/parser/select_with_tags.sim @@ -26,7 +26,7 @@ sql drop database if exists $db -x step1 step1: sql create database if not exists $db keep 36500 sql use $db -sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12)) +sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12), t3 int) $i = 0 $j = 1 @@ -36,7 +36,7 @@ while $i < $tbNum $tg2 = ' . abc $tg2 = $tg2 . $i $tg2 = $tg2 . ' - sql create table $tb using $mt tags( $i , $tg2 ) + sql create table $tb using $mt tags( $i , $tg2 , 123 ) $x = 0 while $x < $rowNum @@ -85,6 +85,7 @@ if $data00 != @70-01-01 08:01:40.000@ then endi if $data01 != @select_tags_tb0@ then + print expect: select_tags_tb0, actual: $data01 return -1 endi @@ -804,7 +805,46 @@ if $row != 1 then return -1 endi -print ======= selectivity + tags+ group by + tags + filter + interval + join=========== +print TODO ======= selectivity + tags+ group by + tags + filter + interval + join=========== + +print ==========================mix tag columns and group by columns====================== +sql select top(c1, 100), tbname from select_tags_mt0 where tbname in ('select_tags_tb0', 'select_tags_tb1') group by t3 +if $rows != 100 then + return -1 +endi + +if $data00 != @70-01-01 08:01:40.094@ then + print expect: 70-01-01 08:01:40.094, actual: $data00 + return -1 +endi + +if $data01 != 94 then + return -1 +endi + +if $data02 != @select_tags_tb0@ then + return -1 +endi + +if $data03 != 123 then + return -1 +endi + +if $data10 != @70-01-01 08:01:40.095@ then + return -1 +endi + +if $data11 != 95 then + return -1 +endi + +if $data12 != @select_tags_tb0@ then + return -1 +endi + +if $data13 != 123 then + return -1 +endi print ======error sql============================================= -- GitLab