From fe115ab02e4b41a98e6e7b0686d8277b8ee7050d Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 10 Oct 2022 18:20:53 +0800 Subject: [PATCH] fix: support statistics of insert_req --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 9 +++++ source/dnode/vnode/src/inc/vnodeInt.h | 10 ++++++ source/dnode/vnode/src/vnd/vnodeCommit.c | 39 +++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeOpen.c | 4 +++ source/dnode/vnode/src/vnd/vnodeQuery.c | 8 ++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 9 +++-- 6 files changed, 73 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4047bc2340..5451fe33f1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -78,6 +78,15 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { pMgmt->state.numOfBatchInsertReqs = numOfBatchInsertReqs; pMgmt->state.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; + printf("%s:%d: Info: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64 + "\n", + __func__, __LINE__, pInfo->vstat.numOfInsertReqs, pInfo->vstat.numOfInsertSuccessReqs, + pInfo->vstat.numOfBatchInsertReqs, pInfo->vstat.numOfBatchInsertSuccessReqs); + printf("%s:%d: Mgmt: nInsert:%" PRIi64 ", nInsertSuccess:%" PRIi64 ", nBatch:%" PRIi64 ", nBatchSuccess:%" PRIi64 + "\n", + __func__, __LINE__, pMgmt->state.numOfInsertReqs, pMgmt->state.numOfInsertSuccessReqs, + pMgmt->state.numOfBatchInsertReqs, pMgmt->state.numOfBatchInsertSuccessReqs); + tfsGetMonitorInfo(pMgmt->pTfs, &pInfo->tfs); taosArrayDestroy(pVloads); } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2a8a74d297..d4b88abb1d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -56,6 +56,7 @@ typedef struct SSma SSma; typedef struct STsdb STsdb; typedef struct STQ STQ; typedef struct SVState SVState; +typedef struct SVStatis SVStatis; typedef struct SVBufPool SVBufPool; typedef struct SQWorker SQHandle; typedef struct STsdbKeepCfg STsdbKeepCfg; @@ -284,9 +285,17 @@ struct SVState { int64_t commitTerm; }; +struct SVStatis { + int64_t nInsert; + int64_t nInsertSuccess; + int64_t nBatchInsert; + int64_t nBatchInsertSuccess; +}; + struct SVnodeInfo { SVnodeCfg config; SVState state; + SVStatis statis; }; typedef enum { @@ -309,6 +318,7 @@ struct SVnode { char* path; SVnodeCfg config; SVState state; + SVStatis statis; STfs* pTfs; SMsgCb msgCb; TdThreadMutex mutex; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 6dc3ef86a7..ae55d50456 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -227,6 +227,11 @@ int vnodeCommit(SVnode *pVnode) { info.state.committed = pVnode->state.applied; info.state.commitTerm = pVnode->state.applyTerm; info.state.commitID = pVnode->state.commitID; + info.statis.nInsert = pVnode->statis.nInsert; + info.statis.nInsertSuccess = pVnode->statis.nInsertSuccess; + info.statis.nBatchInsert = pVnode->statis.nBatchInsert; + info.statis.nBatchInsertSuccess = pVnode->statis.nBatchInsertSuccess; + snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); if (vnodeSaveInfo(dir, &info) < 0) { ASSERT(0); @@ -352,6 +357,32 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { return 0; } +static int vnodeEncodeStatis(const void *pObj, SJson *pJson) { + const SVStatis *pStatis = (SVStatis *)pObj; + + if (tjsonAddIntegerToObject(pJson, "insert", pStatis->nInsert) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "insert success", pStatis->nInsertSuccess) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "batch insert", pStatis->nBatchInsert) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "batch insert success", pStatis->nBatchInsertSuccess) < 0) return -1; + + return 0; +} + +static int vnodeDecodeStatis(const SJson *pJson, void *pObj) { + SVStatis *pStatis = (SVStatis *)pObj; + + int32_t code; + tjsonGetNumberValue(pJson, "insert", pStatis->nInsert, code); + if (code < 0) return -1; + tjsonGetNumberValue(pJson, "insert success", pStatis->nInsertSuccess, code); + if (code < 0) return -1; + tjsonGetNumberValue(pJson, "batch insert", pStatis->nBatchInsert, code); + if (code < 0) return -1; + tjsonGetNumberValue(pJson, "batch insert success", pStatis->nBatchInsertSuccess, code); + if (code < 0) return -1; + return 0; +} + static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { SJson *pJson; char *pData; @@ -371,6 +402,10 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { goto _err; } + if (tjsonAddObject(pJson, "statis", vnodeEncodeStatis, (void *)&pInfo->statis) < 0) { + goto _err; + } + pData = tjsonToString(pJson); if (pData == NULL) { goto _err; @@ -402,6 +437,10 @@ static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { goto _err; } + if (tjsonToObject(pJson, "statis", vnodeDecodeStatis, (void *)&pInfo->statis) < 0) { + goto _err; + } + tjsonDelete(pJson); return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 616aa39bdf..cd87784732 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -85,6 +85,10 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.applied = info.state.committed; pVnode->state.commitID = info.state.commitID; pVnode->state.commitTerm = info.state.commitTerm; + pVnode->statis.nInsert = info.statis.nInsert; + pVnode->statis.nInsertSuccess = info.statis.nInsertSuccess; + pVnode->statis.nBatchInsert = info.statis.nBatchInsert; + pVnode->statis.nBatchInsertSuccess = info.statis.nBatchInsertSuccess; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; taosThreadMutexInit(&pVnode->lock, NULL); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0d57c7bb74..ba8fbaf553 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -375,10 +375,10 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->compStorage = (int64_t)2 * 1073741824; pLoad->pointsWritten = 100; pLoad->numOfSelectReqs = 1; - pLoad->numOfInsertReqs = 3; - pLoad->numOfInsertSuccessReqs = 2; - pLoad->numOfBatchInsertReqs = 5; - pLoad->numOfBatchInsertSuccessReqs = 4; + pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert); + pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); + pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); + pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5c8590c7c9..bff836faf5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -812,10 +812,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock; + SSubmitBlk *pBlock = NULL; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0}; - int32_t nRows; + int32_t nRows = 0; int32_t tsize, ret; SEncoder encoder = {0}; SArray *newTbUids = NULL; @@ -823,6 +823,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq pRsp->code = 0; pSubmitReq->version = version; + atomic_fetch_add_64(&pVnode->statis.nBatchInsert, 1); #ifdef TD_DEBUG_PRINT_ROW vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__); @@ -942,12 +943,16 @@ _exit: taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); + atomic_fetch_add_64(&pVnode->statis.nInsert, submitRsp.numOfRows); + atomic_fetch_add_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows); + // TODO: the partial success scenario and the error case // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level // 1/level 2. // TODO: refactor if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); + atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1); } vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); -- GitLab