From c9bcfe5fa6d71b98e68bc105540fdb85f95670ae Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Tue, 28 Jul 2020 06:39:30 +0000 Subject: [PATCH] first draft for sync flow control --- src/inc/tsync.h | 4 ++++ src/sync/inc/syncInt.h | 3 +++ src/sync/src/syncMain.c | 14 ++++++++++++++ src/sync/src/syncRetrieve.c | 16 +++++++++++++++- src/vnode/inc/vnodeInt.h | 1 + src/vnode/src/vnodeMain.c | 9 +++++++++ src/vnode/src/vnodeWrite.c | 2 ++ 7 files changed, 48 insertions(+), 1 deletion(-) diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 65b91d87e4..78958801fe 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -78,6 +78,9 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); // when role is changed, call this to notify app typedef void (*FNotifyRole)(void *ahandle, int8_t role); +// if a number of retrieving data failed, call this to start flow control +typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds); + // when data file is synced successfully, notity app typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); @@ -93,6 +96,7 @@ typedef struct { FWriteToCache writeToCache; FConfirmForward confirmForward; FNotifyRole notifyRole; + FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; } SSyncInfo; diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index d6d86064d6..be1e01cb23 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -125,6 +125,8 @@ typedef struct SsyncPeer { uint64_t sversion; // track the peer version in retrieve process int syncFd; int peerFd; // forward FD + int numOfRetrieves; // number of retrieves tried + int fileChanged; // a flag to indicate file is changed during retrieving process void *timer; void *pConn; int notifyFd; @@ -152,6 +154,7 @@ typedef struct SSyncNode { FWriteToCache writeToCache; FConfirmForward confirmForward; FNotifyRole notifyRole; + FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; pthread_mutex_t mutex; } SSyncNode; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 93c4a9402f..e70e27b0ef 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -137,6 +137,7 @@ void *syncStart(const SSyncInfo *pInfo) pNode->writeToCache = pInfo->writeToCache; pNode->notifyRole = pInfo->notifyRole; pNode->confirmForward = pInfo->confirmForward; + pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl; pNode->notifyFileSynced = pInfo->notifyFileSynced; pNode->selfIndex = -1; @@ -530,6 +531,15 @@ void syncBroadcastStatus(SSyncNode *pNode) } } +static void syncResetFlowCtrl(SSyncNode *pNode) { + + for (int i = 0; i < pNode->replica; ++i) { + pNode->peerInfo[i]->numOfRetrieves = 0; + } + + (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); +} + static void syncChooseMaster(SSyncNode *pNode) { SSyncPeer *pPeer; int onlineNum = 0; @@ -575,6 +585,7 @@ static void syncChooseMaster(SSyncNode *pNode) { if (index == pNode->selfIndex) { sInfo("vgId:%d, start to work as master", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; + syncResetFlowCtrl(pNode); (*pNode->notifyRole)(pNode->ahandle, nodeRole); } else { pPeer = pNode->peerInfo[index]; @@ -706,6 +717,9 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne if (peerOldRole != newRole || nodeRole != selfOldRole) syncBroadcastStatus(pNode); + + if (nodeRole != TAOS_SYNC_ROLE_MASTER) + syncResetFlowCtrl(pNode); } static void syncRestartPeer(SSyncPeer *pPeer) { diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 74970017fa..be3a3c2ac7 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -83,6 +83,7 @@ static int syncAreFilesModified(SSyncPeer *pPeer) int code = 0; if (len >0) { sDebug("%s, processed file is changed", pPeer->id); + pPeer->fileChanged = 1; code = 1; } @@ -454,9 +455,11 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) void *syncRetrieveData(void *param) { - SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); + pPeer->fileChanged = 0; pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (pPeer->syncFd < 0) { sError("%s, failed to open socket to sync", pPeer->id); @@ -471,6 +474,17 @@ void *syncRetrieveData(void *param) } } + if (pPeer->fileChanged) { + // if file is changed 3 times continuously, start flow control + pPeer->numOfRetrieves++; + if (pPeer->numOfRetrieves >= 3) + (*pNode->notifyFlowCtrl)(pNode->ahandle, pPeer->numOfRetrieves - 2); + } else { + pPeer->numOfRetrieves = 0; + (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + } + + pPeer->fileChanged = 0; tclose(pPeer->notifyFd); tclose(pPeer->syncFd); syncDecPeerRef(pPeer); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 74cfbf1e73..38f7c8e605 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,6 +37,7 @@ extern int32_t vDebugFlag; typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count + int32_t delay; int8_t status; int8_t role; int8_t accessState; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 77d4503d9d..dd88fdf8a3 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,6 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); +static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); int32_t vnodeInitResources() { @@ -277,6 +278,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; + syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; pVnode->sync = syncStart(&syncInfo); @@ -549,6 +551,13 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { cqStop(pVnode->cq); } +static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) { + SVnodeObj *pVnode = ahandle; + if (pVnode->delay != mseconds) + vInfo("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); + pVnode->delay = mseconds; +} + static int vnodeResetTsdb(SVnodeObj *pVnode) { char rootDir[128] = "\0"; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 6b9b8ca4fd..900ff1fbba 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -78,6 +78,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { // assign version pVnode->version++; pHead->version = pVnode->version; + if (pVnode->delay) usleep(pVnode->delay*1000); + } else { // from wal or forward // for data from WAL or forward, version may be smaller if (pHead->version <= pVnode->version) return 0; -- GitLab