diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 4087f638a95b063fcf6081db2556758643ef1c9a..efe3d7678a39f140b413192adfe9654b3661251f 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole; extern int32_t tsBalanceInterval; extern int32_t tsOfflineThreshold; extern int32_t tsMnodeEqualVnodeNum; +extern int32_t tsFlowCtrl; // restful extern int32_t tsEnableHttpModule; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 4495c3d9288a5df0b8944aa444625c143b5c7670..f8bb965d282f994c35d32fabeb4c3ddba2fb4c27 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0; int32_t tsBalanceInterval = 300; // seconds int32_t tsOfflineThreshold = 86400*100; // seconds 10days int32_t tsMnodeEqualVnodeNum = 4; +int32_t tsFlowCtrl = 1; // restful int32_t tsEnableHttpModule = 1; @@ -971,6 +972,17 @@ static void doInitGlobalConfig(void) { cfg.maxValue = 1000; cfg.ptrLength = 0; cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + // module configs + cfg.option = "flowctrl"; + cfg.ptr = &tsFlowCtrl; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; + cfg.minValue = 0; + cfg.maxValue = 1; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); cfg.option = "http"; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 1ee968e7cb243eef96102ed6de1d55da0aec40c0..df4376d5d06830204cb3e310cd662b477c1fe210 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -660,7 +660,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { static void vnodeNotifyRole(int32_t vgId, int8_t role) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { - vError("vgId:%d, vnode not found while notify role", vgId); + vTrace("vgId:%d, vnode not found while notify role", vgId); return; } @@ -680,12 +680,12 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) { static void vnodeCtrlFlow(int32_t vgId, int32_t level) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { - vError("vgId:%d, vnode not found while ctrl flow", vgId); + vTrace("vgId:%d, vnode not found while flow ctrl", vgId); return; } pVnode->flowctlLevel = level; - vDebug("vgId:%d, set flowctl level:%d", pVnode->vgId, level); + vDebug("vgId:%d, set flowctrl level:%d", pVnode->vgId, level); vnodeRelease(pVnode); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index cada1885917f0981b347ff0e068bd3dd52b00e34..4bf7b30ebdcbdf882af30843bd6b9b86b44c9475 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -17,6 +17,7 @@ #include "os.h" #include "taosmsg.h" #include "taoserror.h" +#include "tglobal.h" #include "tqueue.h" #include "trpc.h" #include "tsdb.h" @@ -40,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); +static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite); void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; @@ -50,32 +52,6 @@ void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; } -static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) { - SVWriteMsg *pWrite = param; - SVnodeObj * pVnode = pWrite->pVnode; - - int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, &pWrite->rpcMsg); - if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) { - vDebug("vgId:%d, failed to reprocess msg after perform flowctl since %s", pVnode->vgId, tstrerror(code)); - dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); - } - - tfree(pWrite); - vnodeRelease(pWrite->pVnode); -} - -static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { - SVnodeObj *pVnode = pWrite->pVnode; - if (pVnode->flowctlLevel <= 0) return 0; - - int32_t ms = pVnode->flowctlLevel * 5; - void * unUsed = NULL; - taosTmrReset(vnodeFlowCtlMsgToWQueue, ms, pWrite, tsDnodeTmr, &unUsed); - - vDebug("vgId:%d, perform flowctl for %d ms", pVnode->vgId, ms); - return TSDB_CODE_RPC_ACTION_IN_PROGRESS; -} - int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t code = 0; SVnodeObj *pVnode = vparam; @@ -298,3 +274,34 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { taosFreeQitem(pWrite); vnodeRelease(pVnode); } + +static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) { + SVWriteMsg *pWrite = param; + SVnodeObj * pVnode = pWrite->pVnode; + + int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, pWrite->rpcMsg.handle == NULL ? NULL : &pWrite->rpcMsg); + if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) { + vDebug("vgId:%d, failed to reprocess msg after perform flowctrl since %s", pVnode->vgId, tstrerror(code)); + dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); + } + + tfree(pWrite); + vnodeRelease(pWrite->pVnode); +} + +static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { + SVnodeObj *pVnode = pWrite->pVnode; + if (pVnode->flowctlLevel <= 0) return 0; + + if (tsFlowCtrl == 0) { + int32_t ms = pVnode->flowctlLevel * 2; + if (ms > 60000) ms = 60000; + vDebug("vgId:%d, perform flowctrl for %d ms", pVnode->vgId, ms); + taosMsleep(ms); + return 0; + } else { + void *unUsed = NULL; + taosTmrReset(vnodeFlowCtlMsgToWQueue, 5, pWrite, tsDnodeTmr, &unUsed); + return TSDB_CODE_RPC_ACTION_IN_PROGRESS; + } +}