diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 65a91fe42642346a66b4f399fac89dbb849cb447..dc83f7bc439f90192aa3ce1d3d98062ea283fd55 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -689,16 +689,15 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) { .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE}; SAppInstInfo* pAppInfo = getAppInfo(pRequest); - if (TSDB_CODE_SUCCESS == code) { code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList); - tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); } if (TSDB_CODE_SUCCESS == code) { schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest); } else { + tscError("0x%"PRIx64" failed to create query plan, code:%s 0x%"PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); pRequest->body.queryFp(pRequest->body.param, pRequest, code); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index f9825c50ffaf77133702cf9b1afaae0242f5bbb5..dc15c0b13d9874bb8302238d259ab477fd02615f 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -778,9 +778,9 @@ TEST(testCase, async_api_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - taos_query(pConn, "use test"); + taos_query(pConn, "use nest"); - TAOS_RES* pRes = taos_query(pConn, "desc abc1.tu"); + TAOS_RES* pRes = taos_query(pConn, "select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;"); if (taos_errno(pRes) != 0) { printf("failed, reason:%s\n", taos_errstr(pRes)); } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 38be0b8f0a176f0796c1a1b7db9d0ee595fed1cd..d8897c3b9c05773c4c15c484cdc18b49dff77408 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -322,7 +322,6 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p pColumnInfoData->varmeta.length = pSource->varmeta.length; } else { char* tmp = taosMemoryRealloc(pColumnInfoData->nullbitmap, BitmapLen(numOfRows)); - printf("----------------%d\n", BitmapLen(numOfRows)); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a8a95b513afdb2ff33e54f141cdf5a39da55dd35..da681bd13b098f298974cb1a65ffcaf9b40e96eb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -760,7 +760,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index eda24cb0d6aff408562ef091f198f5b263e621ba..b1ad7a1cf26d613ef91301e4a945e24070a9d073 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1821,7 +1821,7 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { if (pFilterNode == NULL) { return; } @@ -2829,7 +2829,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { return seqLoadRemoteData(pOperator); } else { return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); - // return concurrentlyLoadRemoteData(pOperator); } } @@ -2855,9 +2854,14 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { return TSDB_CODE_SUCCESS; } -static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo) { +static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo, const char* id) { size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints); + if (numOfSources == 0) { + qError("%s invalid number: %d of sources in exchange operator", id, (int32_t) numOfSources); + return TSDB_CODE_INVALID_PARA; + } + pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode)); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { @@ -2879,7 +2883,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode goto _error; } - int32_t code = initExchangeOperator(pExNode, pInfo); + int32_t code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2908,7 +2912,7 @@ _error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code; return NULL; } @@ -3677,7 +3681,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - doFilter(pProjectInfo->pFilterNode, pBlock, true); + doFilter(pProjectInfo->pFilterNode, pBlock); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); @@ -5189,7 +5193,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond); if (NULL == (*pTaskInfo)->pRoot) { - code = terrno; + code = (*pTaskInfo)->code; goto _complete; } @@ -5202,7 +5206,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead _complete: taosMemoryFreeClear(*pTaskInfo); - terrno = code; return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5965f5d7adb6bfacbc2ca24ff78776dc9221c88c..132f93a6a533caa2f676009a47afdea8ee6ee869 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -359,7 +359,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { while(1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pRes, true); + doFilter(pInfo->pCondition, pRes); bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6c513a05b7a07812903059ba42b416e456c3a5c0..637534ad96a12a00736e79c062c09dcb6358b900 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -267,7 +267,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, false); + doFilter(pTableScanInfo->pFilterNode, pBlock); int64_t et = taosGetTimestampMs(); pTableScanInfo->readRecorder.filterTime += (et - st); @@ -939,7 +939,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes); } - doFilter(pInfo->pCondition, pInfo->pRes, false); + doFilter(pInfo->pCondition, pInfo->pRes); blockDataUpdateTsWindow(pInfo->pRes, 0); break; } @@ -1711,7 +1711,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { } pRes->info.rows = count; - doFilter(pInfo->pFilterNode, pRes, true); + doFilter(pInfo->pFilterNode, pRes); pOperator->resultInfo.totalRows += pRes->info.rows; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index dcaa95e28a3d438a6ca7638bc6d925b59dd0aeb8..ff093741fabedb124aa7b68eb09b21018f99d101 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -296,7 +296,10 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { } SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) { + SArray* pColMatchInfo, SOperatorInfo* pOperator) { + SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + blockDataCleanup(pDataBlock); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); @@ -354,6 +357,8 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData } blockDataDestroy(p); + + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.rows); return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } @@ -371,7 +376,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, - pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pInfo); + pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ea8ded2c302eddc3b5429aec215952c295e1640a..f63a9351c6e24f95a0f47a779af18638d7bfbcdc 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -227,6 +227,8 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); + + // set current source id done if (pSource->src.pBlock == NULL) { pSource->src.rowIndex = -1; ++pHandle->numOfCompletedSources; @@ -426,8 +428,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { double sortPass = floorl(log2(numOfSources) / log2(pHandle->numOfPages)); pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs; - qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64, - pHandle->idStr, (int32_t) (sortPass + 1), pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0, pHandle->sortElapsed, pHandle->totalElapsed); + + if (sortPass > 0) { + size_t s = pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0; + qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%" PRIzu + ", sort elapsed:%" PRId64 ", total elapsed:%" PRId64, + pHandle->idStr, (int32_t)(sortPass + 1), s, pHandle->sortElapsed, pHandle->totalElapsed); + } else { + qDebug("%s ordered source:%"PRIzu", available buf:%d, no need internal sort", pHandle->idStr, numOfSources, + pHandle->numOfPages); + } int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize); blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);