diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 52169226436b3aa75fe76ca073de756b7b227550..2d77905d866061e0a454af094a8928e5771d94e7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1246,7 +1246,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { } int32_t newRows = (payloadSize - additional) / rowSize; - ASSERT(newRows <= nRows && newRows > 1); + ASSERT(newRows <= nRows && newRows >= 1); return newRows; } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index def6c06896149c4f7c871099d1d9dc7166dc2dd1..6b70825ed46bdfffb703d3c508971a22ec145b82 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -257,6 +257,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { terrno = rowsRead; mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id); mndReleaseShowObj(pShow, true); + blockDataDestroy(pBlock); return -1; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b36d3566bf0abaf3c12760cb6fdc7311195e75da..dc12f47c655b7b80836a64d2c6eb31284bc38e46 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -94,10 +94,8 @@ typedef struct SLimit { typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder; typedef struct STaskCostInfo { - int64_t created; - int64_t start; - int64_t end; - + int64_t created; + int64_t start; uint64_t loadStatisTime; uint64_t loadFileBlockTime; uint64_t loadDataInCacheTime; @@ -185,7 +183,7 @@ typedef struct SExecTaskInfo { STaskCostInfo cost; int64_t owner; // if it is in execution int32_t code; - uint64_t totalRows; // total number of rows +// uint64_t totalRows; // total number of rows struct { char *tablename; char *dbname; @@ -676,6 +674,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI SArray* pColList); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); +int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 6689def7a756c70e0adf36171a61c8f2279d11f7..7757825733153741d6e83404051578f7f4e2aef8 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -126,8 +126,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (ret != TSDB_CODE_SUCCESS) { pTaskInfo->code = ret; cleanUpUdfs(); - qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), - tstrerror(pTaskInfo->code)); + qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); return pTaskInfo->code; } @@ -142,12 +141,13 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { *useconds = pTaskInfo->cost.elapsedTime; } + cleanUpUdfs(); + int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; - pTaskInfo->totalRows += current; + uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; - cleanUpUdfs(); qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", - GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0); + GET_TASKID(pTaskInfo), current, total, 0, el/1000.0); atomic_store_64(&pTaskInfo->owner, 0); return pTaskInfo->code; @@ -197,7 +197,7 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { void qDestroyTask(qTaskInfo_t qTaskHandle) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle; - qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows); + qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); queryCostStatis(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a4a4b2b50657c79983a25c244b15213c506a429a..34f9c528a35ba46ea2e979c100c46dd6d8b972db 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3938,6 +3938,21 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { taosMemoryFreeClear(pOperator); } +int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) { + *defaultPgsz = 4096; + while (*defaultPgsz < rowSize * 4) { + *defaultPgsz <<= 1u; + } + + // at least four pages need to be in buffer + *defaultBufsz = 4096 * 256; + if ((*defaultBufsz) <= (*defaultPgsz)) { + (*defaultBufsz) = (*defaultPgsz) * 4; + } + + return 0; +} + int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -3950,18 +3965,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n return TSDB_CODE_OUT_OF_MEMORY; } - uint32_t defaultPgsz = 4096; - while (defaultPgsz < pAggSup->resultRowSize * 4) { - defaultPgsz <<= 1u; - } - - // at least four pages need to be in buffer - int32_t defaultBufsz = 4096 * 256; - if (defaultBufsz <= defaultPgsz) { - defaultBufsz = defaultPgsz * 4; - } + uint32_t defaultPgsz = 0; + uint32_t defaultBufsz = 0; + getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); - int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH); + int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2600c170606c1f0b309d0662cd7b8390672343fe..3ab296f62e127db9f36797f04c1f1543460333d2 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -439,7 +439,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { memcpy(data + (*columnLen), src, varDataTLen(src)); int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); ASSERT(v > 0); - printf("len:%d\n", v); contentLen = varDataTLen(src); } @@ -490,16 +489,13 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf int32_t *rows = (int32_t*) pPage; if (*rows >= pInfo->rowCapacity) { + // release buffer + releaseBufPage(pInfo->pBuf, pPage); + // add a new page for current group int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); taosArrayPush(p->pPageList, &pageId); - -// // number of rows -// *(int32_t*) pPage = 0; -// -// uint64_t* groupId = (pPage + sizeof(int32_t)); -// *groupId = 0; memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } } @@ -566,6 +562,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); pInfo->pageIndex += 1; + releaseBufPage(pInfo->pBuf, page); blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; @@ -631,7 +628,11 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, TD_TMP_DIR_PATH); + uint32_t defaultPgsz = 0; + uint32_t defaultBufsz = 0; + getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz); + + int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 12496eec551c3793cede8b6e065b31d4e554052e..0d47595b3e25214482ab8b6442e521ff00ebdc05 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -707,6 +707,7 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int16_t inputType = GET_PARAM_TYPE(&pInput[0]); + int16_t inputLen = GET_PARAM_BYTES(&pInput[0]); int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); @@ -718,15 +719,15 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp colDataAppendNULL(pOutput->columnData, i); continue; } + char *input = colDataGetData(pInput[0].columnData, i); switch(outputType) { case TSDB_DATA_TYPE_BIGINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - memcpy(output, varDataVal(input), varDataLen(input)); - *(int64_t *)output = taosStr2Int64(output, NULL, 10); + *(int64_t *)output = taosStr2Int64(varDataVal(input), NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1); + char *newBuf = taosMemoryCalloc(1, inputLen); int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); if (len < 0) { taosMemoryFree(newBuf); @@ -742,10 +743,9 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_UBIGINT: { if (inputType == TSDB_DATA_TYPE_BINARY) { - memcpy(output, varDataVal(input), varDataLen(input)); - *(uint64_t *)output = taosStr2UInt64(output, NULL, 10); + *(uint64_t *)output = taosStr2UInt64(varDataVal(input), NULL, 10); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1); + char *newBuf = taosMemoryCalloc(1, inputLen); int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf); if (len < 0) { taosMemoryFree(newBuf);