提交 7da3ed27 编写于 作者: C Cary Xu

fix: support statistics of insert_req

上级 fe115ab0
...@@ -66,10 +66,10 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { ...@@ -66,10 +66,10 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
pInfo->vstat.totalVnodes = totalVnodes; pInfo->vstat.totalVnodes = totalVnodes;
pInfo->vstat.masterNum = masterNum; pInfo->vstat.masterNum = masterNum;
pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs; pInfo->vstat.numOfSelectReqs = numOfSelectReqs - pMgmt->state.numOfSelectReqs;
pInfo->vstat.numOfInsertReqs = numOfInsertReqs - pMgmt->state.numOfInsertReqs; pInfo->vstat.numOfInsertReqs = numOfInsertReqs;
pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs - pMgmt->state.numOfInsertSuccessReqs; pInfo->vstat.numOfInsertSuccessReqs = numOfInsertSuccessReqs;
pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs - pMgmt->state.numOfBatchInsertReqs; pInfo->vstat.numOfBatchInsertReqs = numOfBatchInsertReqs;
pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs - pMgmt->state.numOfBatchInsertSuccessReqs; pInfo->vstat.numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
pMgmt->state.totalVnodes = totalVnodes; pMgmt->state.totalVnodes = totalVnodes;
pMgmt->state.masterNum = masterNum; pMgmt->state.masterNum = masterNum;
pMgmt->state.numOfSelectReqs = numOfSelectReqs; pMgmt->state.numOfSelectReqs = numOfSelectReqs;
......
...@@ -227,10 +227,6 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -227,10 +227,6 @@ int vnodeCommit(SVnode *pVnode) {
info.state.committed = pVnode->state.applied; info.state.committed = pVnode->state.applied;
info.state.commitTerm = pVnode->state.applyTerm; info.state.commitTerm = pVnode->state.applyTerm;
info.state.commitID = pVnode->state.commitID; info.state.commitID = pVnode->state.commitID;
info.statis.nInsert = pVnode->statis.nInsert;
info.statis.nInsertSuccess = pVnode->statis.nInsertSuccess;
info.statis.nBatchInsert = pVnode->statis.nBatchInsert;
info.statis.nBatchInsertSuccess = pVnode->statis.nBatchInsertSuccess;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path); snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
if (vnodeSaveInfo(dir, &info) < 0) { if (vnodeSaveInfo(dir, &info) < 0) {
...@@ -357,32 +353,6 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { ...@@ -357,32 +353,6 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
return 0; return 0;
} }
static int vnodeEncodeStatis(const void *pObj, SJson *pJson) {
const SVStatis *pStatis = (SVStatis *)pObj;
if (tjsonAddIntegerToObject(pJson, "insert", pStatis->nInsert) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "insert success", pStatis->nInsertSuccess) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "batch insert", pStatis->nBatchInsert) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "batch insert success", pStatis->nBatchInsertSuccess) < 0) return -1;
return 0;
}
static int vnodeDecodeStatis(const SJson *pJson, void *pObj) {
SVStatis *pStatis = (SVStatis *)pObj;
int32_t code;
tjsonGetNumberValue(pJson, "insert", pStatis->nInsert, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "insert success", pStatis->nInsertSuccess, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "batch insert", pStatis->nBatchInsert, code);
if (code < 0) return -1;
tjsonGetNumberValue(pJson, "batch insert success", pStatis->nBatchInsertSuccess, code);
if (code < 0) return -1;
return 0;
}
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
SJson *pJson; SJson *pJson;
char *pData; char *pData;
...@@ -402,10 +372,6 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { ...@@ -402,10 +372,6 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
goto _err; goto _err;
} }
if (tjsonAddObject(pJson, "statis", vnodeEncodeStatis, (void *)&pInfo->statis) < 0) {
goto _err;
}
pData = tjsonToString(pJson); pData = tjsonToString(pJson);
if (pData == NULL) { if (pData == NULL) {
goto _err; goto _err;
...@@ -437,10 +403,6 @@ static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { ...@@ -437,10 +403,6 @@ static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
goto _err; goto _err;
} }
if (tjsonToObject(pJson, "statis", vnodeDecodeStatis, (void *)&pInfo->statis) < 0) {
goto _err;
}
tjsonDelete(pJson); tjsonDelete(pJson);
return 0; return 0;
......
...@@ -85,10 +85,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -85,10 +85,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->state.applied = info.state.committed; pVnode->state.applied = info.state.committed;
pVnode->state.commitID = info.state.commitID; pVnode->state.commitID = info.state.commitID;
pVnode->state.commitTerm = info.state.commitTerm; pVnode->state.commitTerm = info.state.commitTerm;
pVnode->statis.nInsert = info.statis.nInsert;
pVnode->statis.nInsertSuccess = info.statis.nInsertSuccess;
pVnode->statis.nBatchInsert = info.statis.nBatchInsert;
pVnode->statis.nBatchInsertSuccess = info.statis.nBatchInsertSuccess;
pVnode->pTfs = pTfs; pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
taosThreadMutexInit(&pVnode->lock, NULL); taosThreadMutexInit(&pVnode->lock, NULL);
......
...@@ -365,22 +365,36 @@ _exit: ...@@ -365,22 +365,36 @@ _exit:
return code; return code;
} }
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, type) \
do { \
if (oVal != atomic_val_compare_exchange_##type(&pVar, oVal, 0)) { \
int##type##_t tmpVal = atomic_sub_fetch_##type(&pVar, oVal); \
ASSERT(tmpVal >= 0); \
} \
} while (0)
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = TD_VID(pVnode); pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = syncGetMyRole(pVnode->sync); pLoad->syncState = syncGetMyRole(pVnode->sync);
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
pLoad->totalStorage = (int64_t)3 * 1073741824; pLoad->totalStorage = (int64_t)3 * 1073741824;
pLoad->compStorage = (int64_t)2 * 1073741824; pLoad->compStorage = (int64_t)2 * 1073741824;
pLoad->pointsWritten = 100; pLoad->pointsWritten = 100;
pLoad->numOfSelectReqs = 1; pLoad->numOfSelectReqs = 1;
pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert); pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert);
pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess); pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess);
pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert); pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert);
pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess); pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess);
return 0;
} VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64);
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64);
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64);
VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64);
return 0;
}
void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) { void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId) {
if (dbname) { if (dbname) {
......
...@@ -951,8 +951,8 @@ _exit: ...@@ -951,8 +951,8 @@ _exit:
// 1/level 2. // 1/level 2.
// TODO: refactor // TODO: refactor
if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) { if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1); atomic_fetch_add_64(&pVnode->statis.nBatchInsertSuccess, 1);
tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_INPUT__DATA_SUBMIT);
} }
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version); vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册