diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 65b91d87e479abd4c0b497cf4b95bc561db8fae5..78958801fe1363ef5b2324c6080e07aaf99db054 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -78,6 +78,9 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); // when role is changed, call this to notify app typedef void (*FNotifyRole)(void *ahandle, int8_t role); +// if a number of retrieving data failed, call this to start flow control +typedef void (*FNotifyFlowCtrl)(void *ahandle, int32_t mseconds); + // when data file is synced successfully, notity app typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); @@ -93,6 +96,7 @@ typedef struct { FWriteToCache writeToCache; FConfirmForward confirmForward; FNotifyRole notifyRole; + FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; } SSyncInfo; diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index d6d86064d6574e7317116ef25e25a1f1e524ad6a..be1e01cb2371b82acf4cf63e896a6fed68399222 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -125,6 +125,8 @@ typedef struct SsyncPeer { uint64_t sversion; // track the peer version in retrieve process int syncFd; int peerFd; // forward FD + int numOfRetrieves; // number of retrieves tried + int fileChanged; // a flag to indicate file is changed during retrieving process void *timer; void *pConn; int notifyFd; @@ -152,6 +154,7 @@ typedef struct SSyncNode { FWriteToCache writeToCache; FConfirmForward confirmForward; FNotifyRole notifyRole; + FNotifyFlowCtrl notifyFlowCtrl; FNotifyFileSynced notifyFileSynced; pthread_mutex_t mutex; } SSyncNode; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 93c4a9402f794544ac2c03683a56046cc79194f1..04e402769b513a269ac3c5a23d64e92318932bc4 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -137,6 +137,7 @@ void *syncStart(const SSyncInfo *pInfo) pNode->writeToCache = pInfo->writeToCache; pNode->notifyRole = pInfo->notifyRole; pNode->confirmForward = pInfo->confirmForward; + pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl; pNode->notifyFileSynced = pInfo->notifyFileSynced; pNode->selfIndex = -1; @@ -530,6 +531,16 @@ void syncBroadcastStatus(SSyncNode *pNode) } } +static void syncResetFlowCtrl(SSyncNode *pNode) { + + for (int i = 0; i < pNode->replica; ++i) { + pNode->peerInfo[i]->numOfRetrieves = 0; + } + + if (pNode->notifyFlowCtrl) + (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); +} + static void syncChooseMaster(SSyncNode *pNode) { SSyncPeer *pPeer; int onlineNum = 0; @@ -575,6 +586,7 @@ static void syncChooseMaster(SSyncNode *pNode) { if (index == pNode->selfIndex) { sInfo("vgId:%d, start to work as master", pNode->vgId); nodeRole = TAOS_SYNC_ROLE_MASTER; + syncResetFlowCtrl(pNode); (*pNode->notifyRole)(pNode->ahandle, nodeRole); } else { pPeer = pNode->peerInfo[index]; @@ -706,6 +718,9 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne if (peerOldRole != newRole || nodeRole != selfOldRole) syncBroadcastStatus(pNode); + + if (nodeRole != TAOS_SYNC_ROLE_MASTER) + syncResetFlowCtrl(pNode); } static void syncRestartPeer(SSyncPeer *pPeer) { diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 74970017fae920b3fbc8167f701feb17934e9b05..8aa7d101e7c3e1ba1fda6d26817d07871dbb6d03 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -83,6 +83,7 @@ static int syncAreFilesModified(SSyncPeer *pPeer) int code = 0; if (len >0) { sDebug("%s, processed file is changed", pPeer->id); + pPeer->fileChanged = 1; code = 1; } @@ -454,9 +455,11 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) void *syncRetrieveData(void *param) { - SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncPeer *pPeer = (SSyncPeer *)param; + SSyncNode *pNode = pPeer->pSyncNode; taosBlockSIGPIPE(); + pPeer->fileChanged = 0; pPeer->syncFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (pPeer->syncFd < 0) { sError("%s, failed to open socket to sync", pPeer->id); @@ -471,6 +474,18 @@ void *syncRetrieveData(void *param) } } + if (pPeer->fileChanged) { + // if file is changed 3 times continuously, start flow control + pPeer->numOfRetrieves++; + if (pPeer->numOfRetrieves >= 2 && pNode->notifyFlowCtrl) + (*pNode->notifyFlowCtrl)(pNode->ahandle, 4 << (pPeer->numOfRetrieves - 2)); + } else { + pPeer->numOfRetrieves = 0; + if (pNode->notifyFlowCtrl) + (*pNode->notifyFlowCtrl)(pNode->ahandle, 0); + } + + pPeer->fileChanged = 0; tclose(pPeer->notifyFd); tclose(pPeer->syncFd); syncDecPeerRef(pPeer); diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index 74cfbf1e731fd67c47f7375198a8353740429060..38f7c8e6056644591993027b410283b41fb8560c 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -37,6 +37,7 @@ extern int32_t vDebugFlag; typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count + int32_t delay; int8_t status; int8_t role; int8_t accessState; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 77d4503d9d9e8d8a4e658d0c8c23bd1f6b10d21b..dd88fdf8a3e4bad901efeb7a47e1ac9b88a70a30 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -44,6 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static void vnodeNotifyRole(void *ahandle, int8_t role); +static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); int32_t vnodeInitResources() { @@ -277,6 +278,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.confirmForward = dnodeSendRpcVnodeWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; + syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; pVnode->sync = syncStart(&syncInfo); @@ -549,6 +551,13 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { cqStop(pVnode->cq); } +static void vnodeCtrlFlow(void *ahandle, int32_t mseconds) { + SVnodeObj *pVnode = ahandle; + if (pVnode->delay != mseconds) + vInfo("vgId:%d, sync flow control, mseconds:%d", pVnode->vgId, mseconds); + pVnode->delay = mseconds; +} + static int vnodeResetTsdb(SVnodeObj *pVnode) { char rootDir[128] = "\0"; diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 6b9b8ca4fd5a5c42028936e48b53f45ffa811903..900ff1fbbacc034b20bc0a04df5758e595bb3f32 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -78,6 +78,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { // assign version pVnode->version++; pHead->version = pVnode->version; + if (pVnode->delay) usleep(pVnode->delay*1000); + } else { // from wal or forward // for data from WAL or forward, version may be smaller if (pHead->version <= pVnode->version) return 0;