diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index e1d298089caec13af8f34d508a3620d9b4508b0f..bcbc98b5b3aff0825a4d92f8ff612344c3720ce6 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -16,11 +16,13 @@ #define _DEFAULT_SOURCE #include "os.h" #include "taosdef.h" +#include "taosmsg.h" #include "tglobal.h" #include "mnode.h" #include "http.h" #include "tmqtt.h" #include "monitor.h" +#include "dnode.h" #include "dnodeInt.h" #include "dnodeModule.h" @@ -129,17 +131,32 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) { for (int32_t module = TSDB_MOD_MNODE; module < TSDB_MOD_HTTP; ++module) { bool enableModule = moduleStatus & (1 << module); if (!tsModule[module].enable && enableModule) { - dInfo("module status:%u is received, start %s module", tsModuleStatus, tsModule[module].name); + dInfo("module status:%u is set, start %s module", moduleStatus, tsModule[module].name); tsModule[module].enable = true; dnodeSetModuleStatus(module); (*tsModule[module].startFp)(); } if (tsModule[module].enable && !enableModule) { - dInfo("module status:%u is received, stop %s module", tsModuleStatus, tsModule[module].name); + dInfo("module status:%u is set, stop %s module", moduleStatus, tsModule[module].name); tsModule[module].enable = false; dnodeUnSetModuleStatus(module); (*tsModule[module].stopFp)(); } } } + +void dnodeCheckModules() { + if (tsModuleStatus & TSDB_MOD_MNODE) return; + + SDMMnodeInfos *mnodes = dnodeGetMnodeInfos(); + for (int32_t i = 0; i < mnodes->nodeNum; ++i) { + SDMMnodeInfo *node = &mnodes->nodeInfos[i]; + if (node->nodeId == dnodeGetDnodeId()) { + uint32_t moduleStatus = tsModuleStatus | (1 << TSDB_MOD_MNODE);; + dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus); + dnodeProcessModuleStatus(moduleStatus); + break; + } + } +} diff --git a/src/inc/dnode.h b/src/inc/dnode.h index 093ce93205646313a5f0ca3bec4d121ad8e20776..028041b2d25147311315d23e67bad3b2e4d1bcf9 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -43,6 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet); void dnodeGetMnodeEpSetForShell(void *epSet); void * dnodeGetMnodeInfos(); int32_t dnodeGetDnodeId(); +void dnodeCheckModules(); void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg)); void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg); diff --git a/src/inc/tsync.h b/src/inc/tsync.h index ff9c9901bd91e05f443ada68508cdccbc13ef066..ca0f70d104d603d176d89dd5b92979433f390466 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -103,6 +103,9 @@ typedef struct { typedef void* tsync_h; +int32_t syncInit(); +void syncCleanUp(); + tsync_h syncStart(const SSyncInfo *); void syncStop(tsync_h shandle); int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8928a6622d5905f7f63e462d818e8a0e98bcec79..3ff5cc48285909fac861271d78441c009fbccd7c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -22,6 +22,7 @@ #include "tqueue.h" #include "twal.h" #include "tsync.h" +#include "ttimer.h" #include "tglobal.h" #include "dnode.h" #include "mnode.h" @@ -88,6 +89,8 @@ typedef struct { SSdbWriteWorker *writeWorker; } SSdbWriteWorkerPool; +extern void * tsMnodeTmr; +static void * tsUpdateSyncTmr; static SSdbObject tsSdbObj = {0}; static taos_qset tsSdbWriteQset; static taos_qall tsSdbWriteQall; @@ -290,11 +293,16 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { taosFreeQitem(pOper); } +static void sdbUpdateSyncTmrFp(void *param, void *tmrId) { sdbUpdateSync(); } + void sdbUpdateSync() { if (!mnodeIsRunning()) { mDebug("mnode not start yet, update sync info later"); + dnodeCheckModules(); + taosTmrReset(sdbUpdateSyncTmrFp, 1000, NULL, tsMnodeTmr, &tsUpdateSyncTmr); return; } + mDebug("update sync info in sdb"); SSyncCfg syncCfg = {0}; int32_t index = 0; @@ -387,8 +395,6 @@ int32_t sdbInit() { tsSdbObj.role = TAOS_SYNC_ROLE_MASTER; } - sdbUpdateSync(); - tsSdbObj.status = SDB_STATUS_SERVING; return TSDB_CODE_SUCCESS; } diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index da40160bc19fea6babf2d2cddf9ea6b6f261f2fe..f96b902efde28c62af194047b64485a8f83c0e45 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -30,23 +30,19 @@ #include "syncInt.h" // global configurable -int tsMaxSyncNum = 2; -int tsSyncTcpThreads = 2; -int tsMaxWatchFiles = 500; -int tsMaxFwdInfo = 200; -int tsSyncTimer = 1; -//int sDebugFlag = 135; -//char tsArbitrator[TSDB_FQDN_LEN] = {0}; +int tsMaxSyncNum = 2; +int tsSyncTcpThreads = 2; +int tsMaxWatchFiles = 500; +int tsMaxFwdInfo = 200; +int tsSyncTimer = 1; // module global, not configurable -int tsSyncNum; // number of sync in process in whole system -char tsNodeFqdn[TSDB_FQDN_LEN]; +int tsSyncNum; // number of sync in process in whole system +char tsNodeFqdn[TSDB_FQDN_LEN]; -static int tsNodeNum; // number of nodes in system -static ttpool_h tsTcpPool; -static void *syncTmrCtrl = NULL; -static void *vgIdHash; -static pthread_once_t syncModuleInit = PTHREAD_ONCE_INIT; +static ttpool_h tsTcpPool; +static void * syncTmrCtrl = NULL; +static void * vgIdHash; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -75,7 +71,7 @@ char* syncRole[] = { "master" }; -static void syncModuleInitFunc() { +int32_t syncInit() { SPoolInfo info; info.numOfThreads = tsSyncTcpThreads; @@ -87,25 +83,52 @@ static void syncModuleInitFunc() { info.processIncomingConn = syncProcessIncommingConnection; tsTcpPool = taosOpenTcpThreadPool(&info); - if (tsTcpPool == NULL) return; + if (tsTcpPool == NULL) { + sError("failed to init tcpPool"); + return -1; + } syncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC"); if (syncTmrCtrl == NULL) { + sError("failed to init tmrCtrl"); taosCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; - return; + return -1; } - + vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true); if (vgIdHash == NULL) { + sError("failed to init vgIdHash"); taosTmrCleanUp(syncTmrCtrl); taosCloseTcpThreadPool(tsTcpPool); tsTcpPool = NULL; syncTmrCtrl = NULL; - return; - } + return -1; + } tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); + sInfo("sync module initialized successfully"); + + return 0; +} + +void syncCleanUp() { + if (tsTcpPool) { + taosCloseTcpThreadPool(tsTcpPool); + tsTcpPool = NULL; + } + + if (syncTmrCtrl) { + taosTmrCleanUp(syncTmrCtrl); + syncTmrCtrl = NULL; + } + + if (vgIdHash) { + taosHashCleanup(vgIdHash); + vgIdHash = NULL; + } + + sInfo("sync module is cleaned up"); } void *syncStart(const SSyncInfo *pInfo) { @@ -118,15 +141,6 @@ void *syncStart(const SSyncInfo *pInfo) { return NULL; } - pthread_once(&syncModuleInit, syncModuleInitFunc); - if (tsTcpPool == NULL) { - free(pNode); - syncModuleInit = PTHREAD_ONCE_INIT; - sError("failed to init sync module(%s)", tstrerror(errno)); - return NULL; - } - - atomic_add_fetch_32(&tsNodeNum, 1); tstrncpy(pNode->path, pInfo->path, sizeof(pNode->path)); pthread_mutex_init(&pNode->mutex, NULL); @@ -138,7 +152,7 @@ void *syncStart(const SSyncInfo *pInfo) { pNode->confirmForward = pInfo->confirmForward; pNode->notifyFlowCtrl = pInfo->notifyFlowCtrl; pNode->notifyFileSynced = pInfo->notifyFileSynced; - + pNode->selfIndex = -1; pNode->vgId = pInfo->vgId; pNode->replica = pCfg->replica; @@ -148,8 +162,9 @@ void *syncStart(const SSyncInfo *pInfo) { for (int i = 0; i < pCfg->replica; ++i) { const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); - if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) + if ((strcmp(pNodeInfo->nodeFqdn, tsNodeFqdn) == 0) && (pNodeInfo->nodePort == tsSyncPort)) { pNode->selfIndex = i; + } } if (pNode->selfIndex < 0) { @@ -181,16 +196,17 @@ void *syncStart(const SSyncInfo *pInfo) { syncAddArbitrator(pNode); syncAddNodeRef(pNode); taosHashPut(vgIdHash, (const char *)&pNode->vgId, sizeof(int32_t), (char *)(&pNode), sizeof(SSyncNode *)); - - if (pNode->notifyRole) - (*pNode->notifyRole)(pNode->ahandle, nodeRole); + + if (pNode->notifyRole) { + (*pNode->notifyRole)(pNode->ahandle, nodeRole); + } return pNode; } void syncStop(void *param) { - SSyncNode * pNode = param; - SSyncPeer *pPeer; + SSyncNode *pNode = param; + SSyncPeer *pPeer; if (pNode == NULL) return; sInfo("vgId:%d, cleanup sync", pNode->vgId); @@ -199,7 +215,7 @@ void syncStop(void *param) { for (int i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; - if (pPeer) syncRemovePeer(pPeer); + if (pPeer) syncRemovePeer(pPeer); } pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; @@ -214,19 +230,19 @@ void syncStop(void *param) { } int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { - SSyncNode * pNode = param; - int i, j; + SSyncNode *pNode = param; + int i, j; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; - sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], - pNewCfg->replica, pNode->replica); + sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, + pNode->replica); pthread_mutex_lock(&(pNode->mutex)); for (i = 0; i < pNode->replica; ++i) { for (j = 0; j < pNewCfg->replica; ++j) { - if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) && - (pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort)) + if ((strcmp(pNode->peerInfo[i]->fqdn, pNewCfg->nodeInfo[j].nodeFqdn) == 0) && + (pNode->peerInfo[i]->port == pNewCfg->nodeInfo[j].nodePort)) break; } @@ -241,8 +257,8 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { const SNodeInfo *pNewNode = &pNewCfg->nodeInfo[i]; for (j = 0; j < pNode->replica; ++j) { - if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) && - (pNode->peerInfo[j]->port == pNewNode->nodePort)) + if (pNode->peerInfo[j] && (strcmp(pNode->peerInfo[j]->fqdn, pNewNode->nodeFqdn) == 0) && + (pNode->peerInfo[j]->port == pNewNode->nodePort)) break; } @@ -252,8 +268,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { newPeers[i] = pNode->peerInfo[j]; } - if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) + if ((strcmp(pNewNode->nodeFqdn, tsNodeFqdn) == 0) && (pNewNode->nodePort == tsSyncPort)) { pNode->selfIndex = i; + } } pNode->replica = pNewCfg->replica; @@ -261,8 +278,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; memcpy(pNode->peerInfo, newPeers, sizeof(SSyncPeer *) * pNewCfg->replica); - for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) + for (i = pNewCfg->replica; i < TAOS_SYNC_MAX_REPLICA; ++i) { pNode->peerInfo[i] = NULL; + } syncAddArbitrator(pNode); @@ -274,43 +292,44 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { pthread_mutex_unlock(&(pNode->mutex)); - sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, syncRole[nodeRole]); + sInfo("vgId:%d, %d replicas are configured, quorum:%d role:%s", pNode->vgId, pNode->replica, pNode->quorum, + syncRole[nodeRole]); syncBroadcastStatus(pNode); return 0; } int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { - SSyncNode * pNode = param; - SSyncPeer * pPeer; - SSyncHead *pSyncHead; - SWalHead *pWalHead = data; - int fwdLen; - int code = 0; + SSyncNode *pNode = param; + SSyncPeer *pPeer; + SSyncHead *pSyncHead; + SWalHead * pWalHead = data; + int fwdLen; + int code = 0; if (pNode == NULL) return 0; // always update version nodeVersion = pWalHead->version; - if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER ) return 0; + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; // only pkt from RPC or CQ can be forwarded if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; // a hacker way to improve the performance - pSyncHead = (SSyncHead *) ( ((char *)pWalHead) - sizeof(SSyncHead)); + pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead->type = TAOS_SMSG_FORWARD; pSyncHead->pversion = 0; - pSyncHead->len = sizeof(SWalHead) + pWalHead->len; - fwdLen = pSyncHead->len + sizeof(SSyncHead); //include the WAL and SYNC head + pSyncHead->len = sizeof(SWalHead) + pWalHead->len; + fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head pthread_mutex_lock(&(pNode->mutex)); for (int i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; - if (pPeer == NULL || pPeer->peerFd <0) continue; - if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; - + if (pPeer == NULL || pPeer->peerFd < 0) continue; + if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; + if (pNode->quorum > 1 && code == 0) { syncSaveFwdInfo(pNode, pWalHead->version, mhandle); code = 1; @@ -335,12 +354,12 @@ void syncConfirmForward(void *param, uint64_t version, int32_t code) { if (pNode == NULL) return; if (pNode->quorum <= 1) return; - SSyncPeer *pPeer = pNode->pMaster; + SSyncPeer *pPeer = pNode->pMaster; if (pPeer == NULL) return; char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; - SSyncHead *pHead = (SSyncHead *) msg; + SSyncHead *pHead = (SSyncHead *)msg; pHead->type = TAOS_SMSG_FORWARD_RSP; pHead->len = sizeof(SFwdRsp); @@ -363,7 +382,7 @@ void syncRecover(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; - // to do: add a few lines to check if recover is OK + // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work nodeRole = TAOS_SYNC_ROLE_UNSYNCED; @@ -373,7 +392,7 @@ void syncRecover(void *param) { pthread_mutex_lock(&(pNode->mutex)); for (int i = 0; i < pNode->replica; ++i) { - pPeer = (SSyncPeer *) pNode->peerInfo[i]; + pPeer = (SSyncPeer *)pNode->peerInfo[i]; if (pPeer->peerFd >= 0) { syncRestartConnection(pPeer); } @@ -386,7 +405,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; pNodesRole->selfIndex = pNode->selfIndex; - for (int i=0; ireplica; ++i) { + for (int i = 0; i < pNode->replica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->role[i] = pNode->peerInfo[i]->role; } @@ -410,7 +429,7 @@ static void syncAddArbitrator(SSyncNode *pNode) { if (-1 == ret) { nodeInfo.nodePort = tsArbitratorPort; } - + if (pPeer) { if ((strcmp(nodeInfo.nodeFqdn, pPeer->fqdn) == 0) && (nodeInfo.nodePort == pPeer->port)) { return; @@ -418,39 +437,26 @@ static void syncAddArbitrator(SSyncNode *pNode) { syncRemovePeer(pPeer); pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = NULL; } - } + } pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); } -static void syncAddNodeRef(SSyncNode *pNode) -{ - atomic_add_fetch_8(&pNode->refCount, 1); +static void syncAddNodeRef(SSyncNode *pNode) { + atomic_add_fetch_8(&pNode->refCount, 1); } -static void syncDecNodeRef(SSyncNode *pNode) -{ +static void syncDecNodeRef(SSyncNode *pNode) { if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { pthread_mutex_destroy(&pNode->mutex); taosTFree(pNode->pRecv); taosTFree(pNode->pSyncFwds); taosTFree(pNode); - - if (atomic_sub_fetch_32(&tsNodeNum, 1) == 0) { - if (tsTcpPool) taosCloseTcpThreadPool(tsTcpPool); - if (syncTmrCtrl) taosTmrCleanUp(syncTmrCtrl); - if (vgIdHash) taosHashCleanup(vgIdHash); - syncTmrCtrl = NULL; - tsTcpPool = NULL; - vgIdHash = NULL; - syncModuleInit = PTHREAD_ONCE_INIT; - sDebug("sync module is cleaned up"); - } } } void syncAddPeerRef(SSyncPeer *pPeer) { - atomic_add_fetch_8(&pPeer->refCount, 1); + atomic_add_fetch_8(&pPeer->refCount, 1); } int syncDecPeerRef(SSyncPeer *pPeer) { @@ -486,8 +492,8 @@ static void syncRemovePeer(SSyncPeer *pPeer) { static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); if (ip == -1) return NULL; - - SSyncPeer *pPeer = (SSyncPeer *) calloc(1, sizeof(SSyncPeer)); + + SSyncPeer *pPeer = (SSyncPeer *)calloc(1, sizeof(SSyncPeer)); if (pPeer == NULL) return NULL; pPeer->nodeId = pInfo->nodeId; @@ -506,9 +512,11 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { int ret = strcmp(pPeer->fqdn, tsNodeFqdn); if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) { sDebug("%s, start to check peer connection", pPeer->id); - taosTmrReset(syncCheckPeerConnection, 100 + (pNode->vgId*10)%100, pPeer, syncTmrCtrl, &pPeer->timer); + int32_t checkMs = 100 + (pNode->vgId * 10) % 100; + if (pNode->vgId) checkMs = tsStatusInterval * 2000 + 100; + taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } - + syncAddNodeRef(pNode); return pPeer; } @@ -542,16 +550,18 @@ static void syncChooseMaster(SSyncNode *pNode) { sDebug("vgId:%d, choose master", pNode->vgId); for (int i = 0; i < pNode->replica; ++i) { - if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) + if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { onlineNum++; + } } if (onlineNum == pNode->replica) { // if all peers are online, peer with highest version shall be master index = 0; for (int i = 1; i < pNode->replica; ++i) { - if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) + if (pNode->peerInfo[i]->version > pNode->peerInfo[index]->version) { index = i; + } } } @@ -568,8 +578,9 @@ static void syncChooseMaster(SSyncNode *pNode) { //slave with highest version shall be master pPeer = pNode->peerInfo[i]; if (pPeer->role == TAOS_SYNC_ROLE_SLAVE || pPeer->role == TAOS_SYNC_ROLE_MASTER) { - if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) + if (index < 0 || pPeer->version > pNode->peerInfo[index]->version) { index = i; + } } } } @@ -595,8 +606,9 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) { int replica = pNode->replica; for (int i = 0; i < pNode->replica; ++i) { - if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) + if (pNode->peerInfo[i]->role != TAOS_SYNC_ROLE_OFFLINE) { onlineNum++; + } } // add arbitrator connection @@ -644,7 +656,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { code = -1; for (int i = 0; i < pNode->replica; ++i) { - if ( i == pNode->selfIndex ) continue; + if (i == pNode->selfIndex) continue; syncRestartPeer(pNode->peerInfo[i]); } } @@ -661,12 +673,11 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne pNode->peerInfo[pNode->selfIndex]->version = nodeVersion; pPeer->role = newRole; - sDebug("%s, own role:%s, new peer role:%s", pPeer->id, - syncRole[nodeRole], syncRole[pPeer->role]); + sDebug("%s, own role:%s, new peer role:%s", pPeer->id, syncRole[nodeRole], syncRole[pPeer->role]); SSyncPeer *pMaster = syncCheckMaster(pNode); - if ( pMaster ) { + if (pMaster) { // master is there pNode->pMaster = pMaster; sDebug("%s, it is the master, ver:%" PRIu64, pMaster->id, pMaster->version); @@ -691,27 +702,30 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne for (i = 0; i < pNode->replica; ++i) { SSyncPeer *pTemp = pNode->peerInfo[i]; if (pTemp->role != peersStatus[i].role) break; - if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break; + if ((pTemp->role != TAOS_SYNC_ROLE_OFFLINE) && (pTemp->version != peersStatus[i].version)) break; } - + if (i >= pNode->replica) consistent = 1; } else { if (pNode->replica == 2) consistent = 1; } - if (consistent) + if (consistent) { syncChooseMaster(pNode); + } } if (syncRequired) { syncRecoverFromMaster(pMaster); } - if (peerOldRole != newRole || nodeRole != selfOldRole) + if (peerOldRole != newRole || nodeRole != selfOldRole) { syncBroadcastStatus(pNode); + } - if (nodeRole != TAOS_SYNC_ROLE_MASTER) + if (nodeRole != TAOS_SYNC_ROLE_MASTER) { syncResetFlowCtrl(pNode); + } } static void syncRestartPeer(SSyncPeer *pPeer) { @@ -722,8 +736,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) { pPeer->sstatus = TAOS_SYNC_STATUS_INIT; int ret = strcmp(pPeer->fqdn, tsNodeFqdn); - if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) + if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) { taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); + } } void syncRestartConnection(SSyncPeer *pPeer) { @@ -747,13 +762,13 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { if (pPeer->sstatus != TAOS_SYNC_STATUS_INIT) { sDebug("%s, sync is already started", pPeer->id); - return; // already started + return; // already started } // start a new thread to retrieve the data syncAddPeerRef(pPeer); - pthread_attr_t thattr; - pthread_t thread; + pthread_attr_t thattr; + pthread_t thread; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); int ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer); @@ -780,8 +795,8 @@ static void syncNotStarted(void *param, void *tmrId) { } static void syncTryRecoverFromMaster(void *param, void *tmrId) { - SSyncPeer *pPeer = param; - SSyncNode *pNode = pPeer->pSyncNode; + SSyncPeer *pPeer = param; + SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&(pNode->mutex)); syncRecoverFromMaster(pPeer); @@ -805,7 +820,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { return; } - sDebug("%s, try to sync", pPeer->id) + sDebug("%s, try to sync", pPeer->id); SFirstPkt firstPkt; memset(&firstPkt, 0, sizeof(firstPkt)); @@ -814,49 +829,47 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; - taosTmrReset(syncNotStarted, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); - if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt) ) { + if (write(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { sError("%s, failed to send sync-req to peer", pPeer->id); } else { nodeSStatus = TAOS_SYNC_STATUS_START; sInfo("%s, sync-req is sent", pPeer->id); } - - return; } static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { - SSyncNode * pNode = pPeer->pSyncNode; - SFwdRsp *pFwdRsp = (SFwdRsp *) cont; - SSyncFwds *pSyncFwds = pNode->pSyncFwds; - SFwdInfo *pFwdInfo; + SSyncNode *pNode = pPeer->pSyncNode; + SFwdRsp * pFwdRsp = (SFwdRsp *)cont; + SSyncFwds *pSyncFwds = pNode->pSyncFwds; + SFwdInfo * pFwdInfo; sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version); SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { // find the forwardInfo from first - for (int i=0; ifwds; ++i) { - pFwdInfo = pSyncFwds->fwdInfo + (i+pSyncFwds->first)%tsMaxFwdInfo; + for (int i = 0; i < pSyncFwds->fwds; ++i) { + pFwdInfo = pSyncFwds->fwdInfo + (i + pSyncFwds->first) % tsMaxFwdInfo; if (pFwdRsp->version == pFwdInfo->version) break; } - + syncProcessFwdAck(pNode, pFwdInfo, pFwdRsp->code); syncRemoveConfirmedFwdInfo(pNode); } } static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { - SSyncNode * pNode = pPeer->pSyncNode; - SWalHead *pHead = (SWalHead *)cont; + SSyncNode *pNode = pPeer->pSyncNode; + SWalHead * pHead = (SWalHead *)cont; sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { - //nodeVersion = pHead->version; + // nodeVersion = pHead->version; (*pNode->writeToCache)(pNode->ahandle, pHead, TAOS_QTYPE_FWD); - } else { + } else { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) { syncSaveIntoBuffer(pPeer, pHead); } else { @@ -877,12 +890,13 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { pPeer->version = pPeersStatus->version; syncCheckRole(pPeer, pPeersStatus->peersStatus, pPeersStatus->role); - if (pPeersStatus->ack) + if (pPeersStatus->ack) { syncSendPeersStatusMsgToPeer(pPeer, 0); + } } static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { - if (pPeer->peerFd <0) return -1; + if (pPeer->peerFd < 0) return -1; int hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); if (hlen != sizeof(SSyncHead)) { @@ -906,9 +920,9 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { } static int syncProcessPeerMsg(void *param, void *buffer) { - SSyncPeer * pPeer = param; - SSyncHead head; - char *cont = (char *)buffer; + SSyncPeer *pPeer = param; + SSyncHead head; + char * cont = (char *)buffer; SSyncNode *pNode = pPeer->pSyncNode; pthread_mutex_lock(&(pNode->mutex)); @@ -932,16 +946,16 @@ static int syncProcessPeerMsg(void *param, void *buffer) { return code; } -#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA +#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) { SSyncNode *pNode = pPeer->pSyncNode; char msg[statusMsgLen] = {0}; - if (pPeer->peerFd <0 || pPeer->ip ==0) return; + if (pPeer->peerFd < 0 || pPeer->ip == 0) return; - SSyncHead *pHead = (SSyncHead *) msg; - SPeersStatus *pPeersStatus = (SPeersStatus *) (msg + sizeof(SSyncHead)); + SSyncHead * pHead = (SSyncHead *)msg; + SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead)); pHead->type = TAOS_SMSG_STATUS; pHead->len = statusMsgLen - sizeof(SSyncHead); @@ -979,28 +993,28 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { int connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); if (connFd < 0) { sDebug("%s, failed to open tcp socket(%s)", pPeer->id, strerror(errno)); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); return; } SFirstPkt firstPkt; memset(&firstPkt, 0, sizeof(firstPkt)); - firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId:0; + firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0; firstPkt.syncHead.type = TAOS_SMSG_STATUS; - tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); + tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); firstPkt.port = tsSyncPort; firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { sDebug("%s, connection to peer server is setup", pPeer->id); - pPeer->peerFd = connFd; + pPeer->peerFd = connFd; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); syncAddPeerRef(pPeer); } else { sDebug("try later"); close(connFd); - taosTmrReset(syncCheckPeerConnection, tsSyncTimer *1000, pPeer, syncTmrCtrl, &pPeer->timer); + taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer); } } @@ -1011,7 +1025,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, check peer connection", pPeer->id); - syncSetupPeerConnection(pPeer); + syncSetupPeerConnection(pPeer); pthread_mutex_unlock(&(pNode->mutex)); } @@ -1020,7 +1034,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { taosTmrStopA(&pPeer->timer); pthread_attr_t thattr; - pthread_t thread; + pthread_t thread; pthread_attr_init(&thattr); pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED); @@ -1032,15 +1046,15 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { sError("%s, failed to create sync thread", pPeer->id); taosClose(pPeer->syncFd); syncDecPeerRef(pPeer); - } else { + } else { sInfo("%s, sync connection is up", pPeer->id); } } static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { - char ipstr[24]; - int i; - + char ipstr[24]; + int i; + tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); @@ -1065,8 +1079,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { SSyncPeer *pPeer; for (i = 0; i < pNode->replica; ++i) { pPeer = pNode->peerInfo[i]; - if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) - break; + if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break; } pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL; @@ -1091,8 +1104,6 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) { } pthread_mutex_unlock(&(pNode->mutex)); - - return; } static void syncProcessBrokenLink(void *param) { @@ -1121,10 +1132,12 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { if (pSyncFwds->fwds >= tsMaxFwdInfo) { pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->fwds--; - } + } + + if (pSyncFwds->fwds > 0) { + pSyncFwds->last = (pSyncFwds->last + 1) % tsMaxFwdInfo; + } - if (pSyncFwds->fwds > 0) - pSyncFwds->last = (pSyncFwds->last+1) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last; pFwdInfo->version = version; pFwdInfo->mhandle = mhandle; @@ -1140,14 +1153,14 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; int fwds = pSyncFwds->fwds; - for (int i=0; ifwdInfo + pSyncFwds->first; + for (int i = 0; i < fwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFwdInfo->confirmed == 0) break; - pSyncFwds->first = (pSyncFwds->first+1) % tsMaxFwdInfo; + pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->fwds--; if (pSyncFwds->fwds == 0) pSyncFwds->first = pSyncFwds->last; - //sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d", + // sDebug("vgId:%d, fwd info is removed, ver:%d, fwds:%d", // pNode->vgId, pFwdInfo->version, pSyncFwds->fwds); memset(pFwdInfo, 0, sizeof(SFwdInfo)); } @@ -1159,12 +1172,14 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code if (code == 0) { pFwdInfo->acks++; - if (pFwdInfo->acks >= pNode->quorum-1) + if (pFwdInfo->acks >= pNode->quorum - 1) { confirm = 1; + } } else { pFwdInfo->nacks++; - if (pFwdInfo->nacks > pNode->replica-pNode->quorum) + if (pFwdInfo->nacks > pNode->replica - pNode->quorum) { confirm = 1; + } } if (confirm && pFwdInfo->confirmed == 0) { @@ -1181,15 +1196,15 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { if (pSyncFwds->fwds > 0) { pthread_mutex_lock(&(pNode->mutex)); - for (int i=0; ifwds; ++i) { - SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first+i) % tsMaxFwdInfo; + for (int i = 0; i < pSyncFwds->fwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; if (time - pFwdInfo->time < 2000) break; syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); } syncRemoveConfirmedFwdInfo(pNode); pthread_mutex_unlock(&(pNode->mutex)); - } - + } + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 4a2d9859b943c10550ae69b119627f6025b073bd..a4e88fb9468e47f91e908ea669456d52b2592765 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -57,6 +57,9 @@ void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} #endif int32_t vnodeInitResources() { + int code = syncInit(); + if (code != 0) return code; + vnodeInitWriteFp(); vnodeInitReadFp(); @@ -70,11 +73,12 @@ int32_t vnodeInitResources() { } void vnodeCleanupResources() { - if (tsDnodeVnodesHash != NULL) { taosHashCleanup(tsDnodeVnodesHash); tsDnodeVnodesHash = NULL; } + + syncCleanUp(); } int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { diff --git a/tests/script/unique/cluster/balance3.sim b/tests/script/unique/cluster/balance3.sim index cd669b69b6cf6f826f411e1f99ac51247a604995..e3b8125d8c34a7e59aaac4f3df56e4650fb6cc68 100644 --- a/tests/script/unique/cluster/balance3.sim +++ b/tests/script/unique/cluster/balance3.sim @@ -105,6 +105,15 @@ if $dnode4Vnodes != null then goto show1 endi +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + print ============================== step2 print ========= start dnode4 sql create dnode $hostname4 @@ -132,6 +141,15 @@ if $dnode4Vnodes != 2 then goto show2 endi +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + print ============================== step3 print ========= drop dnode2 sql drop dnode $hostname2 @@ -167,6 +185,15 @@ if $dnode4Vnodes != 3 then goto show3 endi +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + system sh/exec.sh -n dnode2 -s stop -x SIGINT print ============================== step4 @@ -195,6 +222,15 @@ if $dnode5Vnodes != 2 then goto show4 endi +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + print ============================== step5 print ========= drop dnode3 sql drop dnode $hostname3 @@ -232,6 +268,15 @@ endi system sh/exec.sh -n dnode3 -s stop -x SIGINT +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + print ============================== step6 sql create dnode $hostname6 system sh/exec.sh -n dnode6 -s start @@ -258,6 +303,15 @@ if $dnode6Vnodes != 2 then goto show6 endi +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + print ============================== step7 print ========= drop dnode4 sql drop dnode $hostname4 @@ -294,6 +348,14 @@ if $dnode4Vnodes != null then endi system sh/exec.sh -n dnode4 -s stop -x SIGINT +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 print ============================== step8 sql create dnode $hostname7 @@ -321,6 +383,15 @@ if $dnode7Vnodes != 2 then goto show8 endi +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + print ============================== step9 print ========= drop dnode1 system sh/exec.sh -n dnode1 -s stop -x SIGINT @@ -335,15 +406,20 @@ sql show mnodes $dnode1Role = $data2_1 $dnode4Role = $data2_4 $dnode5Role = $data2_5 -print dnode1 ==> $dnode1Role -print dnode4 ==> $dnode4Role -print dnode5 ==> $dnode5Role +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 if $dnode1Role != offline then return -1 endi print ============================== step9.1 +sleep 2000 system sh/exec.sh -n dnode1 -s start $x = 0 @@ -353,6 +429,19 @@ show9: if $x == 20 then return -1 endi + +sql show mnodes +$dnode1Role = $data2_1 +$dnode4Role = $data2_4 +$dnode5Role = $data2_5 +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + sql show dnodes -x show9 $dnode5Vnodes = $data2_5 print dnode5 $dnode5Vnodes @@ -374,6 +463,15 @@ endi system sh/exec.sh -n dnode1 -s stop -x SIGINT sleep 5000 +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +print dnode3 ==> $data2_3 +print dnode4 ==> $data2_4 +print dnode5 ==> $data2_5 +print dnode6 ==> $data2_6 +print dnode7 ==> $data2_7 + print ============================== step11 print ========= add db4 diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index 0529808b6b7068c552a26b886d7f5ffb6da3dc3d..adc2fd0b9d5624d86de296b6b9f80108938301a4 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -667,7 +667,7 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) { TAOS_RES* pSql = NULL; - for (int attempt = 0; attempt < 3; ++attempt) { + for (int attempt = 0; attempt < 10; ++attempt) { simLogSql(rest, false); pSql = taos_query(script->taos, rest); ret = taos_errno(pSql);