From 4069969a8989416f349d49d621d6e6bedf092045 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 17 Dec 2020 16:02:13 +0800 Subject: [PATCH] TD-1927 --- src/inc/tsync.h | 4 ---- src/sync/inc/syncInt.h | 12 +++++++++--- src/sync/src/syncMain.c | 43 +++++++++++++++++------------------------ 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 0ce2a1a495..4dae86bbed 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -119,10 +119,6 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *); extern char *syncRole[]; //global configurable parameters -extern int32_t tsMaxSyncNum; -extern int32_t tsSyncTcpThreads; -extern int32_t tsSyncTimer; -extern int32_t tsMaxFwdInfo; extern int32_t sDebugFlag; extern char tsArbitrator[]; extern uint16_t tsSyncPort; diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 535251ba11..d855c651f9 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -29,11 +29,17 @@ extern "C" { #define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} +#define SYNC_TCP_THREADS 2 +#define SYNC_MAX_NUM 2 + #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024) -#define SYNC_FWD_TIMER 300 -#define SYNC_ROLE_TIMER 10000 -#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3 + +#define SYNC_MAX_FWDS 512 +#define SYNC_FWD_TIMER 300 +#define SYNC_ROLE_TIMER 15000 // ms +#define SYNC_CHECK_INTERVAL 1 // ms +#define SYNC_WAIT_AFTER_CHOOSE_MASTER 10 // ms #define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 078b02556c..d0dc291257 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -29,19 +29,12 @@ #include "syncTcp.h" #include "syncInt.h" -// global configurable -int32_t tsMaxSyncNum = 2; -int32_t tsSyncTcpThreads = 2; -int32_t tsMaxFwdInfo = 512; -int32_t tsSyncTimer = 1; +int32_t tsSyncNum = 0; // number of sync in process in whole system +char tsNodeFqdn[TSDB_FQDN_LEN] = {0}; -// module global, not configurable -int32_t tsSyncNum; // number of sync in process in whole system -char tsNodeFqdn[TSDB_FQDN_LEN]; - -static void * tsTcpPool; +static void * tsTcpPool = NULL; static void * tsSyncTmrCtrl = NULL; -static void * tsVgIdHash; +static void * tsVgIdHash = NULL; static int32_t tsSyncRefId = -1; // local functions @@ -83,7 +76,7 @@ char *syncStatus[] = { int32_t syncInit() { SPoolInfo info = {0}; - info.numOfThreads = tsSyncTcpThreads; + info.numOfThreads = SYNC_TCP_THREADS; info.serverIp = 0; info.port = tsSyncPort; info.bufferSize = SYNC_MAX_SIZE; @@ -107,7 +100,7 @@ int32_t syncInit() { tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (tsVgIdHash == NULL) { - sError("failed to init tsVgIdHash"); + sError("failed to init vgIdHash"); taosTmrCleanUp(tsSyncTmrCtrl); syncCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; @@ -210,7 +203,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); - pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + tsMaxFwdInfo * sizeof(SFwdInfo), 1); + pNode->pSyncFwds = calloc(sizeof(SSyncFwds) + SYNC_MAX_FWDS * sizeof(SFwdInfo), 1); if (pNode->pSyncFwds == NULL) { sError("vgId:%d, no memory to allocate syncFwds", pNode->vgId); terrno = TAOS_SYSTEM_ERROR(errno); @@ -750,7 +743,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) { int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { sDebug("%s, check peer connection in 1000 ms", pPeer->id); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); } } @@ -828,7 +821,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); // Ensure the sync of mnode not interrupted - if (pNode->vgId != 1 && tsSyncNum >= tsMaxSyncNum) { + if (pNode->vgId != 1 && tsSyncNum >= SYNC_MAX_NUM) { sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum); taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer); return; @@ -839,7 +832,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { SSyncMsg msg; syncBuildSyncReqMsg(&msg, pNode->vgId); - taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { sError("%s, failed to send sync-req to peer", pPeer->id); @@ -859,7 +852,7 @@ static void syncProcessFwdResponse(SFwdRsp *pFwdRsp, SSyncPeer *pPeer) { if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { // find the forwardInfo from first for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { - pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; + pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % SYNC_MAX_FWDS; if (pFwdRsp->version == pFwdInfo->version) { syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); syncRemoveConfirmedFwdInfo(pNode); @@ -995,7 +988,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (connFd < 0) { sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); return; } @@ -1011,7 +1004,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { } else { sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno)); taosClose(connFd); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer); } } @@ -1140,15 +1133,15 @@ static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle SSyncFwds *pSyncFwds = pNode->pSyncFwds; int64_t time = taosGetTimestampMs(); - if (pSyncFwds->fwds >= tsMaxFwdInfo) { - // pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; + if (pSyncFwds->fwds >= SYNC_MAX_FWDS) { + // pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS; // pSyncFwds->fwds--; sError("vgId:%d, failed to save fwd info, hver:%" PRIu64 " fwds:%d", pNode->vgId, version, pSyncFwds->fwds); return TSDB_CODE_SYN_TOO_MANY_FWDINFO; } if (pSyncFwds->fwds > 0) { - pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo; + pSyncFwds->last = (pSyncFwds->last + 1) % SYNC_MAX_FWDS; } SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; @@ -1171,7 +1164,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFwdInfo->confirmed == 0) break; - pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; + pSyncFwds->first = (pSyncFwds->first + 1) % SYNC_MAX_FWDS; pSyncFwds->fwds--; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; sTrace("vgId:%d, fwd info is removed, hver:%" PRIu64 " fwds:%d", pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); @@ -1237,7 +1230,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { if (pSyncFwds->fwds > 0) { pthread_mutex_lock(&pNode->mutex); for (int32_t i = 0; i < pSyncFwds->fwds; ++i) { - SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % SYNC_MAX_FWDS; if (ABS(time - pFwdInfo->time) < 2000) break; sDebug("vgId:%d, forward info expired, hver:%" PRIu64 " curtime:%" PRIu64 " savetime:%" PRIu64, pNode->vgId, -- GitLab