diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 020f9f7ccdceef9c8950c96a8078c64c377e5d5f..d2e602a5d64f390580f644ab14bef6f5ddc553ba 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -45,7 +45,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); * @param qId * @return */ -int32_t qCreateExecTask(void* readHandle, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); +int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); /** * The main task execution function, including query on both table and multiple tables, diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 362ad9b2d8f2fe66815a426147ad5ee98fc5ec86..6a9eb43ca8194e9d5e40dde7451e03aa18d1c9f4 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -104,8 +104,7 @@ typedef void* tsdbReadHandleT; * @param qinfo query info handle from query processor * @return */ -tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, - void *pRef); +tsdbReadHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); /** * Get the last row of the given query time window for all the tables in STableGroupInfo object. diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 68c7f190db6f53529d70f94611501cb3db25f9f6..372a5820b7d2f3ad2f766871f600af0ce1d0d16b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -132,7 +132,7 @@ typedef struct STsdbReadHandle { bool loadExternalRow; // load time window external data rows bool currentLoadExternalRows; // current load external rows int32_t loadType; // block load type - uint64_t qId; // query info handle, for debug purpose + char *idStr; // query info handle, for debug purpose int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows SDFileSet* pFileGroup; SFSIter fileIter; @@ -306,7 +306,7 @@ static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, S } taosArrayPush(pTableCheckInfo, &info); - tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" 0x%"PRIx64, pTsdbReadHandle, info.tableId, info.lastKey, pTsdbReadHandle->qId); + tsdbDebug("%p check table uid:%"PRId64" from lastKey:%"PRId64" %s", pTsdbReadHandle, info.tableId, info.lastKey, pTsdbReadHandle->idStr); } } @@ -389,13 +389,13 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* } if (updateTs) { - tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 - ", 0x%" PRIx64, pTsdbReadHandle, pCond->twindow.skey, pCond->twindow.ekey, pTsdbReadHandle->window.skey, - pTsdbReadHandle->window.ekey, pTsdbReadHandle->qId); + tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s", + pTsdbReadHandle, pCond->twindow.skey, pCond->twindow.ekey, pTsdbReadHandle->window.skey, + pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); } } -static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, STsdbMemTable* pMemRef) { +static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pReadHandle = calloc(1, sizeof(STsdbReadHandle)); if (pReadHandle == NULL) { goto _end; @@ -408,7 +408,6 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, pReadHandle->cur.win = TSWINDOW_INITIALIZER; pReadHandle->checkFiles = true; pReadHandle->activeIndex = 0; // current active table index - pReadHandle->qId = qId; pReadHandle->allocSize = 0; pReadHandle->locateStart = false; pReadHandle->loadType = pCond->type; @@ -417,6 +416,10 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, pReadHandle->loadExternalRow = pCond->loadExternalRows; pReadHandle->currentLoadExternalRows = pCond->loadExternalRows; + char buf[128] = {0}; + snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId); + pReadHandle->idStr = strdup(buf); + if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) { goto _end; } @@ -455,7 +458,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock); if (pReadHandle->pDataCols == NULL) { - tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pReadHandle, pReadHandle->qId); + tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _end; } @@ -471,8 +474,8 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, return NULL; } -tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, void* pRef) { - STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef); +tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, uint64_t taskId) { + STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, taskId); if (pTsdbReadHandle == NULL) { return NULL; } @@ -492,9 +495,7 @@ tsdbReadHandleT* tsdbQueryTables(STsdb* tsdb, STsdbQueryCond* pCond, STableGroup return NULL; } -// tsdbMayTakeMemSnapshot(pTsdbReadHandle, psTable); - - tsdbDebug("%p total numOfTable:%" PRIzu " in query, 0x%"PRIx64, pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), pTsdbReadHandle->qId); + tsdbDebug("%p total numOfTable:%" PRIzu " in query, %s", pTsdbReadHandle, taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), pTsdbReadHandle->idStr); return (tsdbReadHandleT) pTsdbReadHandle; } @@ -575,7 +576,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReadHandleT queryHandle, STsdbQueryCond // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); } -tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pMemRef) { +tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) { pCond->twindow = updateLastrowForEachGroup(groupList); // no qualified table @@ -583,7 +584,7 @@ tsdbReadHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroup return NULL; } - STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, pMemRef); + STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, groupList, qId, taskId); if (pTsdbReadHandle == NULL) { return NULL; } @@ -666,7 +667,7 @@ static STableGroupInfo* trimTableGroup(STimeWindow* window, STableGroupInfo* pGr return pNew; } -tsdbReadHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, STsdbMemTable* pRef) { +tsdbReadHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, uint64_t qId, uint64_t taskId) { STableGroupInfo* pNew = trimTableGroup(&pCond->twindow, groupList); if (pNew->numOfTables == 0) { @@ -681,7 +682,7 @@ tsdbReadHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond* pCond } } - STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, pRef); + STsdbReadHandle *pTsdbReadHandle = (STsdbReadHandle*) tsdbQueryTables(tsdb, pCond, pNew, qId, taskId); pTsdbReadHandle->loadExternalRow = true; pTsdbReadHandle->currentLoadExternalRows = true; @@ -734,8 +735,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe SMemRow row = (SMemRow)SL_GET_NODE_DATA(node); TSKEY key = memRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64, - pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->qId); + "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s", + pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->idStr); if (ASCENDING_TRAVERSE(order)) { assert(pCheckInfo->lastKey <= key); @@ -744,7 +745,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe } } else { - tsdbDebug("%p uid:%"PRId64", no data in mem, 0x%"PRIx64, pHandle, pCheckInfo->tableId, pHandle->qId); + tsdbDebug("%p uid:%"PRId64", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr); } if (!imemEmpty) { @@ -754,8 +755,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe SMemRow row = (SMemRow)SL_GET_NODE_DATA(node); TSKEY key = memRowKey(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", 0x%"PRIx64, - pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->qId); + "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s", + pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->idStr); if (ASCENDING_TRAVERSE(order)) { assert(pCheckInfo->lastKey <= key); @@ -763,7 +764,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe assert(pCheckInfo->lastKey >= key); } } else { - tsdbDebug("%p uid:%"PRId64", no data in imem, 0x%"PRIx64, pHandle, pCheckInfo->tableId, pHandle->qId); + tsdbDebug("%p uid:%"PRId64", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr); } return true; @@ -952,8 +953,8 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { } pCheckInfo->lastKey = memRowKey(row); // first timestamp in buffer - tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, 0x%"PRIx64, pHandle, - pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->qId); + tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle, + pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr); // all data in mem are checked already. if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_TRAVERSE(pHandle->order)) || @@ -1138,21 +1139,21 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl STSchema *pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema); if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p failed to malloc buf for pDataCols, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId); + tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _error; } code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[0], pSchema); if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId); + tsdbError("%p failed to malloc buf for rhelper.pDataCols[0], %s", pTsdbReadHandle, pTsdbReadHandle->idStr); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _error; } code = tdInitDataCols(pTsdbReadHandle->rhelper.pDCols[1], pSchema); if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId); + tsdbError("%p failed to malloc buf for rhelper.pDataCols[1], %s", pTsdbReadHandle, pTsdbReadHandle->idStr); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _error; } @@ -1188,15 +1189,15 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl int64_t elapsedTime = (taosGetTimestampUs() - st); pTsdbReadHandle->cost.blockLoadTime += elapsedTime; - tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, 0x%"PRIx64, - pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->qId); + tsdbDebug("%p load file block into buffer, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, elapsed time:%"PRId64 " us, %s", + pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, elapsedTime, pTsdbReadHandle->idStr); return TSDB_CODE_SUCCESS; _error: pBlock->numOfRows = 0; - tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, 0x%"PRIx64, - pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->qId); + tsdbError("%p error occurs in loading file block, index:%d, brange:%"PRId64"-%"PRId64", rows:%d, %s", + pTsdbReadHandle, slotIndex, pBlock->keyFirst, pBlock->keyLast, pBlock->numOfRows, pTsdbReadHandle->idStr); return terrno; } @@ -1219,9 +1220,9 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* key = extractFirstTraverseKey(pCheckInfo, pTsdbReadHandle->order, pCfg->update); if (key != TSKEY_INITIAL_VAL) { - tsdbDebug("%p key in mem:%"PRId64", 0x%"PRIx64, pTsdbReadHandle, key, pTsdbReadHandle->qId); + tsdbDebug("%p key in mem:%"PRId64", %s", pTsdbReadHandle, key, pTsdbReadHandle->idStr); } else { - tsdbDebug("%p no data in mem, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId); + tsdbDebug("%p no data in mem, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); } if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) || @@ -1288,11 +1289,11 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* assert(cur->blockCompleted); if (cur->rows == binfo.rows) { - tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %"PRIx64, - pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->qId); + tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %s", + pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pTsdbReadHandle->idStr); } else { - tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %"PRIx64, - pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->qId); + tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %s", + pTsdbReadHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pTsdbReadHandle->idStr); } } @@ -1815,8 +1816,8 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pTsdbReadHandle); - tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, 0x%"PRIx64, - pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->qId); + tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s", + pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr); } int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo) { @@ -1868,9 +1869,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); tsdbDebug("%p uid:%" PRIu64" start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d," - "end:%d, 0x%"PRIx64, + "end:%d, %s", pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, - blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->qId); + blockInfo.rows, cur->pos, endPos, pTsdbReadHandle->idStr); // compared with the data from in-memory buffer, to generate the correct timestamp array list int32_t numOfRows = 0; @@ -2027,8 +2028,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); doCheckGeneratedBlockRange(pTsdbReadHandle); - tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, 0x%"PRIx64, - pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->qId); + tsdbDebug("%p uid:%" PRIu64", data block created, mixblock:%d, brange:%"PRIu64"-%"PRIu64" rows:%d, %s", + pTsdbReadHandle, pCheckInfo->tableId, cur->mixBlock, cur->win.skey, cur->win.ekey, cur->rows, pTsdbReadHandle->idStr); } int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { @@ -2202,13 +2203,13 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu memcpy(pTsdbReadHandle->pDataBlockInfo, sup.pDataBlockInfo[0], sizeof(STableBlockInfo) * numOfBlocks); cleanBlockOrderSupporter(&sup, numOfQualTables); - tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted 0x%"PRIx64, pTsdbReadHandle, cnt, - pTsdbReadHandle->qId); + tsdbDebug("%p create data blocks info struct completed for 1 table, %d blocks not sorted %s", pTsdbReadHandle, cnt, + pTsdbReadHandle->idStr); return TSDB_CODE_SUCCESS; } - tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables 0x%"PRIx64, pTsdbReadHandle, cnt, - numOfQualTables, pTsdbReadHandle->qId); + tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pTsdbReadHandle, cnt, + numOfQualTables, pTsdbReadHandle->idStr); assert(cnt <= numOfBlocks && numOfQualTables <= numOfTables); // the pTableQueryInfo[j]->numOfBlocks may be 0 sup.numOfTables = numOfQualTables; @@ -2244,7 +2245,7 @@ static int32_t createDataBlocksInfo(STsdbReadHandle* pTsdbReadHandle, int32_t nu * } */ - tsdbDebug("%p %d data blocks sort completed, 0x%"PRIx64, pTsdbReadHandle, cnt, pTsdbReadHandle->qId); + tsdbDebug("%p %d data blocks sort completed, %s", pTsdbReadHandle, cnt, pTsdbReadHandle->idStr); cleanBlockOrderSupporter(&sup, numOfTables); free(pTree); @@ -2302,8 +2303,8 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) || (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) { tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); - tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, 0x%"PRIx64, pTsdbReadHandle, - pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->qId); + tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle, + pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); pTsdbReadHandle->pFileGroup = NULL; assert(pTsdbReadHandle->numOfBlocks == 0); break; @@ -2326,8 +2327,8 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi break; } - tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, 0x%"PRIx64, pTsdbReadHandle, numOfBlocks, numOfTables, - pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->qId); + tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables, + pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr); assert(numOfBlocks >= 0); if (numOfBlocks == 0) { @@ -2420,8 +2421,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReadHandleT* queryHandle, STableBlockDist* if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) || (!ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.ekey < pTsdbReadHandle->window.ekey)) { tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb)); - tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, 0x%"PRIx64, pTsdbReadHandle, - pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->qId); + tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle, + pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); pTsdbReadHandle->pFileGroup = NULL; break; } @@ -2444,8 +2445,8 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReadHandleT* queryHandle, STableBlockDist* break; } - tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, 0x%"PRIx64, pTsdbReadHandle, numOfBlocks, numOfTables, - pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->qId); + tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pTsdbReadHandle, numOfBlocks, numOfTables, + pTsdbReadHandle->pFileGroup->fid, pTsdbReadHandle->idStr); if (numOfBlocks == 0) { continue; @@ -2499,8 +2500,8 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis if ((!cur->mixBlock) || cur->blockCompleted) { // all data blocks in current file has been checked already, try next file if exists } else { - tsdbDebug("%p continue in current data block, index:%d, pos:%d, 0x%"PRIx64, pTsdbReadHandle, cur->slot, cur->pos, - pTsdbReadHandle->qId); + tsdbDebug("%p continue in current data block, index:%d, pos:%d, %s", pTsdbReadHandle, cur->slot, cur->pos, + pTsdbReadHandle->idStr); int32_t code = handleDataMergeIfNeeded(pTsdbReadHandle, pBlockInfo->compBlock, pCheckInfo); *exists = (pTsdbReadHandle->realNumOfRows > 0); @@ -2624,8 +2625,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } int64_t elapsedTime = taosGetTimestampUs() - st; - tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, 0x%"PRIx64, pTsdbReadHandle, - elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->qId); + tsdbDebug("%p build data block from cache completed, elapsed time:%"PRId64" us, numOfRows:%d, numOfCols:%d, %s", pTsdbReadHandle, + elapsedTime, numOfRows, numOfCols, pTsdbReadHandle->idStr); return numOfRows; } @@ -2937,7 +2938,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) { STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle; if (emptyQueryTimewindow(pTsdbReadHandle)) { - tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pTsdbReadHandle, pTsdbReadHandle->qId); + tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); return false; } @@ -3052,7 +3053,7 @@ bool tsdbNextDataBlock(tsdbReadHandleT pHandle) { // memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo)); // } // -// pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->qId, pMemRef); +// pSecQueryHandle = tsdbQueryTablesImpl(pTsdbReadHandle->pTsdb, &cond, pTsdbReadHandle->idStr, pMemRef); // tfree(cond.colList); // // // current table, only one table @@ -3910,8 +3911,8 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { SIOCostSummary* pCost = &pTsdbReadHandle->cost; - tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, 0x%"PRIx64, - pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->qId); + tsdbDebug("%p :io-cost summary: head-file read cnt:%"PRIu64", head-file time:%"PRIu64" us, statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %s", + pTsdbReadHandle, pCost->headFileLoad, pCost->headFileLoadTime, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pTsdbReadHandle->idStr); tfree(pTsdbReadHandle); } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 48ff7252bef76706034a2b64920e9ac52f8f864c..b2118942815e01632262bbfa03671034b7c51683 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -73,8 +73,8 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { tb_uid_t uid; int32_t nCols; int32_t nTagCols; - SSchemaWrapper *pSW; - STableMetaRsp * pTbMetaMsg = NULL; + SSchemaWrapper *pSW = NULL; + STableMetaRsp *pTbMetaMsg = NULL; SSchema * pTagSchema; SRpcMsg rpcMsg; int msgLen = 0; @@ -145,15 +145,22 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { _exit: - free(pSW->pSchema); - free(pSW); - free(pTbCfg->name); - free(pTbCfg); - if (pTbCfg->type == META_SUPER_TABLE) { - free(pTbCfg->stbCfg.pTagSchema); - } else if (pTbCfg->type == META_SUPER_TABLE) { - kvRowFree(pTbCfg->ctbCfg.pTag); + if (pSW != NULL) { + tfree(pSW->pSchema); + tfree(pSW); } + + if (pTbCfg) { + tfree(pTbCfg->name); + if (pTbCfg->type == META_SUPER_TABLE) { + free(pTbCfg->stbCfg.pTagSchema); + } else if (pTbCfg->type == META_SUPER_TABLE) { + kvRowFree(pTbCfg->ctbCfg.pTag); + } + + tfree(pTbCfg); + } + rpcMsg.handle = pMsg->handle; rpcMsg.ahandle = pMsg->ahandle; rpcMsg.pCont = pTbMetaMsg; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f552afcfbf245773931fca13bd2d7621c62c1887..702581edb90d8d7ec6de3f75fe5ed0105634c7a3 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -38,7 +38,7 @@ #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) #define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES) -#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.queryId) +#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.idstr) #define curTimeWindowIndex(_winres) ((_winres)->curIndex) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b3f5370c0faf4158687f9a13f0a32d6825cfb7aa..f36071f46cabce05ea9931b1b663c3991bc1119d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -240,6 +240,7 @@ typedef struct STaskIdInfo { uint64_t subplanId; uint64_t templateId; uint64_t taskId; // this is a subplan id + char *idstr; } STaskIdInfo; typedef struct SExecTaskInfo { @@ -377,8 +378,10 @@ typedef struct SExchangeInfo { SSDataBlock *pResult; int32_t current; uint64_t rowsOfCurrentSource; - uint64_t bytes; // total load bytes from remote - uint64_t totalRows; + + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed;// total elapsed time } SExchangeInfo; typedef struct STableScanInfo { @@ -660,6 +663,6 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle, uint64_t taskId); #endif // TDENGINE_EXECUTORIMPL_H diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b7be85dc347c06ea9eb566e61bf7a1acba5b764a..a2eb5dc339c4e7494f30a2e048120c16637216b9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -50,11 +50,11 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo; - int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, GET_TASKID(pTaskInfo)); + int32_t code = doSetStreamBlock(pTaskInfo->pRoot, input, pTaskInfo->id.queryId); if (code != TSDB_CODE_SUCCESS) { - qError("failed to set the stream block data, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo)); + qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo)); } else { - qDebug("set the stream block successfully, reqId:0x%"PRIx64, GET_TASKID(pTaskInfo)); + qDebug("%s set the stream block successfully", GET_TASKID(pTaskInfo)); } return code; @@ -81,7 +81,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(streamReadHandle, 0, plan, &pTaskInfo, NULL); + code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL); if (code != TSDB_CODE_SUCCESS) { // TODO: destroy SSubplan & pTaskInfo terrno = code; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 9d69b24fce78f55805803f502b66199e74ee374f..8635b2d5c812c479ccd08b963c1e24daa9ab08a2 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -69,11 +69,11 @@ void freeParam(STaskParam *param) { tfree(param->prevResult); } -int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { +int32_t qCreateExecTask(void* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -141,16 +141,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); - // todo: remove it. - if (tinfo == NULL) { - return TSDB_CODE_SUCCESS; - } - *pRes = NULL; - int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { - qError("QID:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, + qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; return pTaskInfo->code; @@ -161,7 +155,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { } if (isTaskKilled(pTaskInfo)) { - qDebug("QID:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); + qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); return TSDB_CODE_SUCCESS; } @@ -170,12 +164,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (ret != TSDB_CODE_SUCCESS) { publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; - qDebug("QID:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), + qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); return pTaskInfo->code; } - qDebug("QID:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); + qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); bool newgroup = false; publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -184,7 +178,9 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { st = taosGetTimestampUs(); *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); - pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); + uint64_t el = (taosGetTimestampUs() - st); + pTaskInfo->cost.elapsedTime += el; + publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); if (NULL == *pRes) { @@ -194,59 +190,13 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; pTaskInfo->totalRows += current; - qDebug("QID:0x%" PRIx64 " task paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", - GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0); + qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", + GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0); atomic_store_64(&pTaskInfo->owner, 0); return pTaskInfo->code; } -int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { - SQInfo *pQInfo = (SQInfo *)qinfo; - - if (pQInfo == NULL) { - qError("QInfo invalid qhandle"); - return TSDB_CODE_QRY_INVALID_QHANDLE; - } - - *buildRes = false; - if (IS_QUERY_KILLED(pQInfo)) { - qDebug("QID:0x%"PRIx64" query is killed, code:0x%08x", pQInfo->qId, pQInfo->code); - return pQInfo->code; - } - - int32_t code = TSDB_CODE_SUCCESS; - - if (tsRetrieveBlockingModel) { - pQInfo->rspContext = pRspContext; - tsem_wait(&pQInfo->ready); - *buildRes = true; - code = pQInfo->code; - } else { - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; - - pthread_mutex_lock(&pQInfo->lock); - - assert(pQInfo->rspContext == NULL); - if (pQInfo->dataReady == QUERY_RESULT_READY) { - *buildRes = true; - qDebug("QID:0x%"PRIx64" retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo->qId, pQueryAttr->resultRowSize, - GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code)); - } else { - *buildRes = false; - qDebug("QID:0x%"PRIx64" retrieve req set query return result after paused", pQInfo->qId); - pQInfo->rspContext = pRspContext; - assert(pQInfo->rspContext != NULL); - } - - code = pQInfo->code; - pthread_mutex_unlock(&pQInfo->lock); - } - - return code; -} - void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) { SQInfo* pQInfo = (SQInfo*) qinfo; assert(pQInfo != NULL); @@ -261,7 +211,7 @@ int32_t qKillTask(qTaskInfo_t qinfo) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("QID:0x%"PRIx64" execTask killed", pTaskInfo->id.queryId); + qDebug("%s execTask killed", GET_TASKID(pTaskInfo)); setTaskKilled(pTaskInfo); // Wait for the query executing thread being stopped/ @@ -280,7 +230,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("QID:0x%"PRIx64" query async killed", pTaskInfo->id.queryId); + qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); setTaskKilled(pTaskInfo); return TSDB_CODE_SUCCESS; @@ -298,142 +248,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) { void qDestroyTask(qTaskInfo_t qTaskHandle) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle; - qDebug("QID:0x%"PRIx64" execTask completed, numOfRows:%"PRId64, pTaskInfo->id.queryId, pTaskInfo->totalRows); + qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows); queryCostStatis(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); } -void* qOpenTaskMgmt(int32_t vgId) { - const int32_t refreshHandleInterval = 30; // every 30 seconds, refresh handle pool - - char cacheName[128] = {0}; - sprintf(cacheName, "qhandle_%d", vgId); - - STaskMgmt* pTaskMgmt = calloc(1, sizeof(STaskMgmt)); - if (pTaskMgmt == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pTaskMgmt->qinfoPool = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshHandleInterval, true, freeqinfoFn, cacheName); - pTaskMgmt->closed = false; - pTaskMgmt->vgId = vgId; - - pthread_mutex_init(&pTaskMgmt->lock, NULL); - - qDebug("vgId:%d, open queryTaskMgmt success", vgId); - return pTaskMgmt; -} - -void qTaskMgmtNotifyClosing(void* pQMgmt) { - if (pQMgmt == NULL) { - return; - } - - STaskMgmt* pQueryMgmt = pQMgmt; - qInfo("vgId:%d, set querymgmt closed, wait for all queries cancelled", pQueryMgmt->vgId); - - pthread_mutex_lock(&pQueryMgmt->lock); - pQueryMgmt->closed = true; - pthread_mutex_unlock(&pQueryMgmt->lock); - - taosCacheRefresh(pQueryMgmt->qinfoPool, taskMgmtKillTaskFn, NULL); -} - -void qQueryMgmtReOpen(void *pQMgmt) { - if (pQMgmt == NULL) { - return; - } - - STaskMgmt *pQueryMgmt = pQMgmt; - qInfo("vgId:%d, set querymgmt reopen", pQueryMgmt->vgId); - - pthread_mutex_lock(&pQueryMgmt->lock); - pQueryMgmt->closed = false; - pthread_mutex_unlock(&pQueryMgmt->lock); -} - -void qCleanupTaskMgmt(void* pQMgmt) { - if (pQMgmt == NULL) { - return; - } - - STaskMgmt* pQueryMgmt = pQMgmt; - int32_t vgId = pQueryMgmt->vgId; - - assert(pQueryMgmt->closed); - - SCacheObj* pqinfoPool = pQueryMgmt->qinfoPool; - pQueryMgmt->qinfoPool = NULL; - - taosCacheCleanup(pqinfoPool); - pthread_mutex_destroy(&pQueryMgmt->lock); - tfree(pQueryMgmt); - - qDebug("vgId:%d, queryMgmt cleanup completed", vgId); -} - -void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) { - if (pMgmt == NULL) { - terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - return NULL; - } - - STaskMgmt *pQueryMgmt = pMgmt; - if (pQueryMgmt->qinfoPool == NULL) { - qError("QID:0x%"PRIx64"-%p failed to add qhandle into qMgmt, since qMgmt is closed", qId, (void*)qInfo); - terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - return NULL; - } - - pthread_mutex_lock(&pQueryMgmt->lock); - if (pQueryMgmt->closed) { - pthread_mutex_unlock(&pQueryMgmt->lock); - qError("QID:0x%"PRIx64"-%p failed to add qhandle into cache, since qMgmt is colsing", qId, (void*)qInfo); - terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - return NULL; - } else { - void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), - (getMaximumIdleDurationSec()*1000)); - pthread_mutex_unlock(&pQueryMgmt->lock); - - return handle; - } -} - -void** qAcquireTask(void* pMgmt, uint64_t _key) { - STaskMgmt *pQueryMgmt = pMgmt; - - if (pQueryMgmt->closed) { - terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; - return NULL; - } - - if (pQueryMgmt->qinfoPool == NULL) { - terrno = TSDB_CODE_QRY_INVALID_QHANDLE; - return NULL; - } - - void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &_key, sizeof(_key)); - if (handle == NULL || *handle == NULL) { - terrno = TSDB_CODE_QRY_INVALID_QHANDLE; - return NULL; - } else { - return handle; - } -} - -void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle) { - STaskMgmt *pQueryMgmt = pMgmt; - if (pQueryMgmt->qinfoPool == NULL) { - return NULL; - } - - taosCacheRelease(pQueryMgmt->qinfoPool, pQInfo, freeHandle); - return 0; -} - #if 0 //kill by qid int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) { @@ -445,7 +265,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo if (pQInfo == NULL || !isValidQInfo(pQInfo)) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId); + qWarn("%s be killed(no memory commit).", pQInfo->qId); setTaskKilled(pQInfo); // wait query stop diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9a2bb75b34a4155cc415f5019bd7054f16cc8df4..16487fe7479e2bcb14cdf8c3254223eaf08b2c77 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2432,10 +2432,6 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) { } bool isTaskKilled(SExecTaskInfo *pTaskInfo) { -// if (IS_QUERY_KILLED(pTaskInfo)) { -// return true; -// } - // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived // abort current query execution. if (pTaskInfo->owner != 0 && ((taosGetTimestampSec() - pTaskInfo->cost.start/1000) > 10*getMaximumIdleDurationSec()) @@ -4441,9 +4437,9 @@ void queryCostStatis(SExecTaskInfo *pTaskInfo) { // // calculateOperatorProfResults(pQInfo); - qDebug("QID:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " + qDebug("%s :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, - pTaskInfo->id.queryId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, + GET_TASKID(pTaskInfo), pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows); // //qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, @@ -4994,7 +4990,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { pResultRowInfo->curPos = 0; } - qDebug("QInfo:0x%"PRIx64" start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + qDebug("%s start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); } @@ -5005,7 +5001,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { // STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window); // tsdbResetQueryHandle(pTableScanInfo->pTsdbReadHandle, &cond); - qDebug("QInfo:0x%"PRIx64" start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, + qDebug("%s start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); if (pResultRowInfo->size > 0) { @@ -5150,6 +5146,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); if (pExchangeInfo->current >= totalSources) { + qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources, + pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); return NULL; } @@ -5170,7 +5168,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { epSet.port[0] = pSource->addr.epAddr[0].port; tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); - qDebug("QID:0x%" PRIx64 " build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, + int64_t startTs = taosGetTimestampUs(); + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources); pMsg->header.vgId = htonl(pSource->addr.nodeId); @@ -5181,7 +5180,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { // send the fetch remote task result reques pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { - qError("QID:0x%" PRIx64 " prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); + qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _error; } @@ -5198,7 +5197,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SRetrieveTableRsp* pRsp = pExchangeInfo->pRsp; if (pRsp->numOfRows == 0) { - qDebug("QID:0x%"PRIx64" vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", + qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows); @@ -5206,6 +5205,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { pExchangeInfo->current += 1; if (pExchangeInfo->current >= totalSources) { + int64_t el = taosGetTimestampUs() - startTs; + pExchangeInfo->totalElapsed += el; + + qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources, + pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); return NULL; } else { continue; @@ -5232,21 +5236,24 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { pRes->info.numOfCols = pOperator->numOfOutput; pRes->info.rows = pRsp->numOfRows; + int64_t el = taosGetTimestampUs() - startTs; + pExchangeInfo->totalRows += pRsp->numOfRows; - pExchangeInfo->bytes += pRsp->compLen; + pExchangeInfo->totalSize += pRsp->compLen; pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows; + pExchangeInfo->totalElapsed += el; if (pRsp->completed == 1) { - qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->bytes, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1, totalSources); pExchangeInfo->rowsOfCurrentSource = 0; pExchangeInfo->current += 1; } else { - qDebug("QID:0x%" PRIx64 " fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->bytes); + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize); } return pExchangeInfo->pResult; @@ -5293,7 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; - rpcInit.label = "TSC"; + rpcInit.label = "EX"; rpcInit.numOfThreads = 1; rpcInit.cfp = processRspMsg; rpcInit.sessions = tsMaxConnections; @@ -7728,32 +7735,38 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t return TSDB_CODE_SUCCESS; } -static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) { +static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { SExecTaskInfo* pTaskInfo = calloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; + pTaskInfo->id.taskId = taskId; + + char* p = calloc(1, 128); + snprintf(p, 128, "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, queryId); + pTaskInfo->id.idstr = strdup(p); + return pTaskInfo; } -static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId); +static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId); -SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* readerHandle, uint64_t queryId) { +SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, void* readerHandle, uint64_t queryId, uint64_t taskId) { if (pPhyNode->pChildren == NULL || taosArrayGetSize(pPhyNode->pChildren) == 0) { if (pPhyNode->info.type == OP_TableScan) { SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); - tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId); + tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId); return createTableScanOperatorInfo(tReaderHandle, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pTaskInfo); } else if (pPhyNode->info.type == OP_DataBlocksOptScan) { SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets); - tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId); + tsdbReadHandleT tReaderHandle = doCreateDataReadHandle((STableScanPhyNode*) pPhyNode, readerHandle, (uint64_t) queryId, taskId); return createDataBlocksOptScanInfo(tReaderHandle, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (pPhyNode->info.type == OP_Exchange) { @@ -7771,13 +7784,13 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask for (int32_t i = 0; i < size; ++i) { SPhyNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, readerHandle, queryId); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, readerHandle, queryId, taskId); return createAggregateOperatorInfo(op, pPhyNode->pTargets, pTaskInfo); } } } -static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readerHandle, uint64_t queryId) { +static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, STableGroupInfo* pGroupInfo, void* readerHandle, uint64_t queryId, uint64_t taskId) { STsdbQueryCond cond = {.loadExternalRows = false}; cond.order = pTableScanNode->scan.order; @@ -7796,10 +7809,10 @@ static tsdbReadHandleT createDataReadHandle(STableScanPhyNode* pTableScanNode, S cond.colList[i].colId = pSchema->colId; } - return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, NULL); + return tsdbQueryTables(readerHandle, &cond, pGroupInfo, queryId, taskId); } -static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId) { +static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, void* readerHandle, uint64_t queryId, uint64_t taskId) { int32_t code = 0; STableGroupInfo groupInfo = {0}; @@ -7826,28 +7839,28 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode, if (groupInfo.numOfTables == 0) { code = 0; - qDebug("no table qualified for query, reqId:0x%"PRIx64, queryId); + qDebug("no table qualified for query, TID:0x%"PRIx64", QID:0x%"PRIx64, taskId, queryId); goto _error; } - return createDataReadHandle(pTableScanNode, &groupInfo, readerHandle, queryId); + return createDataReadHandle(pTableScanNode, &groupInfo, readerHandle, queryId, taskId); _error: terrno = code; return NULL; } -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle, uint64_t taskId) { uint64_t queryId = pPlan->id.queryId; int32_t code = TSDB_CODE_SUCCESS; - *pTaskInfo = createExecTaskInfo(queryId); + *pTaskInfo = createExecTaskInfo(queryId, taskId); if (*pTaskInfo == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _complete; } - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId); + (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId, taskId); if ((*pTaskInfo)->pRoot == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _complete; @@ -8814,72 +8827,15 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) { } void doDestroyTask(SExecTaskInfo *pTaskInfo) { - qDebug("QID:0x%"PRIx64" start to free execTask", pTaskInfo->id.queryId); + qDebug("%s start to free execTask", GET_TASKID(pTaskInfo)); doDestroyTableQueryInfo(&pTaskInfo->tableqinfoGroupInfo); // taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosHashCleanup(pTaskInfo->summary.operatorProfResults); - qDebug("QID:0x%"PRIx64" execTask is freed", pTaskInfo->id.queryId); + qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); tfree(pTaskInfo); } -int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) { - // the remained number of retrieved rows, not the interpolated result - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; - - // load data from file to msg buffer - if (pQueryAttr->tsCompQuery) { - SColumnInfoData* pColInfoData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, 0); - FILE *f = *(FILE **)pColInfoData->pData; // TODO refactor - - // make sure file exist - if (f) { - off_t s = lseek(fileno(f), 0, SEEK_END); - assert(s == pRuntimeEnv->outputBuf->info.rows); - - //qDebug("QInfo:0x%"PRIx64" ts comp data return, file:%p, size:%"PRId64, pQInfo->qId, f, (uint64_t)s); - if (fseek(f, 0, SEEK_SET) >= 0) { - size_t sz = fread(data, 1, s, f); - if(sz < s) { // todo handle error - //qError("fread(f:%p,%d) failed, rsize:%" PRId64 ", expect size:%" PRId64, f, fileno(f), (uint64_t)sz, (uint64_t)s); - assert(0); - } - } else { - UNUSED(s); - //qError("fseek(f:%p,%d) failed, error:%s", f, fileno(f), strerror(errno)); - assert(0); - } - - // dump error info - if (s <= (sizeof(STSBufFileHeader) + sizeof(STSGroupBlockInfo) + 6 * sizeof(int32_t))) { -// qDump(data, s); - assert(0); - } - - fclose(f); - *(FILE **)pColInfoData->pData = NULL; - } - - // all data returned, set query over - if (Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)) { -// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER); - } - } else { - doCopyQueryResultToMsg(pQInfo, (int32_t)pRuntimeEnv->outputBuf->info.rows, data, compressed, compLen); - } - - //qDebug("QInfo:0x%"PRIx64" current numOfRes rows:%d, total:%" PRId64, pQInfo->qId, -// pRuntimeEnv->outputBuf->info.rows, pRuntimeEnv->resultInfo.total); - - if (pQueryAttr->limit.limit > 0 && pQueryAttr->limit.limit == pRuntimeEnv->resultInfo.total) { - //qDebug("QInfo:0x%"PRIx64" results limitation reached, limitation:%"PRId64, pQInfo->qId, pQueryAttr->limit.limit); -// setTaskStatus(pOperator->pTaskInfo, QUERY_OVER); - } - - return TSDB_CODE_SUCCESS; -} - bool doBuildResCheck(SQInfo* pQInfo) { bool buildRes = false; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 5e0b641b285927c7b02644bce2f2ae831ff2dcb8..1814e1724b94c5f1e6397789b9fcee1d77af62bb 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) { SExecTaskInfo* pTaskInfo = nullptr; DataSinkHandle sinkHandle = nullptr; - int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo, &sinkHandle); + int32_t code = qCreateExecTask((void*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); } #pragma GCC diagnostic pop \ No newline at end of file diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 83c7a4208e3da2cbd8b2c58040afd1520bb01cc5..d07e20d5c14762ddb68b8c15fa93963a14043115 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -173,17 +173,17 @@ typedef struct SQWorkerMgmt { #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__) -#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) +#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__) -#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) -#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) -#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId) +#define QW_TASK_ELOG_E(param) qError("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) +#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) +#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, qId, tId) -#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) -#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:0x%"PRIx64",TID:0x%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_LOCK_DEBUG(...) do { if (gQWDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index 7735e1a1eebd4e329f0df807d7269e7754e4c9ee..51f55d238f33f3c109ec51e3df6f845f5f5aabac 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -32,7 +32,7 @@ int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t int32_t qwBuildAndSendDropRsp(void *connection, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code); int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); -void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len); +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection); int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index d09f749805ae2fa38a2b203dbf8c196fa0c4d03f..c28e30333c15604ad872f71b9f53a13b022fa8ac 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -456,7 +456,7 @@ _return: QW_RET(code); } -int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { +int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t code = 0; bool qcontinue = true; SSDataBlock* pRes = NULL; @@ -467,11 +467,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { DataSinkHandle sinkHandle = ctx->sinkHandle; while (true) { - QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++); + QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); code = qExecTask(*taskHandle, &pRes, &useconds); if (code) { - QW_TASK_ELOG("qExecTask failed, code:%x", code); + QW_TASK_ELOG("qExecTask failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); } @@ -485,6 +485,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { if (TASK_TYPE_TEMP == ctx->taskType) { qwFreeTaskHandle(QW_FPARAMS(), taskHandle); } + + if (queryEnd) { + *queryEnd = true; + } break; } @@ -587,12 +591,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void QW_ERR_RET(code); } - queryEnd = pOutput->queryEnd; - pOutput->queryEnd = false; - - if (DS_BUF_EMPTY == pOutput->bufStatus && queryEnd) { - pOutput->queryEnd = true; - + if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); } @@ -974,7 +973,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(code); } - code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); + code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); @@ -996,7 +995,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { atomic_store_ptr(&ctx->sinkHandle, sinkHandle); if (pTaskInfo && sinkHandle) { - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); } _return: @@ -1083,6 +1082,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWPhaseOutput output = {0}; void *rsp = NULL; int32_t dataLen = 0; + bool queryEnd = false; do { QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, &output)); @@ -1102,7 +1102,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { DataSinkHandle sinkHandle = ctx->sinkHandle; - QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx)); + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; @@ -1114,13 +1114,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { // RC WARNING atomic_store_8(&ctx->queryContinue, 1); } - - if (sOutput.queryEnd) { - needStop = true; - } if (rsp) { - qwBuildFetchRsp(rsp, &sOutput, dataLen); + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); @@ -1131,6 +1128,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } } + if (queryEnd) { + needStop = true; + } + _return: if (NULL == ctx) { @@ -1196,7 +1197,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (NULL == rsp) { QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { - qwBuildFetchRsp(rsp, &sOutput, dataLen); + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); } if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) { diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index d11bee6dce22454646d5080cb87314c8beb73685..56c882f404a36b6b070e0df8b79c3b88af56890d 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -26,11 +26,11 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) { return TSDB_CODE_SUCCESS; } -void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len) { +void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; rsp->useconds = htobe64(input->useconds); - rsp->completed = input->queryEnd; + rsp->completed = qComplete; rsp->precision = input->precision; rsp->compressed = input->compressed; rsp->compLen = htonl(len); @@ -258,12 +258,12 @@ int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); if (TSDB_CODE_SUCCESS != code) { - QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code); + QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code)); rpcFreeCont(req); QW_ERR_RET(code); } - QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId); + QW_SCH_TASK_DLOG("put task continue exec msg to query queue, vgId:%d", mgmt->nodeId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index 2d39a9cea55090c8e5ad4b4eb3c95f0966c1ebd0..b0927e9ecfbb61da5508fb4e1151621e600897e4 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -1360,9 +1360,14 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { tDebug("%s, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType), pConn->peerFqdn, pConn->peerPort, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } else { - if (pHead->code == 0) pConn->secured = 1; // for success response, set link as secured - tDebug("%s, %s is sent to 0x%x:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType), - pConn->peerIp, pConn->peerPort, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); + if (pHead->code == 0) { + pConn->secured = 1; // for success response, set link as secured + } + + char ipport[40] = {0}; + taosIpPort2String(pConn->peerIp, pConn->peerPort, ipport); + tDebug("%s, %s is sent to %s, code:0x%x len:%d sig:0x%08x:0x%08x:%d", pConn->info, TMSG_INFO(pHead->msgType), + ipport, htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } // tTrace("connection type is: %d", pConn->connType);