diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 8d8287e53f690cb3217cbe72e7e3afd0d2c5eb51..da1d2b01c35ec8f89785c933ef3a8bcc7cb5faef 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -36,7 +36,7 @@ extern "C" { (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) #define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \ - (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo))) + (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index f39169c193d3483e4db6a35aa42f5187ef1bf723..91ac7a2f3ace7f08bafb5d4620e65f59f0b89480 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -168,6 +168,9 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo } else { pRes->code = numOfRows; } + if (pRes->code == TSDB_CODE_SUCCESS) { + pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE; + } tscAsyncResultOnError(pSql); return; diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 2b635e8d8322e6c1779cf877b536644f20369b40..130abdf4a73ade3951070a100555d15270c265f2 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -35,6 +35,7 @@ typedef struct SCompareParam { static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { int32_t ret = 0; + size_t size = taosArrayGetSize(columnIndexList); if (size > 0) { ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC); @@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc (*hasPrev) = true; } +// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the +// output value to multiple rows static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) { if (numOfRows <= 1) { - return ; + return; } for (int32_t k = 0; k < numOfOutput; ++k) { @@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput continue; } - int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result - char* src = pCtx[k].pOutput; + char* src = pCtx[k].pOutput; + char* dst = pCtx[k].pOutput + pCtx[k].outputBytes; - for (int32_t i = 0; i < inc; ++i) { - pCtx[k].pOutput += pCtx[k].outputBytes; - memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes); + // Let's start from the second row, as the first row has result value already. 
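+    // Each remaining row receives a copy of the value produced in the first row, so every output row carries the tag value.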
+ for (int32_t i = 1; i < numOfRows; ++i) { + memcpy(dst, src, (size_t)pCtx[k].outputBytes); + dst += pCtx[k].outputBytes; + } + } +} + +static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) { + for (int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex; + } + + for (int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); + } else { + aAggs[functionId].mergeFunc(&pCtx[j]); + } + } +} + +static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) { + for(int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); + } else { + aAggs[functionId].xFinalize(&pCtx[j]); } } } @@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD SMultiwayMergeInfo* pInfo = pOperator->info; SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; - char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES); + char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES); for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - add[i] = pCtx[i].pInput; + addrPtr[i] = pCtx[i].pInput; pCtx[i].size = 1; } for(int32_t i = 0; i < pBlock->info.rows; ++i) { if (pInfo->hasPrev) { if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } else { - for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); - - continue; - } - - aAggs[functionId].xFinalize(&pCtx[j]); - } + doFinalizeResultImpl(pInfo, pCtx, numOfExpr); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); @@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); } - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - 
continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } } else { - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); @@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD { for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - pCtx[i].pInput = add[i]; + pCtx[i].pInput = addrPtr[i]; } } - tfree(add); + tfree(addrPtr); } static bool isAllSourcesCompleted(SGlobalMerger *pMerger) { @@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; bool sameGroup = true; if (pInfo->hasPrev) { + + // todo refactor extract method int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); // if this row belongs to current result set group @@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { break; } + bool sameGroup = true; if (pAggInfo->hasGroupColData) { - bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); - if (!sameGroup) { + sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); + if (!sameGroup && !pAggInfo->multiGroupResults) { *newgroup = true; pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->pExistBlock = pBlock; @@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { } if (handleData) { // data in current group is all handled - for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { - int32_t functionId = pAggInfo->binfo.pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); - - continue; - } - - aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); - } + doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); - pAggInfo->binfo.pRes->info.rows += numOfRows; + pAggInfo->binfo.pRes->info.rows += numOfRows; setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows); } @@ -1019,71 +975,127 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { return (pRes->info.rows != 0)? 
pRes:NULL; } -static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { - SSLimitOperatorInfo *pInfo = pOperator->info; - assert(pInfo->currentGroupOffset >= 0); +static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) { + if (pInfo->currentOffset > 0) { + pInfo->currentOffset -= 1; + } else { + // discard the data rows in current group + if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) { + size_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock); + for (int32_t i = 0; i < num1; ++i) { + SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); - SSDataBlock* pBlock = NULL; - if (pInfo->currentGroupOffset == 0) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - } + SColumnInfo *pColInfo = &pColInfoData->info; + + char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; + char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes); - if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { - while ((*newgroup) == false) { // ignore the remain blocks - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } + memcpy(pDst, pSrc, pColInfo->bytes); } + + pInfo->rowsTotal += 1; + pInfo->pRes->info.rows += 1; } + } +} + +static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) { + if (pInfo->capacity < pResultBlock->info.rows + numOfRows) { + int32_t total = pResultBlock->info.rows + numOfRows; + + size_t num = taosArrayGetSize(pResultBlock->pDataBlock); + for (int32_t i = 0; i < num; ++i) { + SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i); + + char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes); + if (tmp != NULL) { + pInfoData->pData = tmp; + } else { + // todo handle the malloc failure + } - return pBlock; + pInfo->capacity = total; + pInfo->threshold = (int64_t) (total * 0.8); + } } +} - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); +enum { + BLOCK_NEW_GROUP = 1, + BLOCK_NO_GROUP = 2, + BLOCK_SAME_GROUP = 3, +}; - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; +static int32_t doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) { + int32_t rowIndex = 0; + + while (rowIndex < pBlock->info.rows) { + int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); + + bool samegroup = true; + if (pInfo->hasPrev) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColIndex *pIndex = 
taosArrayGet(pInfo->orderColumnList, i); + SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex); + + SColumnInfo *pColInfo = &pColInfoData->info; + + char *d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; + int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes); + if (ret != 0) { // it is a new group + samegroup = false; + break; + } + } } - while(1) { - if (*newgroup) { - pInfo->currentGroupOffset -= 1; - *newgroup = false; + if (!samegroup || !pInfo->hasPrev) { + pInfo->ignoreCurrentGroup = false; + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev); + + pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group + pInfo->rowsTotal = 0; + + if (pInfo->currentGroupOffset > 0) { + pInfo->ignoreCurrentGroup = true; + pInfo->currentGroupOffset -= 1; // now we are in the next group data + rowIndex += 1; + continue; + } + + // A new group has arrived according to the result rows, and the group limitation has already reached. + // Let's jump out of current loop and return immediately. + if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return BLOCK_NO_GROUP; } - while ((*newgroup) == false) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + pInfo->groupTotal += 1; - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } + // data in current group not allowed, return if current result does not belong to the previous group.And there + // are results exists in current SSDataBlock + if (!pInfo->multigroupResult && !samegroup && pInfo->pRes->info.rows > 0) { + return BLOCK_NEW_GROUP; } - // now we have got the first data block of the next group. 
- if (pInfo->currentGroupOffset == 0) { - return pBlock; + doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex); + + } else { // handle the offset in the same group + // All the data in current group needs to be discarded, due to the limit parameter in the SQL statement + if (pInfo->ignoreCurrentGroup) { + rowIndex += 1; + continue; } + + doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex); } - return NULL; + rowIndex += 1; + } + + return BLOCK_SAME_GROUP; } SSDataBlock* doSLimit(void* param, bool* newgroup) { @@ -1093,63 +1105,41 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { } SSLimitOperatorInfo *pInfo = pOperator->info; + pInfo->pRes->info.rows = 0; - SSDataBlock *pBlock = NULL; - while (1) { - pBlock = skipGroupBlock(pOperator, newgroup); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } - - if (*newgroup) { // a new group arrives - pInfo->groupTotal += 1; - pInfo->rowsTotal = 0; - pInfo->currentOffset = pInfo->limit.offset; - } + if (pInfo->pPrevBlock != NULL) { + ensureOutputBuf(pInfo, pInfo->pRes, pInfo->pPrevBlock->info.rows); + int32_t ret = doSlimitImpl(pOperator, pInfo, pInfo->pPrevBlock); + assert(ret != BLOCK_NEW_GROUP); - assert(pInfo->currentGroupOffset == 0); - - if (pInfo->currentOffset >= pBlock->info.rows) { - pInfo->currentOffset -= pBlock->info.rows; - } else { - if (pInfo->currentOffset == 0) { - break; - } - - int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); - pBlock->info.rows = remain; + pInfo->pPrevBlock = NULL; + } - // move the remain rows of this data block to the front. - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + assert(pInfo->currentGroupOffset >= 0); - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); - } + while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - pInfo->currentOffset = 0; - break; + if (pBlock == NULL) { + return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } - } - - if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort - return NULL; - } - if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { - pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); - pInfo->rowsTotal = pInfo->limit.limit; + ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows); + int32_t ret = doSlimitImpl(pOperator, pInfo, pBlock); + if (ret == BLOCK_NEW_GROUP) { + pInfo->pPrevBlock = pBlock; + return pInfo->pRes; + } - if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) { - pOperator->status = OP_EXEC_DONE; + if (pOperator->status == OP_EXEC_DONE) { + return pInfo->pRes->info.rows == 0 ? 
NULL : pInfo->pRes; } - // setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - } else { - pInfo->rowsTotal += pBlock->info.rows; + // now the number of rows in current group is enough, let's return to the invoke function + if (pInfo->pRes->info.rows > pInfo->threshold) { + return pInfo->pRes; + } } - - return pBlock; } diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f5d9a6a17e1cf6784355d7e7b6f198dcc1fb9248..edb673d1670ca758f087ab9f83dec7d780531b6b 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1757,6 +1757,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow pSql->res.numOfRows = 0; code = doPackSendDataBlock(pSql, pInsertParam, pTableMeta, count, pTableDataBlock); if (code != TSDB_CODE_SUCCESS) { + pParentSql->res.code = code; goto _error; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 14dfee905e1231739b29c4965091e3cd0910fc08..b9e13865f8ea0d4aaf143c4e47fde8ec69e9855e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -422,7 +422,6 @@ int32_t readFromFile(char *name, uint32_t *len, void **buf) { return TSDB_CODE_TSC_APP_ERROR; } close(fd); - tfree(*buf); return TSDB_CODE_SUCCESS; } @@ -926,7 +925,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { pQueryInfo = pCmd->active; pQueryInfo->pUdfInfo = pUdfInfo; pQueryInfo->udfCopy = true; - } } @@ -6917,7 +6915,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo const char* msg1 = "interval not allowed in group by normal column"; STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema* tagSchema = NULL; @@ -8439,6 +8436,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS if (taosArrayGetSize(subInfo->pSubquery) >= 2) { return invalidOperationMsg(msgBuf, "not support union in subquery"); } + SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo)); tscInitQueryInfo(pSub); @@ -8461,6 +8459,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS if (pTableMetaInfo1 == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub); pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta); @@ -8544,7 +8543,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf // check if there is 3 level select SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i); SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0); - if (p->from->type == SQL_NODE_FROM_SUBQUERY){ + if (p->from->type == SQL_NODE_FROM_SUBQUERY) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); } @@ -8636,6 +8635,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf } } + // disable group result mixed up if interval/session window query exists. 
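+  // i.e. each upstream (subquery) result block should then contain rows of a single group only, since the window operator handles groups one at a time (assumption based on the surrounding logic).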
+ if (isTimeWindowQuery(pQueryInfo)) { + size_t num = taosArrayGetSize(pQueryInfo->pUpstream); + for(int32_t i = 0; i < num; ++i) { + SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, i); + pUp->multigroupResult = false; + } + } + // parse the having clause in the first place int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1); if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) != diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index efdc644f2c8637a48b78f122ab847603609dd3f8..ca55bce82aa7916057a9b8a0ecd1e40734392962 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2265,7 +2265,7 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { destroySup(pSup); taos_free_result(pSql); - parent->res.code = code; + parent->res.code = c; tscAsyncResultOnError(parent); return; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index ff4d0e43e6a12c7f92e301d19f8688e48ff88283..5b919f24735bb0d9d107cf50364dc306759daf01 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3093,6 +3093,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { pQueryInfo->slimit.offset = 0; pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->window = TSWINDOW_INITIALIZER; + pQueryInfo->multigroupResult = true; } int32_t tscAddQueryInfo(SSqlCmd* pCmd) { @@ -3104,7 +3105,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) { } tscInitQueryInfo(pQueryInfo); - pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer if (pCmd->pQueryInfo == NULL) { @@ -3186,6 +3186,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { pQueryInfo->window = pSrc->window; pQueryInfo->sessionWindow = pSrc->sessionWindow; pQueryInfo->pTableMetaInfo = NULL; + pQueryInfo->multigroupResult = pSrc->multigroupResult; pQueryInfo->bufLen = pSrc->bufLen; pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery; @@ -3585,7 +3586,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->bufLen = pQueryInfo->bufLen; pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); - + pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult; pNewQueryInfo->distinct = pQueryInfo->distinct; if (pNewQueryInfo->buf == NULL) { @@ -3810,12 +3811,12 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) { pParentSql->res.code = code; - + tscAsyncResultOnError(pParentSql); return; } - tscFreeSubobj(pParentSql); + tscFreeSubobj(pParentSql); tfree(pParentSql->pSubs); pParentSql->res.code = TSDB_CODE_SUCCESS; @@ -3824,9 +3825,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, tstrerror(code), pParentSql->retry); - + tscResetSqlCmd(&pParentSql->cmd, true); - + code = tsParseSql(pParentSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return; @@ -3895,7 +3896,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { pNew->maxRetry = pSql->maxRetry; pNew->cmd.resColumnId = TSDB_RES_COL_ID; - + tsem_init(&pNew->rspSem, 0, 0); SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id @@ -4431,7 +4432,7 @@ int32_t 
tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, size_t sz = 0; STableMeta* pChild = *ppChild; STableMeta* pChild1; - + taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz); // tableMeta exists, build child table meta according to the super table meta @@ -4442,9 +4443,9 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, int32_t tableMetaSize = sizeof(STableMeta) + totalBytes; if (*tableMetaCapacity < tableMetaSize) { pChild1 = realloc(pChild, tableMetaSize); - if(pChild1 == NULL) + if(pChild1 == NULL) return -1; - pChild = pChild1; + pChild = pChild1; *tableMetaCapacity = (size_t)tableMetaSize; } @@ -4716,6 +4717,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->distinct = pQueryInfo->distinct; pQueryAttr->sw = pQueryInfo->sessionWindow; pQueryAttr->stateWindow = pQueryInfo->stateWindow; + pQueryAttr->multigroupResult = pQueryInfo->multigroupResult; pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfOutput = numOfOutput; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 2b9f952d76461fdd516a8f205cfa3749a4bc0012..7aab8dbca747f40a9f93ee826cea5f9a62a7b297 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -219,6 +219,7 @@ typedef struct SQueryAttr { bool distinct; // distinct query or not bool stateWindow; // window State on sub/normal table bool createFilterOperator; // if filter operator is needed + bool multigroupResult; // multigroup result can exist in one SSDataBlock int32_t interBufSize; // intermediate buffer sizse int32_t havingNum; // having expr number @@ -460,16 +461,23 @@ typedef struct SLimitOperatorInfo { } SLimitOperatorInfo; typedef struct SSLimitOperatorInfo { - int64_t groupTotal; - int64_t currentGroupOffset; - - int64_t rowsTotal; - int64_t currentOffset; - SLimitVal limit; - SLimitVal slimit; - - char **prevRow; - SArray *orderColumnList; + int64_t groupTotal; + int64_t currentGroupOffset; + + int64_t rowsTotal; + int64_t currentOffset; + SLimitVal limit; + SLimitVal slimit; + + char **prevRow; + SArray *orderColumnList; + bool hasPrev; + bool ignoreCurrentGroup; + bool multigroupResult; + SSDataBlock *pRes; // result buffer + SSDataBlock *pPrevBlock; + int64_t capacity; + int64_t threshold; } SSLimitOperatorInfo; typedef struct SFilterOperatorInfo { @@ -481,8 +489,9 @@ typedef struct SFillOperatorInfo { SFillInfo *pFillInfo; SSDataBlock *pRes; int64_t totalInputRows; - + void **p; SSDataBlock *existNewGroupBlock; + bool multigroupResult; } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { @@ -544,9 +553,9 @@ typedef struct SMultiwayMergeInfo { bool hasDataBlockForNewGroup; SSDataBlock *pExistBlock; - bool hasPrev; - bool groupMix; SArray *udfInfo; + bool hasPrev; + bool multiGroupResults; } SMultiwayMergeInfo; // todo support the disk-based sort @@ -568,7 +577,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, 
SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -577,10 +586,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, - int32_t numOfRows, void* merger, bool groupMix); -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo); + int32_t numOfRows, void* merger); +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); diff --git a/src/query/inc/qTableMeta.h b/src/query/inc/qTableMeta.h index 0dae74ac8224329810064c773ecd478d4a89ad15..54de5185b02194bc7cd55a609cfee3815ace2f4f 100644 --- a/src/query/inc/qTableMeta.h +++ b/src/query/inc/qTableMeta.h @@ -148,6 +148,7 @@ typedef struct SQueryInfo { bool orderProjectQuery; bool stateWindow; bool globalMerge; + bool multigroupResult; } SQueryInfo; /** diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e55c798a02af588044b9a130f5a3d0aafdfb2ea3..2bd5723d5a60169fc7417daea2825fdd2a240716 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -39,15 +39,12 @@ #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) - #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? 
TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define MULTI_KEY_DELIM "-" -#define HASH_CAPACITY_LIMIT 10000000 - #define TIME_WINDOW_COPY(_dst, _src) do {\ (_dst).skey = (_src).skey;\ (_dst).ekey = (_src).ekey;\ @@ -968,8 +965,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t break; } } - - return; } static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, @@ -1338,7 +1333,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, } else { pCtx[k].start.ptr = (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes; } - + pCtx[k].end.ptr = (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes; } } @@ -1627,7 +1622,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); preWin = win; - + int32_t prevEndPos = (forwardStep - 1) * step + startPos; startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); if (startPos < 0) { @@ -2266,30 +2261,30 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Fill: { SOperatorInfo* pInfo = pRuntimeEnv->proot; - pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); + pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); break; } case OP_MultiwayMergeSort: { - bool groupMix = true; - if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { - groupMix = false; - } - - pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, - 4096, merger, groupMix); // TODO hack it + pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); break; } - case OP_GlobalAggregate: { + case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock. + bool multigroupResult = pQueryAttr->multigroupResult; + if (pQueryAttr->multigroupResult) { + multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE); + } + pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo); + pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); break; } case OP_SLimit: { - pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger); + int32_t num = pRuntimeEnv->proot->numOfOutput; + SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; + pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); break; } @@ -3648,7 +3643,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; - // re-estabilish output buffer pointer. + // set the correct pointer after the memory buffer reallocated. 
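+    // (realloc may have moved pColInfo->pData, so pOutput is recomputed from the new buffer address above)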
int32_t functionId = pBInfo->pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { if(i>0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; @@ -4213,6 +4208,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti // refactor : extract method SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); + //add condition (pBlock->info.rows >= 1) just to runtime happy if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) { STimeWindow* w = &pBlock->info.window; @@ -4292,15 +4288,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } -int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) { - void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES); +int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); - p[i] = pColInfoData->pData; + p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows); } - pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity); - tfree(p); + int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows); + pOutput->info.rows += numOfRows; + return pOutput->info.rows; } @@ -5344,11 +5340,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; taosArrayDestroy(pInfo->orderColumnList); + pInfo->pRes = destroyOutputBuf(pInfo->pRes); tfree(pInfo->prevRow); } SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, - SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) { + SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); pInfo->resultRowFactor = @@ -5356,15 +5353,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx - pInfo->pMerge = param; - pInfo->bufCapacity = 4096; - pInfo->udfInfo = pUdfInfo; - - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - - pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); - pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); + pInfo->multiGroupResults = groupResultMixedUp; + pInfo->pMerge = param; + pInfo->bufCapacity = 4096; + pInfo->udfInfo = pUdfInfo; + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); + pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); // TODO refactor int32_t len = 0; @@ -5417,17 +5413,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, } SOperatorInfo 
*createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, - int32_t numOfRows, void *merger, bool groupMix) { + int32_t numOfRows, void *merger) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); - pInfo->pMerge = merger; - pInfo->groupMix = groupMix; - pInfo->bufCapacity = numOfRows; - + pInfo->pMerge = merger; + pInfo->bufCapacity = numOfRows; pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); - { + { // todo extract method to create prev compare buffer int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { len += pExpr[i].base.colBytes; @@ -5435,8 +5429,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfCols; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -5452,7 +5446,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->numOfOutput = numOfOutput; + pOperator->pExpr = pExpr; pOperator->exec = doMultiwayMergeSort; pOperator->cleanup = destroyGlobalAggOperatorInfo; return pOperator; @@ -6362,19 +6357,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { return pInfo->binfo.pRes; } -static SSDataBlock* doFill(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SFillOperatorInfo *pInfo = pOperator->info; - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - +static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); - return pInfo->pRes; + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) { + return; + } } // handle the cached new group data block @@ -6386,12 +6375,48 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); pInfo->existNewGroupBlock = NULL; *newgroup = true; - return (pInfo->pRes->info.rows > 0)? 
pInfo->pRes:NULL; + } +} + +static SSDataBlock* doFill(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + + SFillOperatorInfo *pInfo = pOperator->info; + pInfo->pRes->info.rows = 0; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; } + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { + return pInfo->pRes; + } +// if (taosFillHasMoreResults(pInfo->pFillInfo)) { +// *newgroup = false; +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); +// return pInfo->pRes; +// } +// +// // handle the cached new group data block +// if (pInfo->existNewGroupBlock) { +// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; +// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; +// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); +// +// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); +// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); +// +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); +// pInfo->existNewGroupBlock = NULL; +// *newgroup = true; +// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; +// } + while(1) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); @@ -6405,8 +6430,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { pInfo->existNewGroupBlock = pBlock; *newgroup = false; - // fill the previous group data block - // before handle a new data block, close the fill operation for previous group data block + // Fill the previous group data block, before handle the data block of new group. + // Close the fill operation for previous group data block taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); } else { if (pBlock == NULL) { @@ -6418,28 +6443,61 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); } else { pInfo->totalInputRows += pBlock->info.rows; - - int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey - : */pBlock->info.window.ekey; - - taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey); + taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); } } - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); - if (pInfo->pRes->info.rows > 0) { // current group has no more result to return - return pInfo->pRes; + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); + + // current group has no more result to return + if (pInfo->pRes->info.rows > 0) { + // 1. The result in current group not reach the threshold of output result, continue + // 2. 
If multiple group results existing in one SSDataBlock is not allowed, return immediately + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) { + return pInfo->pRes; + } + + doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { + return pInfo->pRes; + } + +// if (taosFillHasMoreResults(pInfo->pFillInfo)) { +// *newgroup = false; +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); +// return pInfo->pRes; +// } +// +// // handle the cached new group data block +// if (pInfo->existNewGroupBlock) { +// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; +// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; +// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); +// +// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); +// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); +// +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); +// pInfo->existNewGroupBlock = NULL; +// *newgroup = true; +// +// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { +// return pInfo->pRes; +// } +// +//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; +// } + } else if (pInfo->existNewGroupBlock) { // try next group pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; - int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey - :*/ pInfo->existNewGroupBlock->info.window.ekey; + int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); pInfo->existNewGroupBlock = NULL; *newgroup = true; @@ -6447,7 +6505,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } else { return NULL; } - // return (pInfo->pRes->info.rows > 0)? 
pInfo->pRes:NULL; } } @@ -6548,6 +6605,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pRes = destroyOutputBuf(pInfo->pRes); + tfree(pInfo->p); } static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { @@ -6891,10 +6949,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato return pOperator; } -SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, - int32_t numOfOutput) { +SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + pInfo->multigroupResult = multigroupResult; { SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6909,6 +6967,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); + + pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES); } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6928,7 +6988,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn return pOperator; } -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger) { +SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -6936,9 +6996,11 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); pInfo->slimit = pQueryAttr->slimit; pInfo->limit = pQueryAttr->limit; - + pInfo->capacity = pRuntimeEnv->resultInfo.capacity; + pInfo->threshold = (int64_t) (pInfo->capacity * 0.8); + pInfo->currentOffset = pQueryAttr->limit.offset; pInfo->currentGroupOffset = pQueryAttr->slimit.offset; - pInfo->currentOffset = pQueryAttr->limit.offset; + pInfo->multigroupResult= multigroupResult; // TODO refactor int32_t len = 0; @@ -6946,10 +7008,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator len += pExpr[i].base.resBytes; } - int32_t numOfCols = pInfo->orderColumnList != NULL? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; + int32_t numOfCols = (pInfo->orderColumnList != NULL)? 
(int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfCols; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -6957,6 +7019,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator offset += pExpr[index->colIndex].base.resBytes; } + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SLimitOperator"; @@ -7195,8 +7259,8 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { pOperator->status = OP_EXEC_DONE; break; } - - // ensure result output buf + + // ensure result output buf if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { int32_t newSize = pRes->info.rows + pBlock->info.rows; for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { @@ -7219,7 +7283,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; memcpy(start, val, pDistDataInfo->bytes); @@ -7237,7 +7301,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); - + pInfo->totalBytes = 0; pInfo->buf = NULL; pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 1a86bbae36697224585522b5be836c61394c7cc4..cdcc164152dddbc34d03508a2bdd7379d6e50892 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); pFillInfo->pData[i] = pColData->pData; - if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer + if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; assert (pTag->col.colId == pCol->col.colId); memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? 
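+      // Only the first row's tag value is kept: tag columns are assumed constant within a group for the fill operation.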
diff --git a/tests/pytest/functions/queryTestCases.py b/tests/pytest/functions/queryTestCases.py
index 081669bbb9bc866da1ccd297ba1e762c698de341..e69cb8f4682b130d6c2182b005544afaf41c16c5 100644
--- a/tests/pytest/functions/queryTestCases.py
+++ b/tests/pytest/functions/queryTestCases.py
@@ -15,6 +15,7 @@ import sys
 import subprocess
 import random
 import math
+import numpy as np
 
 from util.log import *
 from util.cases import *
@@ -57,16 +58,33 @@ class TDTestCase:
 
     def td3690(self):
         tdLog.printNoPrefix("==========TD-3690==========")
+
+        tdSql.prepare()
+
+        tdSql.execute("show variables")
+        res_off = tdSql.cursor.fetchall()
+        resList = np.array(res_off)
+        index = np.where(resList == "offlineThreshold")
+        index_value = np.dstack((index[0])).squeeze()
         tdSql.query("show variables")
-        tdSql.checkData(53, 1, 864000)
+        tdSql.checkData(index_value, 1, 864000)
 
     def td4082(self):
         tdLog.printNoPrefix("==========TD-4082==========")
+
+        tdSql.prepare()
+
         cfgfile = self.getCfgFile()
         max_compressMsgSize = 100000000
 
+        tdSql.execute("show variables")
+        res_com = tdSql.cursor.fetchall()
+        rescomlist = np.array(res_com)
+        cpms_index = np.where(rescomlist == "compressMsgSize")
+        index_value = np.dstack((cpms_index[0])).squeeze()
+
         tdSql.query("show variables")
-        tdSql.checkData(26, 1, -1)
+        tdSql.checkData(index_value, 1, -1)
 
         tdSql.query("show dnodes")
         index = tdSql.getData(0, 0)
@@ -80,7 +98,7 @@ class TDTestCase:
         tdDnodes.start(index)
 
         tdSql.query("show variables")
-        tdSql.checkData(26, 1, 100000000)
+        tdSql.checkData(index_value, 1, 100000000)
 
         tdDnodes.stop(index)
         cmd = f"sed -i '$s/{max_compressMsgSize}/{max_compressMsgSize+10}/g' {cfgfile} "
@@ -91,7 +109,7 @@ class TDTestCase:
         tdDnodes.start(index)
 
         tdSql.query("show variables")
-        tdSql.checkData(26, 1, -1)
+        tdSql.checkData(index_value, 1, -1)
 
         tdDnodes.stop(index)
         cmd = f"sed -i '$d' {cfgfile}"
@@ -104,8 +122,12 @@ class TDTestCase:
 
     def td4097(self):
         tdLog.printNoPrefix("==========TD-4097==========")
+
         tdSql.execute("drop database if exists db")
         tdSql.execute("drop database if exists db1")
+        tdDnodes.stop(1)
+        tdDnodes.start(1)
+
         tdSql.execute("create database if not exists db keep 3650")
         tdSql.execute("create database if not exists db1 keep 3650")
         tdSql.execute("create database if not exists new keep 3650")
@@ -267,10 +289,22 @@ class TDTestCase:
         # keep ~ [days,365000]
         tdSql.execute("drop database if exists db")
         tdSql.execute("create database if not exists db")
+
+        tdSql.execute("show variables")
+        res_kp = tdSql.cursor.fetchall()
+        resList = np.array(res_kp)
+        keep_index = np.where(resList == "keep")
+        index_value = np.dstack((keep_index[0])).squeeze()
+
         tdSql.query("show variables")
-        tdSql.checkData(38, 1, 3650)
+        tdSql.checkData(index_value, 1, 3650)
+
         tdSql.query("show databases")
-        tdSql.checkData(0,7,"3650,3650,3650")
+        selfPath = os.path.dirname(os.path.realpath(__file__))
+        if ("community" in selfPath):
+            tdSql.checkData(0, 7, "3650,3650,3650")
+        else:
+            tdSql.checkData(0, 7, 3650)
 
         days = tdSql.getData(0, 6)
         tdSql.error("alter database db keep 3650001")
@@ -289,14 +323,22 @@ class TDTestCase:
 
         tdSql.execute("alter database db keep 36500")
         tdSql.query("show databases")
-        tdSql.checkData(0, 7, "3650,3650,36500")
+        if ("community" in selfPath):
+            tdSql.checkData(0, 7, "36500,36500,36500")
+        else:
+            tdSql.checkData(0, 7, 36500)
+
         tdSql.execute("drop database if exists db")
 
         tdSql.execute("create database if not exists db1")
         tdSql.query("show databases")
-        tdSql.checkData(0, 7, "3650,3650,3650")
+        if ("community" in selfPath):
+            tdSql.checkData(0, 7, "3650,3650,3650")
+        else:
+            tdSql.checkData(0, 7, 3650)
+
         tdSql.query("show variables")
-        tdSql.checkData(38, 1, 3650)
+        tdSql.checkData(index_value, 1, 3650)
 
         tdSql.execute("alter database db1 keep 365")
         tdSql.execute("drop database if exists db1")
@@ -697,10 +739,8 @@ class TDTestCase:
         tdSql.checkRows(tbnum*3)
         tdSql.query(f"select distinct c1 c2, c2 c3 from t1 where c1 <{tbnum}")
         tdSql.checkRows(3)
-        tdSql.query("select distinct c1, c2 from stb1 order by ts")
-        tdSql.checkRows(tbnum*3+1)
-        tdSql.query("select distinct c1, c2 from t1 order by ts")
-        tdSql.checkRows(4)
+        tdSql.error("select distinct c1, c2 from stb1 order by ts")
+        tdSql.error("select distinct c1, c2 from t1 order by ts")
         tdSql.error("select distinct c1, ts from stb1 group by c2")
         tdSql.error("select distinct c1, ts from t1 group by c2")
         tdSql.error("select distinct c1, max(c2) from stb1 ")
@@ -1085,9 +1125,9 @@ class TDTestCase:
     def run(self):
 
         # master branch
-        # self.td3690()
-        # self.td4082()
-        # self.td4288()
+        self.td3690()
+        self.td4082()
+        self.td4288()
         # self.td4724()
         # self.td5798()
         # self.td5935()
diff --git a/tests/pytest/functions/showOfflineThresholdIs864000.py b/tests/pytest/functions/showOfflineThresholdIs864000.py
index 57d0b1921b0a3f81e28426e340783ca7e0268c00..7462d4cd72f600674fcb82aa1224019787d23fd5 100644
--- a/tests/pytest/functions/showOfflineThresholdIs864000.py
+++ b/tests/pytest/functions/showOfflineThresholdIs864000.py
@@ -12,6 +12,8 @@
 # -*- coding: utf-8 -*-
 
 import sys
+import numpy as np
+
 from util.log import *
 from util.cases import *
 from util.sql import *
@@ -24,8 +26,17 @@ class TDTestCase:
         tdSql.init(conn.cursor(), logSql)
 
     def run(self):
+        # tdSql.query("show variables")
+        # tdSql.checkData(54, 1, 864000)
+        tdSql.execute("show variables")
+        res = tdSql.cursor.fetchall()
+        resList = np.array(res)
+        index = np.where(resList == "offlineThreshold")
+        index_value = np.dstack((index[0])).squeeze()
         tdSql.query("show variables")
-        tdSql.checkData(54, 1, 864000)
+        tdSql.checkData(index_value, 1, 864000)
+        pass
+
 
     def stop(self):
         tdSql.close()
diff --git a/tests/pytest/query/query.py b/tests/pytest/query/query.py
index 8cec38780e5e873cb07d8ade45ef56b5d4ee43f7..eaec42721a8d78d8469e788732f4277e2d321939 100644
--- a/tests/pytest/query/query.py
+++ b/tests/pytest/query/query.py
@@ -1,132 +1,139 @@
-###################################################################
-# Copyright (c) 2016 by TAOS Technologies, Inc.
-# All rights reserved.
-#
-# This file is proprietary and confidential to TAOS Technologies.
-# No part of this file may be reproduced, stored, transmitted,
-# disclosed or used in any form or by any means other than as
-# expressly provided by the written permission from Jianhui Tao
-#
-###################################################################
-
-# -*- coding: utf-8 -*-
-
-import sys
-import taos
-from util.log import tdLog
-from util.cases import tdCases
-from util.sql import tdSql
-from util.dnodes import tdDnodes
-
-class TDTestCase:
-    def init(self, conn, logSql):
-        tdLog.debug("start to execute %s" % __file__)
-        tdSql.init(conn.cursor(), logSql)
-
-        self.ts = 1538548685000
-
-    def run(self):
-        tdSql.prepare()
-
-        print("==============step1")
-        tdSql.execute(
-            "create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50))")
-        tdSql.execute(
-            'CREATE TABLE if not exists dev_001 using st tags("dev_01")')
-        tdSql.execute(
-            'CREATE TABLE if not exists dev_002 using st tags("dev_02")')
-
-        print("==============step2")
-
-        tdSql.execute(
-            """INSERT INTO dev_001(ts, tagtype) VALUES('2020-05-13 10:00:00.000', 1),
-            ('2020-05-13 10:00:00.001', 1)
-            dev_002 VALUES('2020-05-13 10:00:00.001', 1)""")
-
-        tdSql.query("select * from db.st where ts='2020-05-13 10:00:00.000'")
-        tdSql.checkRows(1)
-
-        tdSql.query("select tbname, dev from dev_001")
-        tdSql.checkRows(1)
-        tdSql.checkData(0, 0, 'dev_001')
-        tdSql.checkData(0, 1, 'dev_01')
-
-        tdSql.query("select tbname, dev, tagtype from dev_001")
-        tdSql.checkRows(2)
-        tdSql.checkData(0, 0, 'dev_001')
-        tdSql.checkData(0, 1, 'dev_01')
-        tdSql.checkData(0, 2, 1)
-        tdSql.checkData(1, 0, 'dev_001')
-        tdSql.checkData(1, 1, 'dev_01')
-        tdSql.checkData(1, 2, 1)
-
-        ## test case for https://jira.taosdata.com:18080/browse/TD-2488
-        tdSql.execute("create table m1(ts timestamp, k int) tags(a int)")
-        tdSql.execute("create table t1 using m1 tags(1)")
-        tdSql.execute("create table t2 using m1 tags(2)")
-        tdSql.execute("insert into t1 values('2020-1-1 1:1:1', 1)")
-        tdSql.execute("insert into t1 values('2020-1-1 1:10:1', 2)")
-        tdSql.execute("insert into t2 values('2020-1-1 1:5:1', 99)")
-
-        tdSql.query("select count(*) from m1 where ts = '2020-1-1 1:5:1' ")
-        tdSql.checkRows(1)
-        tdSql.checkData(0, 0, 1)
-
-        tdDnodes.stop(1)
-        tdDnodes.start(1)
-
-        tdSql.query("select count(*) from m1 where ts = '2020-1-1 1:5:1' ")
-        tdSql.checkRows(1)
-        tdSql.checkData(0, 0, 1)
-
-        ## test case for https://jira.taosdata.com:18080/browse/TD-1930
-        tdSql.execute("create table tb(ts timestamp, c1 int, c2 binary(10), c3 nchar(10), c4 float, c5 bool)")
-        for i in range(10):
-            tdSql.execute("insert into tb values(%d, %d, 'binary%d', 'nchar%d', %f, %d)" % (self.ts + i, i, i, i, i + 0.1, i % 2))
-
-        tdSql.error("select * from tb where c2 = binary2")
-        tdSql.error("select * from tb where c3 = nchar2")
-
-        tdSql.query("select * from tb where c2 = 'binary2' ")
-        tdSql.checkRows(1)
-
-        tdSql.query("select * from tb where c3 = 'nchar2' ")
-        tdSql.checkRows(1)
-
-        tdSql.query("select * from tb where c1 = '2' ")
-        tdSql.checkRows(1)
-
-        tdSql.query("select * from tb where c1 = 2 ")
-        tdSql.checkRows(1)
-
-        tdSql.query("select * from tb where c4 = '0.1' ")
-        tdSql.checkRows(1)
-
-        tdSql.query("select * from tb where c4 = 0.1 ")
-        tdSql.checkRows(1)
-
-        tdSql.query("select * from tb where c5 = true ")
-        tdSql.checkRows(5)
-
-        tdSql.query("select * from tb where c5 = 'true' ")
-        tdSql.checkRows(5)
-
-        # For jira: https://jira.taosdata.com:18080/browse/TD-2850
-        tdSql.execute("create database 'Test' ")
-        tdSql.execute("use 'Test' ")
-        tdSql.execute("create table 'TB'(ts timestamp, 'Col1' int) tags('Tag1' int)")
-        tdSql.execute("insert into 'Tb0' using tb tags(1) values(now, 1)")
-        tdSql.query("select * from tb")
-        tdSql.checkRows(1)
-
-        tdSql.query("select * from tb0")
-        tdSql.checkRows(1)
-
-
-    def stop(self):
-        tdSql.close()
-        tdLog.success("%s successfully executed" % __file__)
-
-
-tdCases.addWindows(__file__, TDTestCase())
-tdCases.addLinux(__file__, TDTestCase())
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+import sys
+import taos
+from util.log import tdLog
+from util.cases import tdCases
+from util.sql import tdSql
+from util.dnodes import tdDnodes
+
+
+class TDTestCase:
+    def init(self, conn, logSql):
+        tdLog.debug("start to execute %s" % __file__)
+        tdSql.init(conn.cursor(), logSql)
+
+        self.ts = 1538548685000
+
+    def run(self):
+        tdSql.prepare()
+
+        print("==============step1")
+        tdSql.execute(
+            "create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50))")
+        tdSql.execute(
+            'CREATE TABLE if not exists dev_001 using st tags("dev_01")')
+        tdSql.execute(
+            'CREATE TABLE if not exists dev_002 using st tags("dev_02")')
+
+        print("==============step2")
+
+        tdSql.execute(
+            """INSERT INTO dev_001(ts, tagtype) VALUES('2020-05-13 10:00:00.000', 1),
+            ('2020-05-13 10:00:00.001', 1)
+            dev_002 VALUES('2020-05-13 10:00:00.001', 1)""")
+
+        tdSql.query("select * from db.st where ts='2020-05-13 10:00:00.000'")
+        tdSql.checkRows(1)
+
+        tdSql.query("select tbname, dev from dev_001")
+        tdSql.checkRows(1)
+        tdSql.checkData(0, 0, 'dev_001')
+        tdSql.checkData(0, 1, 'dev_01')
+
+        tdSql.query("select tbname, dev, tagtype from dev_001")
+        tdSql.checkRows(2)
+        tdSql.checkData(0, 0, 'dev_001')
+        tdSql.checkData(0, 1, 'dev_01')
+        tdSql.checkData(0, 2, 1)
+        tdSql.checkData(1, 0, 'dev_001')
+        tdSql.checkData(1, 1, 'dev_01')
+        tdSql.checkData(1, 2, 1)
+
+        ## test case for https://jira.taosdata.com:18080/browse/TD-2488
+        tdSql.execute("create table m1(ts timestamp, k int) tags(a int)")
+        tdSql.execute("create table t1 using m1 tags(1)")
+        tdSql.execute("create table t2 using m1 tags(2)")
+        tdSql.execute("insert into t1 values('2020-1-1 1:1:1', 1)")
+        tdSql.execute("insert into t1 values('2020-1-1 1:10:1', 2)")
+        tdSql.execute("insert into t2 values('2020-1-1 1:5:1', 99)")
+
+        tdSql.query("select count(*) from m1 where ts = '2020-1-1 1:5:1' ")
+        tdSql.checkRows(1)
+        tdSql.checkData(0, 0, 1)
+
+        tdDnodes.stop(1)
+        tdDnodes.start(1)
+
+        tdSql.query("select count(*) from m1 where ts = '2020-1-1 1:5:1' ")
+        tdSql.checkRows(1)
+        tdSql.checkData(0, 0, 1)
+
+        ## test case for https://jira.taosdata.com:18080/browse/TD-1930
+        tdSql.execute("create table tb(ts timestamp, c1 int, c2 binary(10), c3 nchar(10), c4 float, c5 bool)")
+        for i in range(10):
+            tdSql.execute(
+                "insert into tb values(%d, %d, 'binary%d', 'nchar%d', %f, %d)" % (self.ts + i, i, i, i, i + 0.1, i % 2))
+
+        tdSql.error("select * from tb where c2 = binary2")
+        tdSql.error("select * from tb where c3 = nchar2")
+
+        tdSql.query("select * from tb where c2 = 'binary2' ")
+        tdSql.checkRows(1)
+
+        tdSql.query("select * from tb where c3 = 'nchar2' ")
+        tdSql.checkRows(1)
+
+        tdSql.query("select * from tb where c1 = '2' ")
+        tdSql.checkRows(1)
+
+        tdSql.query("select * from tb where c1 = 2 ")
+        tdSql.checkRows(1)
+
+        tdSql.query("select * from tb where c4 = '0.1' ")
+        tdSql.checkRows(1)
+
+        tdSql.query("select * from tb where c4 = 0.1 ")
+        tdSql.checkRows(1)
+
+        tdSql.query("select * from tb where c5 = true ")
+        tdSql.checkRows(5)
+
+        tdSql.query("select * from tb where c5 = 'true' ")
+        tdSql.checkRows(5)
+
+        # For jira: https://jira.taosdata.com:18080/browse/TD-2850
+        tdSql.execute("create database 'Test' ")
+        tdSql.execute("use 'Test' ")
+        tdSql.execute("create table 'TB'(ts timestamp, 'Col1' int) tags('Tag1' int)")
+        tdSql.execute("insert into 'Tb0' using tb tags(1) values(now, 1)")
+        tdSql.query("select * from tb")
+        tdSql.checkRows(1)
+
+        # For jira:https://jira.taosdata.com:18080/browse/TD-6314
+        tdSql.execute("use db")
+        tdSql.execute("create stable stb_001(ts timestamp,v int) tags(c0 int)")
+        tdSql.query("select _block_dist() from stb_001")
+        tdSql.checkRows(1)
+
+        tdSql.query("select * from tb0")
+        tdSql.checkRows(1)
+
+    def stop(self):
+        tdSql.close()
+        tdLog.success("%s successfully executed" % __file__)
+
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())
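The test changes above all follow one pattern: instead of checking a hard-coded row offset in the output of `show variables` (53, 26, 38, 54), they fetch the result once, locate the row whose first column is the variable name, and check that row, so the assertions survive reordering of the variable list. A small helper capturing the same idea is sketched below; the helper name `variable_row_index` and the direct use of `np.where(...)[0]` are illustrative simplifications, not code from the patch, which builds the index inline with `np.dstack(...).squeeze()`.

import numpy as np

def variable_row_index(rows, name):
    """Return the row index of `name` in the rows of `show variables`.

    `rows` is the list of tuples returned by cursor.fetchall(); raising when
    the variable is missing avoids silently checking the wrong row.
    """
    arr = np.array(rows)
    hits = np.where(arr == name)[0]   # row indices whose cells match the name
    if hits.size == 0:
        raise ValueError(f"variable {name!r} not found in 'show variables' output")
    return int(hits[0])

# Usage sketch, mirroring td3690 / showOfflineThresholdIs864000:
#   tdSql.execute("show variables")
#   idx = variable_row_index(tdSql.cursor.fetchall(), "offlineThreshold")
#   tdSql.query("show variables")
#   tdSql.checkData(idx, 1, 864000)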