From d1cd620696bbfe7f88efe04caf060d1688301d7b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 25 Nov 2020 16:29:02 +0800 Subject: [PATCH] TD-1455 TD-2196 --- src/dnode/src/dnodeVWrite.c | 4 ++-- src/inc/tsync.h | 6 +++--- src/inc/vnode.h | 6 ++++-- src/sync/src/syncRetrieve.c | 5 ++--- src/vnode/inc/vnodeInt.h | 2 +- src/vnode/src/vnodeMain.c | 10 ++++----- src/vnode/src/vnodeWrite.c | 42 ++++++++++++++++++++++++++++++++----- 7 files changed, 53 insertions(+), 22 deletions(-) diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 959eb3c9c5..6d4b50ee54 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -176,7 +176,7 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *wparam, int32_t code) { if (count <= 1) return; SRpcMsg rpcRsp = { - .handle = pWrite->rpcHandle, + .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rspRet.rsp, .contLen = pWrite->rspRet.len, .code = pWrite->code, @@ -206,7 +206,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite, - pWrite->rpcAhandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version); + pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version); pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, &pWrite->rspRet); if (pWrite->code <= 0) pWrite->processedCount = 1; diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 77a3b36e73..398e1bf97c 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -23,7 +23,7 @@ extern "C" { #define TAOS_SYNC_MAX_REPLICA 5 #define TAOS_SYNC_MAX_INDEX 0x7FFFFFFF -typedef enum _TAOS_SYNC_ROLE { +typedef enum { TAOS_SYNC_ROLE_OFFLINE = 0, TAOS_SYNC_ROLE_UNSYNCED = 1, TAOS_SYNC_ROLE_SYNCING = 2, @@ -31,7 +31,7 @@ typedef enum _TAOS_SYNC_ROLE { TAOS_SYNC_ROLE_MASTER = 4 } ESyncRole; -typedef enum _TAOS_SYNC_STATUS { +typedef enum { TAOS_SYNC_STATUS_INIT = 0, TAOS_SYNC_STATUS_START = 1, TAOS_SYNC_STATUS_FILE = 2, @@ -80,7 +80,7 @@ typedef void (*FConfirmForward)(int32_t vgId, void *mhandle, int32_t code); typedef void (*FNotifyRole)(int32_t vgId, int8_t role); // if a number of retrieving data failed, call this to start flow control -typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t mseconds); +typedef void (*FNotifyFlowCtrl)(int32_t vgId, int32_t level); // when data file is synced successfully, notity app typedef int32_t (*FNotifyFileSynced)(int32_t vgId, uint64_t fversion); diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 4e8389498b..5f643295d6 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -20,6 +20,7 @@ extern "C" { #endif +#include "trpc.h" #include "twal.h" typedef enum _VN_STATUS { @@ -51,8 +52,9 @@ typedef struct { typedef struct { int32_t code; int32_t processedCount; - void * rpcHandle; - void * rpcAhandle; + int32_t qtype; + void * pVnode; + SRpcMsg rpcMsg; SRspRet rspRet; char reserveForSync[16]; SWalHead pHead[]; diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 348f91820b..060badba9d 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -504,6 +504,8 @@ void *syncRetrieveData(void *param) { SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); + if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, pPeer->numOfRetrieves); + pPeer->fileChanged = 0; pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (pPeer->syncFd < 0) { @@ -520,10 +522,7 @@ void *syncRetrieveData(void *param) { } if (pPeer->fileChanged) { - // if file is changed 3 times continuously, start flow control pPeer->numOfRetrieves++; - if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) - (*pNode->notifyFlowCtrl)(pNode->vgId, 4 << (pPeer->numOfRetrieves - 2)); } else { pPeer->numOfRetrieves = 0; if (pNode->notifyFlowCtrl) (*pNode->notifyFlowCtrl)(pNode->vgId, 0); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 021831a644..cf9a1baa80 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -39,7 +39,7 @@ typedef struct { int32_t refCount; // reference count int32_t queuedWMsg; int32_t queuedRMsg; - int32_t delayMs; + int32_t flowctlLevel; int8_t status; int8_t role; int8_t accessState; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index b94fea52bd..1ee968e7cb 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -34,7 +34,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno); static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion); static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); static void vnodeNotifyRole(int32_t vgId, int8_t role); -static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds); +static void vnodeCtrlFlow(int32_t vgId, int32_t level); static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion); static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); @@ -677,17 +677,15 @@ static void vnodeNotifyRole(int32_t vgId, int8_t role) { vnodeRelease(pVnode); } -static void vnodeCtrlFlow(int32_t vgId, int32_t mseconds) { +static void vnodeCtrlFlow(int32_t vgId, int32_t level) { SVnodeObj *pVnode = vnodeAcquire(vgId); if (pVnode == NULL) { vError("vgId:%d, vnode not found while ctrl flow", vgId); return; } - if (pVnode->delayMs != mseconds) { - pVnode->delayMs = mseconds; - vDebug("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); - } + pVnode->flowctlLevel = level; + vDebug("vgId:%d, set flowctl level:%d", pVnode->vgId, level); vnodeRelease(pVnode); } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a0227d84ba..cada188591 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -22,14 +22,17 @@ #include "tsdb.h" #include "twal.h" #include "tsync.h" +#include "ttimer.h" #include "tdataformat.h" #include "vnode.h" #include "vnodeInt.h" #include "syncInt.h" #include "tcq.h" +#include "dnode.h" #define MAX_QUEUED_MSG_NUM 10000 +extern void * tsDnodeTmr; static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *); static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); @@ -47,6 +50,32 @@ void vnodeInitWriteFp(void) { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; } +static void vnodeFlowCtlMsgToWQueue(void *param, void *tmrId) { + SVWriteMsg *pWrite = param; + SVnodeObj * pVnode = pWrite->pVnode; + + int32_t code = vnodeWriteToWQueue(pVnode, pWrite->pHead, pWrite->qtype, &pWrite->rpcMsg); + if (code != 0 && pWrite->qtype == TAOS_QTYPE_RPC) { + vDebug("vgId:%d, failed to reprocess msg after perform flowctl since %s", pVnode->vgId, tstrerror(code)); + dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); + } + + tfree(pWrite); + vnodeRelease(pWrite->pVnode); +} + +static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) { + SVnodeObj *pVnode = pWrite->pVnode; + if (pVnode->flowctlLevel <= 0) return 0; + + int32_t ms = pVnode->flowctlLevel * 5; + void * unUsed = NULL; + taosTmrReset(vnodeFlowCtlMsgToWQueue, ms, pWrite, tsDnodeTmr, &unUsed); + + vDebug("vgId:%d, perform flowctl for %d ms", pVnode->vgId, ms); + return TSDB_CODE_RPC_ACTION_IN_PROGRESS; +} + int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t code = 0; SVnodeObj *pVnode = vparam; @@ -77,8 +106,6 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara // assign version pHead->version = pVnode->version + 1; - if (pVnode->delayMs) taosMsleep(pVnode->delayMs); - } else { // from wal or forward // for data from WAL or forward, version may be smaller if (pHead->version <= pVnode->version) return 0; @@ -218,9 +245,10 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { SVnodeObj *pVnode = vparam; SWalHead * pHead = wparam; + int32_t code = 0; if (qtype == TAOS_QTYPE_RPC) { - int32_t code = vnodeCheckWrite(pVnode); + code = vnodeCheckWrite(pVnode); if (code != TSDB_CODE_SUCCESS) return code; } @@ -237,14 +265,18 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar if (rparam != NULL) { SRpcMsg *pRpcMsg = rparam; - pWrite->rpcHandle = pRpcMsg->handle; - pWrite->rpcAhandle = pRpcMsg->ahandle; + pWrite->rpcMsg = *pRpcMsg; } memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len); + pWrite->pVnode = pVnode; + pWrite->qtype = qtype; atomic_add_fetch_32(&pVnode->refCount, 1); + code = vnodePerformFlowCtrl(pWrite); + if (code != 0) return code; + int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1); if (queued > MAX_QUEUED_MSG_NUM) { vDebug("vgId:%d, too many msg:%d in vwqueue, flow control", pVnode->vgId, queued); -- GitLab