diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 3859dd688d23e4839d37b8c3644a19df5a6388ae..e913ecb6ab7d00f2fc0d12c636ab025cb809a27b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -467,6 +467,7 @@ typedef struct { int8_t replica; int16_t numOfColumns; int32_t numOfRows; + int32_t curIterPackedRows; void* pIter; SMnode* pMnode; STableMetaRsp* pMeta; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 01dd223b5f3027808c05f41000d1dfcd6a7b6f3b..0bfab227c46e0354cadbd3fb54d6d7f21cdf0902 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -764,104 +764,129 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl return numOfRows; } -static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - int32_t cols = 0; - SConnObj *pConn = NULL; - - if (pShow->pIter == NULL) { - SProfileMgmt *pMgmt = &pMnode->profileMgmt; - pShow->pIter = taosCacheCreateIter(pMgmt->connCache); +/** + * @param pConn the conn queries pack from + * @param[out] pBlock the block data packed into + * @param offset skip [offset] queries in pConn + * @param rowsToPack at most rows to pack + * @return rows packed +*/ +static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBlock* pBlock, uint32_t offset, uint32_t rowsToPack) { + int32_t cols = 0; + taosRLockLatch(&pConn->queryLock); + int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); + if (NULL == pConn->pQueries || numOfQueries <= offset) { + taosRUnLockLatch(&pConn->queryLock); + return 0; } - while (numOfRows < rows) { - pConn = mndGetNextConn(pMnode, pShow->pIter); - if (pConn == NULL) { - pShow->pIter = NULL; - break; - } + int32_t i = offset; + for (; i < numOfQueries && (i - offset) < rowsToPack; ++i) { + int32_t curRowIndex = pBlock->info.rows; + SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); + cols = 0; - taosRLockLatch(&pConn->queryLock); - if (NULL == pConn->pQueries || taosArrayGetSize(pConn->pQueries) <= 0) { - taosRUnLockLatch(&pConn->queryLock); - continue; - } + char queryId[26 + VARSTR_HEADER_SIZE] = {0}; + sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid); + varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]); + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)queryId, false); - int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); - for (int32_t i = 0; i < numOfQueries && numOfRows < rows; ++i) { - SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); - cols = 0; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->queryId, false); - char queryId[26 + VARSTR_HEADER_SIZE] = {0}; - sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid); - varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]); - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)queryId, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->id, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->queryId, false); + char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE]; + STR_TO_VARSTR(app, pConn->app); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)app, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->id, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->pid, false); - char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE]; - STR_TO_VARSTR(app, pConn->app); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)app, false); + char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(user, pConn->user); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)user, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConn->pid, false); + char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; + sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port); + varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)endpoint, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stime, false); - char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(user, pConn->user); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)user, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->useconds, false); - char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0}; - sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port); - varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)endpoint, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stableQuery, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stime, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false); + + char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; + int32_t strSize = sizeof(subStatus); + int32_t offset = VARSTR_HEADER_SIZE; + for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { + if (i) { + offset += snprintf(subStatus + offset, strSize - offset - 1, ","); + } + SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); + offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); + } + varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, subStatus, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->useconds, false); + char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(sql, pQuery->sql); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, curRowIndex, (const char *)sql, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->stableQuery, false); + pBlock->info.rows++; + } - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false); + taosRUnLockLatch(&pConn->queryLock); + return i - offset; +} - char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; - int32_t strSize = sizeof(subStatus); - int32_t offset = VARSTR_HEADER_SIZE; - for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { - if (i) { - offset += snprintf(subStatus + offset, strSize - offset - 1, ","); - } - SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); - offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); - } - varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, subStatus, false); +static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { + SMnode * pMnode = pReq->info.node; + SSdb * pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SConnObj *pConn = NULL; - char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(sql, pQuery->sql); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)sql, false); + if (pShow->pIter == NULL) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + pShow->pIter = taosCacheCreateIter(pMgmt->connCache); + } - numOfRows++; + // means fetched some data last time for this conn + if (pShow->curIterPackedRows > 0) { + size_t len = 0; + pConn = taosCacheIterGetData(pShow->pIter, &len); + if (pConn && (taosArrayGetSize(pConn->pQueries) > pShow->curIterPackedRows)) { + numOfRows = packQueriesIntoBlock(pShow, pConn, pBlock, pShow->curIterPackedRows, rows); + pShow->curIterPackedRows += numOfRows; } - - taosRUnLockLatch(&pConn->queryLock); } + while (numOfRows < rows) { + pConn = mndGetNextConn(pMnode, pShow->pIter); + if (pConn == NULL) { + pShow->pIter = NULL; + break; + } + + int32_t packedRows = packQueriesIntoBlock(pShow, pConn, pBlock, 0, rows - numOfRows); + pShow->curIterPackedRows = packedRows; + numOfRows += packedRows; + } pShow->numOfRows += numOfRows; return numOfRows; }