/*
 * Review notes for this patch (git unified diff; the payload is C plus a
 * .sim test script). Everything below the closing comment marker is the
 * patch text itself, unchanged.
 *
 * NOTE(review): the hunk bodies below are run together on a few very long
 * lines, so this copy presumably will not apply with git-apply/patch until
 * the original line breaks are restored.
 *
 * Changes to src/sync/src/syncMain.c:
 *  - Adds #include "tref.h" and a file-scope handle `tsSyncRefId`
 *    (initialised to -1).
 *  - syncInit(): sets tsSyncRefId = taosOpenRef(200, syncFreeNode); on a
 *    negative result it calls syncCleanUp() and returns -1.
 *  - syncCleanUp(): calls taosCloseRef(tsSyncRefId) and resets the handle
 *    to -1.
 *  - Removes syncAddNodeRef()/syncDecNodeRef(), which counted references in
 *    pNode->refCount with atomic_add_fetch_8/atomic_sub_fetch_8. The teardown
 *    that syncDecNodeRef() ran when the count reached zero
 *    (pthread_mutex_destroy, taosTFree of pRecv, pSyncFwds and the node) now
 *    lives in syncFreeNode(void *), which is the callback passed to
 *    taosOpenRef().
 *  - syncStart(): registers the node with taosAddRef() before the peers are
 *    added; on failure it calls syncFreeNode(pNode) and returns NULL.
 *  - syncStop(), syncReconfig(), syncForwardToPeer(), syncConfirmForward():
 *    the `pNode == NULL` early returns are replaced by
 *    `taosAcquireRef(tsSyncRefId, pNode) < 0` checks, paired with
 *    taosReleaseRef() at the end of the function. syncStop() additionally
 *    calls taosRemoveRef().
 *  - syncRecover(), syncGetNodesRole(), syncMonitorFwdInfos(): gain the same
 *    acquire/release pair.
 *  - syncForwardToPeer(): its previous body moves into the new static
 *    syncForwardToPeerImpl(); the public function only acquires the ref,
 *    calls the Impl and releases the ref.
 *  - syncAddPeer(), syncDecPeerRef(), syncProcessBrokenLink(): the removed
 *    syncAddNodeRef/syncDecNodeRef calls become taosAcquireRef/taosReleaseRef.
 *
 * Changes to tests/script/general/parser/join_multivnode.sim:
 *  - For the "limit 20 offset 19" join query the expected row count changes
 *    from 100 to 20, and value checks for $data00..$data06 and
 *    $data10..$data16 are added.
 *  - The "# c5 is null ! error" remark is removed.
 *
 * NOTE(review) - points to confirm before merging:
 *  - The forward declaration of syncForwardToPeerImpl names its last
 *    parameter `qtyp` while the definition uses `qtype`; looks like a typo.
 *  - syncMonitorFwdInfos() contains `if (pSyncFwds) {;` - a stray empty
 *    statement after the brace.
 *  - syncAddPeer() ignores the return value of taosAcquireRef(), whereas
 *    syncProcessBrokenLink() returns early when it is negative.
 *  - When taosOpenRef() fails, syncInit() calls syncCleanUp(), which then
 *    calls taosCloseRef() with a negative tsSyncRefId; confirm taosCloseRef()
 *    tolerates that.
 *  - The NULL guards on `param` are gone; presumably taosAcquireRef() fails
 *    for a NULL or unregistered pointer - confirm against tref.h.
 *  - syncGetNodesRole() dereferences pNode->peerInfo[i] without a NULL check,
 *    while the forwarding loop in syncForwardToPeerImpl() skips NULL peers;
 *    confirm the entries cannot be NULL there.
 *  - In syncForwardToPeerImpl() the version-mismatch loop passes
 *    pNode->peerInfo[i] to syncRestartConnection() without a NULL check
 *    (carried over unchanged from the old syncForwardToPeer body).
 *  - Only parts of syncReconfig() and syncRecover() are visible in these
 *    hunks, so whether every return path between taosAcquireRef() and
 *    taosReleaseRef() releases the ref cannot be checked from here.
 */
diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index ef635e6efc1ca5f071c64dbe00920c3987837494..6f5e3be8ab4965331f23d2e2b7307823981205a5 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -20,6 +20,7 @@ #include "tlog.h" #include "tutil.h" #include "ttimer.h" +#include "tref.h" #include "tsocket.h" #include "tglobal.h" #include "taoserror.h" @@ -43,6 +44,7 @@ char tsNodeFqdn[TSDB_FQDN_LEN]; static ttpool_h tsTcpPool; static void * syncTmrCtrl = NULL; static void * vgIdHash; +static int tsSyncRefId = -1; // local functions static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); @@ -54,13 +56,13 @@ static int syncProcessPeerMsg(void *param, void *buffer); static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); static void syncRemovePeer(SSyncPeer *pPeer); static void syncAddArbitrator(SSyncNode *pNode); -static void syncAddNodeRef(SSyncNode *pNode); -static void syncDecNodeRef(SSyncNode *pNode); +static void syncFreeNode(void *); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncMonitorFwdInfos(void *param, void *tmrId); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncRestartPeer(SSyncPeer *pPeer); +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtyp); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); char* syncRole[] = { @@ -106,6 +108,12 @@ int32_t syncInit() { return -1; } + tsSyncRefId = taosOpenRef(200, syncFreeNode); + if (tsSyncRefId < 0) { + syncCleanUp(); + return -1; + } + tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); sInfo("sync module initialized successfully"); @@ -128,6 +136,9 @@ void syncCleanUp() { vgIdHash = NULL; } + taosCloseRef(tsSyncRefId); + tsSyncRefId = -1; + sInfo("sync module is cleaned up"); } @@ -159,6 +170,12 @@ void *syncStart(const SSyncInfo *pInfo) { 
pNode->quorum = pCfg->quorum; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; + int ret = taosAddRef(tsSyncRefId, pNode); + if (ret < 0) { + syncFreeNode(pNode); + return NULL; + } + for (int i = 0; i < pCfg->replica; ++i) { const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); @@ -167,8 +184,6 @@ void *syncStart(const SSyncInfo *pInfo) { } } - syncAddNodeRef(pNode); - if (pNode->selfIndex < 0) { sInfo("vgId:%d, this node is not configured", pNode->vgId); terrno = TSDB_CODE_SYN_INVALID_CONFIG; @@ -210,7 +225,9 @@ void syncStop(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; - if (pNode == NULL) return; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; + sInfo("vgId:%d, cleanup sync", pNode->vgId); pthread_mutex_lock(&(pNode->mutex)); @@ -228,14 +245,17 @@ void syncStop(void *param) { pthread_mutex_unlock(&(pNode->mutex)); - syncDecNodeRef(pNode); + taosReleaseRef(tsSyncRefId, pNode); + taosRemoveRef(tsSyncRefId, pNode); } int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { SSyncNode *pNode = param; int i, j; - if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG; + sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, pNode->replica); @@ -298,105 +318,63 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { syncRole[nodeRole]); syncBroadcastStatus(pNode); + taosReleaseRef(tsSyncRefId, pNode); + return 0; } int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { SSyncNode *pNode = param; - SSyncPeer *pPeer; - SSyncHead *pSyncHead; - SWalHead * pWalHead = data; - int fwdLen; - int code = 0; - - if (pNode == NULL) return 0; - - if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { - sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" 
PRIu64 ", restart connection", pNode->vgId, - pWalHead->version, nodeVersion); - for (int i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - syncRestartConnection(pPeer); - } - return TSDB_CODE_SYN_INVALID_VERSION; - } - - // always update version - nodeVersion = pWalHead->version; - sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], - qtype, pWalHead->version); - - if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; - - // only pkt from RPC or CQ can be forwarded - if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; - // a hacker way to improve the performance - pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); - pSyncHead->type = TAOS_SMSG_FORWARD; - pSyncHead->pversion = 0; - pSyncHead->len = sizeof(SWalHead) + pWalHead->len; - fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return 0; - pthread_mutex_lock(&(pNode->mutex)); + int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); - for (int i = 0; i < pNode->replica; ++i) { - pPeer = pNode->peerInfo[i]; - if (pPeer == NULL || pPeer->peerFd < 0) continue; - if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; - - if (pNode->quorum > 1 && code == 0) { - syncSaveFwdInfo(pNode, pWalHead->version, mhandle); - code = 1; - } - - int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); - if (retLen == fwdLen) { - sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); - } else { - sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); - syncRestartConnection(pPeer); - } - } - - pthread_mutex_unlock(&(pNode->mutex)); + taosReleaseRef(tsSyncRefId, pNode); return code; } void syncConfirmForward(void *param, uint64_t version, int32_t code) { SSyncNode *pNode = param; - 
if (pNode == NULL) return; - if (pNode->quorum <= 1) return; - SSyncPeer *pPeer = pNode->pMaster; - if (pPeer == NULL) return; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; - char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; + SSyncPeer *pPeer = pNode->pMaster; + if (pPeer && pNode->quorum > 1) { + char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; - SSyncHead *pHead = (SSyncHead *)msg; - pHead->type = TAOS_SMSG_FORWARD_RSP; - pHead->len = sizeof(SFwdRsp); + SSyncHead *pHead = (SSyncHead *)msg; + pHead->type = TAOS_SMSG_FORWARD_RSP; + pHead->len = sizeof(SFwdRsp); - SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); - pFwdRsp->version = version; - pFwdRsp->code = code; + SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); + pFwdRsp->version = version; + pFwdRsp->code = code; - int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); - int retLen = write(pPeer->peerFd, msg, msgLen); + int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); + int retLen = write(pPeer->peerFd, msg, msgLen); - if (retLen == msgLen) { - sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); - } else { - sDebug("%s, failed to send forward ack, restart", pPeer->id); - syncRestartConnection(pPeer); + if (retLen == msgLen) { + sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); + } else { + sDebug("%s, failed to send forward ack, restart", pPeer->id); + syncRestartConnection(pPeer); + } } + + taosReleaseRef(tsSyncRefId, pNode); } void syncRecover(void *param) { SSyncNode *pNode = param; SSyncPeer *pPeer; + int ret = taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return; + // to do: add a few lines to check if recover is OK // if take this node to unsync state, the whole system may not work @@ -414,17 +392,24 @@ void syncRecover(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); + + taosReleaseRef(tsSyncRefId, pNode); } int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { SSyncNode *pNode = param; + int ret = 
taosAcquireRef(tsSyncRefId, pNode); + if (ret < 0) return -1; + pNodesRole->selfIndex = pNode->selfIndex; for (int i = 0; i < pNode->replica; ++i) { pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->role[i] = pNode->peerInfo[i]->role; } + taosReleaseRef(tsSyncRefId, pNode); + return 0; } @@ -457,22 +442,20 @@ static void syncAddArbitrator(SSyncNode *pNode) { pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); } -static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); } +static void syncFreeNode(void *param) { + SSyncNode *pNode = param; -static void syncDecNodeRef(SSyncNode *pNode) { - if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { - pthread_mutex_destroy(&pNode->mutex); - taosTFree(pNode->pRecv); - taosTFree(pNode->pSyncFwds); - taosTFree(pNode); - } + pthread_mutex_destroy(&pNode->mutex); + taosTFree(pNode->pRecv); + taosTFree(pNode->pSyncFwds); + taosTFree(pNode); } void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } int syncDecPeerRef(SSyncPeer *pPeer) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { - syncDecNodeRef(pPeer->pSyncNode); + taosReleaseRef(tsSyncRefId, pPeer->pSyncNode); sDebug("%s, resource is freed", pPeer->id); taosTFree(pPeer->watchFd); @@ -529,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); } - syncAddNodeRef(pNode); + taosAcquireRef(tsSyncRefId, pNode); return pPeer; } @@ -1122,7 +1105,7 @@ static void syncProcessBrokenLink(void *param) { SSyncPeer *pPeer = param; SSyncNode *pNode = pPeer->pSyncNode; - syncAddNodeRef(pNode); + if (taosAcquireRef(tsSyncRefId, pNode) < 0) return; pthread_mutex_lock(&(pNode->mutex)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); @@ -1133,7 +1116,7 @@ static void syncProcessBrokenLink(void *param) { } pthread_mutex_unlock(&(pNode->mutex)); - syncDecNodeRef(pNode); + 
taosReleaseRef(tsSyncRefId, pNode); } static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { @@ -1202,22 +1185,90 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code static void syncMonitorFwdInfos(void *param, void *tmrId) { SSyncNode *pNode = param; + + int ret = taosAcquireRef(tsSyncRefId, pNode); + if ( ret < 0) return; + SSyncFwds *pSyncFwds = pNode->pSyncFwds; - if (pSyncFwds == NULL) return; - uint64_t time = taosGetTimestampMs(); + if (pSyncFwds) {; + uint64_t time = taosGetTimestampMs(); - if (pSyncFwds->fwds > 0) { - pthread_mutex_lock(&(pNode->mutex)); - for (int i = 0; i < pSyncFwds->fwds; ++i) { - SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; - if (time - pFwdInfo->time < 2000) break; - syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + if (pSyncFwds->fwds > 0) { + pthread_mutex_lock(&(pNode->mutex)); + for (int i = 0; i < pSyncFwds->fwds; ++i) { + SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; + if (time - pFwdInfo->time < 2000) break; + syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); + } + + syncRemoveConfirmedFwdInfo(pNode); + pthread_mutex_unlock(&(pNode->mutex)); } - syncRemoveConfirmedFwdInfo(pNode); - pthread_mutex_unlock(&(pNode->mutex)); + pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); } - pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); + taosReleaseRef(tsSyncRefId, pNode); } + +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) { + SSyncPeer *pPeer; + SSyncHead *pSyncHead; + SWalHead * pWalHead = data; + int fwdLen; + int32_t code = 0; + + if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) { + sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId, + pWalHead->version, nodeVersion); + for 
(int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + syncRestartConnection(pPeer); + } + return TSDB_CODE_SYN_INVALID_VERSION; + } + + // always update version + nodeVersion = pWalHead->version; + sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], + qtype, pWalHead->version); + + if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; + + // only pkt from RPC or CQ can be forwarded + if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; + + // a hacker way to improve the performance + pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); + pSyncHead->type = TAOS_SMSG_FORWARD; + pSyncHead->pversion = 0; + pSyncHead->len = sizeof(SWalHead) + pWalHead->len; + fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head + + pthread_mutex_lock(&(pNode->mutex)); + + for (int i = 0; i < pNode->replica; ++i) { + pPeer = pNode->peerInfo[i]; + if (pPeer == NULL || pPeer->peerFd < 0) continue; + if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; + + if (pNode->quorum > 1 && code == 0) { + syncSaveFwdInfo(pNode, pWalHead->version, mhandle); + code = 1; + } + + int retLen = write(pPeer->peerFd, pSyncHead, fwdLen); + if (retLen == fwdLen) { + sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); + } else { + sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); + syncRestartConnection(pPeer); + } + } + + pthread_mutex_unlock(&(pNode->mutex)); + + return code; +} + diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index 5e4a0990c18245dfda6c38cd65836e0d217444bf..102d5ba93fb79ff8874660b240c26937cce6dce7 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -134,13 +134,66 @@ sql select 
join_mt0.ts, join_mt1.t1, join_mt0.t1, join_mt1.tbname, join_mt0.tbna #1970-01-01 08:01:40.800 | 10 | 45.000000000 | 0 | true | false | 0 | #1970-01-01 08:01:40.790 | 10 | 945.000000000 | 90 | true | true | 0 | - sql select count(join_mt0.c1), sum(join_mt1.c2), first(join_mt0.c5), last(join_mt1.c7), first(join_mt1.c7) from join_mt0, join_mt1 where join_mt0.t1=join_mt1.t1 and join_mt0.ts=join_mt1.ts interval(10a) group by join_mt0.t1 order by join_mt0.ts desc limit 20 offset 19; -if $rows != 100 then +if $rows != 20 then + return -1 +endi + +if $data00 != @70-01-01 08:01:40.800@ then + return -1 +endi + +if $data01 != 10 then + return -1 +endi + +if $data02 != 45.000000000 then return -1 endi -# c5 is null ! error +if $data03 != 0 then + return -1 +endi + +if $data04 != 1 then + return -1 +endi + +if $data05 != 0 then + return -1 +endi + +if $data06 != 0 then + return -1 +endi + +if $data10 != @70-01-01 08:01:40.790@ then + return -1 +endi + +if $data11 != 10 then + return -1 +endi + +if $data12 != 945.000000000 then + return -1 +endi + +if $data13 != 90 then + return -1 +endi + +if $data14 != 1 then + return -1 +endi + +if $data15 != 1 then + return -1 +endi + +if $data16 != 0 then + return -1 +endi sql select count(join_mt0.c1), sum(join_mt0.c2)/count(*), avg(c2), first(join_mt0.c5), last(c7) from join_mt0 interval(10a) group by join_mt0.t1 order by join_mt0.ts desc; if $rows != 100 then