未验证 提交 58b0ca6e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17273 from taosdata/fix/TD-19223-D

fix: support statistics of insert_req
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vmInt.h" #include "vmInt.h"
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad)); pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
if (pInfo->pVloads == NULL) return; if (pInfo->pVloads == NULL) return;
...@@ -30,6 +30,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { ...@@ -30,6 +30,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
SVnodeLoad vload = {0}; SVnodeLoad vload = {0};
vnodeGetLoad(pVnode->pImpl, &vload); vnodeGetLoad(pVnode->pImpl, &vload);
if (isReset) vnodeResetLoad(pVnode->pImpl, &vload);
taosArrayPush(pInfo->pVloads, &vload); taosArrayPush(pInfo->pVloads, &vload);
pIter = taosHashIterate(pMgmt->hash, pIter); pIter = taosHashIterate(pMgmt->hash, pIter);
} }
...@@ -39,7 +40,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { ...@@ -39,7 +40,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
SMonVloadInfo vloads = {0}; SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads); vmGetVnodeLoads(pMgmt, &vloads, true);
SArray *pVloads = vloads.pVloads; SArray *pVloads = vloads.pVloads;
if (pVloads == NULL) return; if (pVloads == NULL) return;
...@@ -66,10 +67,10 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { ...@@ -66,10 +67,10 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
pInfo->vstat.totalVnodes = totalVnodes; pInfo->vstat.totalVnodes = totalVnodes;
pInfo->vstat.masterNum = masterNum; pInfo->vstat.masterNum = masterNum;
pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs; pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs;
pInfo->vstat.numOfInsertReqs = numOfInsertReqs - pMgmt->state.numOfInsertReqs; pInfo->vstat.numOfInsertReqs = numOfInsertReqs; // delta
pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs - pMgmt->state.numOfInsertSuccessReqs; pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs; // delta
pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs - pMgmt->state.numOfBatchInsertReqs; pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs; // delta
pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs; pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; // delta
pMgmt->state.totalVnodes = totalVnodes; pMgmt->state.totalVnodes = totalVnodes;
pMgmt->state.masterNum = masterNum; pMgmt->state.masterNum = masterNum;
pMgmt->state.numOfSelectReqs = numOfSelectReqs; pMgmt->state.numOfSelectReqs = numOfSelectReqs;
...@@ -109,7 +110,7 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -109,7 +110,7 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SMonVloadInfo vloads = {0}; SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads); vmGetVnodeLoads(pMgmt, &vloads, false);
int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads); int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads);
if (rspLen < 0) { if (rspLen < 0) {
......
...@@ -35,7 +35,7 @@ void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo); ...@@ -35,7 +35,7 @@ void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo);
void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo); void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo); void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo); void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo, bool isReset);
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo); void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo); void qmGetQnodeLoads(void *pMgmt, SQnodeLoad *pInfo);
......
...@@ -152,7 +152,7 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) { ...@@ -152,7 +152,7 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
if (tsMultiProcess) { if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo); dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
} else if (pWrapper->pMgmt != NULL) { } else if (pWrapper->pMgmt != NULL) {
vmGetVnodeLoads(pWrapper->pMgmt, pInfo); vmGetVnodeLoads(pWrapper->pMgmt, pInfo, false);
} }
dmReleaseWrapper(pWrapper); dmReleaseWrapper(pWrapper);
} }
......
...@@ -73,6 +73,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num); ...@@ -73,6 +73,7 @@ int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num);
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num);
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num); int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num);
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
......
...@@ -56,6 +56,7 @@ typedef struct SSma SSma; ...@@ -56,6 +56,7 @@ typedef struct SSma SSma;
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
typedef struct STQ STQ; typedef struct STQ STQ;
typedef struct SVState SVState; typedef struct SVState SVState;
typedef struct SVStatis SVStatis;
typedef struct SVBufPool SVBufPool; typedef struct SVBufPool SVBufPool;
typedef struct SQWorker SQHandle; typedef struct SQWorker SQHandle;
typedef struct STsdbKeepCfg STsdbKeepCfg; typedef struct STsdbKeepCfg STsdbKeepCfg;
...@@ -284,9 +285,17 @@ struct SVState { ...@@ -284,9 +285,17 @@ struct SVState {
int64_t commitTerm; int64_t commitTerm;
}; };
struct SVStatis {
int64_t nInsert; // delta
int64_t nInsertSuccess; // delta
int64_t nBatchInsert; // delta
int64_t nBatchInsertSuccess; // delta
};
struct SVnodeInfo { struct SVnodeInfo {
SVnodeCfg config; SVnodeCfg config;
SVState state; SVState state;
SVStatis statis;
}; };
typedef enum { typedef enum {
...@@ -309,6 +318,7 @@ struct SVnode { ...@@ -309,6 +318,7 @@ struct SVnode {
char* path; char* path;
SVnodeCfg config; SVnodeCfg config;
SVState state; SVState state;
SVStatis statis;
STfs* pTfs; STfs* pTfs;
SMsgCb msgCb; SMsgCb msgCb;
TdThreadMutex mutex; TdThreadMutex mutex;
......
...@@ -15,6 +15,12 @@ ...@@ -15,6 +15,12 @@
#include "vnd.h" #include "vnd.h"
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType) \
do { \
int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \
ASSERT(newVal >= 0); \
} while (0)
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
} }
...@@ -375,13 +381,28 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { ...@@ -375,13 +381,28 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->compStorage = (int64_t)2 * 1073741824; pLoad->compStorage = (int64_t)2 * 1073741824;
pLoad->pointsWritten = 100; pLoad->pointsWritten = 100;
pLoad->numOfSelectReqs = 1; pLoad->numOfSelectReqs = 1;
pLoad->numOfInsertReqs = 3; pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert);
pLoad->numOfInsertSuccessReqs = 2; pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess);
pLoad->numOfBatchInsertReqs = 5; pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert);
pLoad->numOfBatchInsertSuccessReqs = 4; pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess);
return 0; return 0;
} }
/**
* @brief Reset the statistics value by monitor interval
*
* @param pVnode
* @param pLoad
*/
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64);
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64);
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64);
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64);
}
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
if (dbname) { if (dbname) {
*dbname = pVnode->config.dbname; *dbname = pVnode->config.dbname;
......
...@@ -812,10 +812,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -812,10 +812,10 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0}; SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0}; SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock; SSubmitBlk *pBlock = NULL;
SVCreateTbReq createTbReq = {0}; SVCreateTbReq createTbReq = {0};
SDecoder decoder = {0}; SDecoder decoder = {0};
int32_t nRows; int32_t nRows = 0;
int32_t tsize, ret; int32_t tsize, ret;
SEncoder encoder = {0}; SEncoder encoder = {0};
SArray *newTbUids = NULL; SArray *newTbUids = NULL;
...@@ -823,6 +823,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -823,6 +823,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
pRsp->code = 0; pRsp->code = 0;
pSubmitReq->version = version; pSubmitReq->version = version;
atomic_fetch_add_64(&pVnode->statis.nBatchInsert, 1);
#ifdef TD_DEBUG_PRINT_ROW #ifdef TD_DEBUG_PRINT_ROW
vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__); vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
...@@ -942,11 +943,15 @@ _exit: ...@@ -942,11 +943,15 @@ _exit:
taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp); taosArrayDestroyEx(submitRsp.pArray, tFreeSSubmitBlkRsp);
atomic_fetch_add_64(&pVnode->statis.nInsert, submitRsp.numOfRows);
atomic_fetch_add_64(&pVnode->statis.nInsertSuccess, submitRsp.affectedRows);
// TODO: the partial success scenario and the error case // TODO: the partial success scenario and the error case
// => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level // => If partial success, extract the success submitted rows and reconstruct a new submit msg, and push to level
// 1/level 2. // 1/level 2.
// TODO: refactor // TODO: refactor
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1);
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT); tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册