提交 ffbbb0cc 编写于 作者: S Shengliang Guan

TD-1455 TD-2196

上级 d1cd6206
...@@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole; ...@@ -97,6 +97,7 @@ extern int32_t tsAlternativeRole;
extern int32_t tsBalanceInterval; extern int32_t tsBalanceInterval;
extern int32_t tsOfflineThreshold; extern int32_t tsOfflineThreshold;
extern int32_t tsMnodeEqualVnodeNum; extern int32_t tsMnodeEqualVnodeNum;
extern int32_t tsFlowCtrl;
// restful // restful
extern int32_t tsEnableHttpModule; extern int32_t tsEnableHttpModule;
......
...@@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0; ...@@ -133,6 +133,7 @@ int32_t tsAlternativeRole = 0;
int32_t tsBalanceInterval = 300; // seconds int32_t tsBalanceInterval = 300; // seconds
int32_t tsOfflineThreshold = 86400*100; // seconds 10days int32_t tsOfflineThreshold = 86400*100; // seconds 10days
int32_t tsMnodeEqualVnodeNum = 4; int32_t tsMnodeEqualVnodeNum = 4;
int32_t tsFlowCtrl = 1;
// restful // restful
int32_t tsEnableHttpModule = 1; int32_t tsEnableHttpModule = 1;
...@@ -971,6 +972,17 @@ static void doInitGlobalConfig(void) { ...@@ -971,6 +972,17 @@ static void doInitGlobalConfig(void) {
cfg.maxValue = 1000; cfg.maxValue = 1000;
cfg.ptrLength = 0; cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE; cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
// module configs
cfg.option = "flowctrl";
cfg.ptr = &tsFlowCtrl;
cfg.valType = TAOS_CFG_VTYPE_INT32;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = 0;
cfg.maxValue = 1;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg); taosInitConfigOption(cfg);
cfg.option = "http"; cfg.option = "http";
......
...@@ -660,7 +660,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) { ...@@ -660,7 +660,7 @@ static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
static void vnodeNotifyRole(int32_t vgId, int8_t role) { static void vnodeNotifyRole(int32_t vgId, int8_t role) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vError("vgId:%d, vnode not found while notify role", vgId); vTrace("vgId:%d, vnode not found while notify role", vgId);
return; return;
} }
...@@ -680,12 +680,12 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) { ...@@ -680,12 +680,12 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) {
static void vnodeCtrlFlow(int32_t vgId, int32_t level) { static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
vError("vgId:%d, vnode not found while ctrl flow", vgId); vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
return; return;
} }
pVnode->flowctlLevel = level; pVnode->flowctlLevel = level;
vDebug("vgId:%d, set flowctl level:%d", pVnode->vgId, level); vDebug("vgId:%d, set flowctrl level:%d", pVnode->vgId, level);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
...@@ -40,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -40,6 +41,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite);
void vnodeInitWriteFp(void) { void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
...@@ -50,32 +52,6 @@ void vnodeInitWriteFp(void) { ...@@ -50,32 +52,6 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
} }
static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) {
SVWriteMsg *pWrite = param;
SVnodeObj * pVnode = pWrite->pVnode;
int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, &pWrite->rpcMsg);
if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) {
vDebug("vgId:%d, failed to reprocess msg after perform flowctl since %s", pVnode->vgId, tstrerror(code));
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
}
tfree(pWrite);
vnodeRelease(pWrite->pVnode);
}
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode;
if (pVnode->flowctlLevel <= 0) return 0;
int32_t ms = pVnode->flowctlLevel * 5;
void * unUsed = NULL;
taosTmrReset(vnodeFlowCtlMsgToWQueue, ms, pWrite, tsDnodeTmr, &unUsed);
vDebug("vgId:%d, perform flowctl for %d ms", pVnode->vgId, ms);
return TSDB_CODE_RPC_ACTION_IN_PROGRESS;
}
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
...@@ -298,3 +274,34 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) { ...@@ -298,3 +274,34 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
taosFreeQitem(pWrite); taosFreeQitem(pWrite);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) {
SVWriteMsg *pWrite = param;
SVnodeObj * pVnode = pWrite->pVnode;
int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, pWrite->rpcMsg.handle == NULL ? NULL : &pWrite->rpcMsg);
if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) {
vDebug("vgId:%d, failed to reprocess msg after perform flowctrl since %s", pVnode->vgId, tstrerror(code));
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
}
tfree(pWrite);
vnodeRelease(pWrite->pVnode);
}
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
SVnodeObj *pVnode = pWrite->pVnode;
if (pVnode->flowctlLevel <= 0) return 0;
if (tsFlowCtrl == 0) {
int32_t ms = pVnode->flowctlLevel * 2;
if (ms > 60000) ms = 60000;
vDebug("vgId:%d, perform flowctrl for %d ms", pVnode->vgId, ms);
taosMsleep(ms);
return 0;
} else {
void *unUsed = NULL;
taosTmrReset(vnodeFlowCtlMsgToWQueue, 5, pWrite, tsDnodeTmr, &unUsed);
return TSDB_CODE_RPC_ACTION_IN_PROGRESS;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册