提交 232d59a9 编写于 作者: S Shengliang Guan

TD-1927

上级 1e4a66d8
......@@ -86,9 +86,10 @@ typedef struct SsyncPeer {
int32_t peerFd; // forward FD
int32_t numOfRetrieves; // number of retrieves tried
int32_t fileChanged; // a flag to indicate file is changed during retrieving process
int32_t refCount;
int64_t rid;
void * timer;
void * pConn;
int32_t refCount; // reference count
struct SSyncNode *pSyncNode;
} SSyncPeer;
......@@ -98,6 +99,7 @@ typedef struct SSyncNode {
int8_t quorum;
int8_t selfIndex;
uint32_t vgId;
int32_t refCount;
int64_t rid;
SSyncPeer * peerInfo[TAOS_SYNC_MAX_REPLICA + 1]; // extra one for arbitrator
SSyncPeer * pMaster;
......@@ -121,13 +123,13 @@ extern int32_t tsSyncNum;
extern char tsNodeFqdn[TSDB_FQDN_LEN];
extern char * syncStatus[];
void *syncRetrieveData(void *param);
void *syncRestoreData(void *param);
void * syncRetrieveData(void *param);
void * syncRestoreData(void *param);
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead);
void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer);
SSyncPeer *syncAcquirePeer(int64_t rid);
void syncReleasePeer(SSyncPeer *pPeer);
#ifdef __cplusplus
}
......
......@@ -35,7 +35,8 @@ char tsNodeFqdn[TSDB_FQDN_LEN] = {0};
static void * tsTcpPool = NULL;
static void * tsSyncTmrCtrl = NULL;
static void * tsVgIdHash = NULL;
static int32_t tsSyncRefId = -1;
static int32_t tsNodeRefId = -1;
static int32_t tsPeerRefId = -1;
// local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
......@@ -48,6 +49,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp)
static void syncRemovePeer(SSyncPeer *pPeer);
static void syncAddArbitrator(SSyncNode *pNode);
static void syncFreeNode(void *);
static void syncFreePeer(void *);
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
static void syncMonitorFwdInfos(void *param, void *tmrId);
static void syncMonitorNodeRole(void *param, void *tmrId);
......@@ -55,7 +57,10 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t c
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
static SSyncNode *syncAcquireNode(int64_t rid);
static void syncReleaseNode(SSyncNode *pNode);
char* syncRole[] = {
"offline",
......@@ -87,29 +92,34 @@ int32_t syncInit() {
tsTcpPool = syncOpenTcpThreadPool(&info);
if (tsTcpPool == NULL) {
sError("failed to init tcpPool");
syncCleanUp();
return -1;
}
tsSyncTmrCtrl = taosTmrInit(1000, 50, 10000, "SYNC");
if (tsSyncTmrCtrl == NULL) {
sError("failed to init tmrCtrl");
syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
syncCleanUp();
return -1;
}
tsVgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVgIdHash == NULL) {
sError("failed to init vgIdHash");
taosTmrCleanUp(tsSyncTmrCtrl);
syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
tsSyncTmrCtrl = NULL;
syncCleanUp();
return -1;
}
tsNodeRefId = taosOpenRef(200, syncFreeNode);
if (tsNodeRefId < 0) {
sError("failed to init node ref");
syncCleanUp();
return -1;
}
tsSyncRefId = taosOpenRef(200, syncFreeNode);
if (tsSyncRefId < 0) {
tsPeerRefId = taosOpenRef(1000, syncFreePeer);
if (tsPeerRefId < 0) {
sError("failed to init peer ref");
syncCleanUp();
return -1;
}
......@@ -121,12 +131,12 @@ int32_t syncInit() {
}
void syncCleanUp() {
if (tsTcpPool) {
if (tsTcpPool != NULL) {
syncCloseTcpThreadPool(tsTcpPool);
tsTcpPool = NULL;
}
if (tsSyncTmrCtrl) {
if (tsSyncTmrCtrl != NULL) {
taosTmrCleanUp(tsSyncTmrCtrl);
tsSyncTmrCtrl = NULL;
}
......@@ -136,8 +146,15 @@ void syncCleanUp() {
tsVgIdHash = NULL;
}
taosCloseRef(tsSyncRefId);
tsSyncRefId = -1;
if (tsNodeRefId != -1) {
taosCloseRef(tsNodeRefId);
tsNodeRefId = -1;
}
if (tsPeerRefId != -1) {
taosCloseRef(tsPeerRefId);
tsPeerRefId = -1;
}
sInfo("sync module is cleaned up");
}
......@@ -170,7 +187,8 @@ int64_t syncStart(const SSyncInfo *pInfo) {
pNode->quorum = pCfg->quorum;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
pNode->rid = taosAddRef(tsSyncRefId, pNode);
pNode->refCount = 1;
pNode->rid = taosAddRef(tsNodeRefId, pNode);
if (pNode->rid < 0) {
syncFreeNode(pNode);
return -1;
......@@ -238,7 +256,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
void syncStop(int64_t rid) {
SSyncPeer *pPeer;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId);
......@@ -259,14 +277,14 @@ void syncStop(int64_t rid) {
pthread_mutex_unlock(&pNode->mutex);
taosReleaseRef(tsSyncRefId, rid);
taosRemoveRef(tsSyncRefId, rid);
syncReleaseNode(pNode);
taosRemoveRef(tsNodeRefId, rid);
}
int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
int32_t i, j;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
......@@ -335,23 +353,22 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
sInfo("vgId:%d, %d replicas are configured, quorum:%d", pNode->vgId, pNode->replica, pNode->quorum);
syncBroadcastStatus(pNode);
taosReleaseRef(tsSyncRefId, rid);
syncReleaseNode(pNode);
return 0;
}
int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return 0;
int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
taosReleaseRef(tsSyncRefId, rid);
syncReleaseNode(pNode);
return code;
}
void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return;
SSyncPeer *pPeer = pNode->pMaster;
......@@ -367,14 +384,14 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
}
}
taosReleaseRef(tsSyncRefId, rid);
syncReleaseNode(pNode);
}
#if 0
void syncRecover(int64_t rid) {
SSyncPeer *pPeer;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return;
// to do: add a few lines to check if recover is OK
......@@ -395,12 +412,12 @@ void syncRecover(int64_t rid) {
pthread_mutex_unlock(&pNode->mutex);
taosReleaseRef(tsSyncRefId, rid);
syncReleaseNode(pNode);
}
#endif
int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return -1;
pNodesRole->selfIndex = pNode->selfIndex;
......@@ -409,8 +426,7 @@ int32_t syncGetNodesRole(int64_t rid, SNodesRole *pNodesRole) {
pNodesRole->role[i] = pNode->peerInfo[i]->role;
}
taosReleaseRef(tsSyncRefId, rid);
syncReleaseNode(pNode);
return 0;
}
......@@ -446,24 +462,61 @@ static void syncAddArbitrator(SSyncNode *pNode) {
static void syncFreeNode(void *param) {
SSyncNode *pNode = param;
int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1);
sDebug("vgId:%d, snode is freed, refCount:%d", pNode->vgId, refCount);
pthread_mutex_destroy(&pNode->mutex);
tfree(pNode->pRecv);
tfree(pNode->pSyncFwds);
tfree(pNode);
}
void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_32(&pPeer->refCount, 1); }
SSyncNode *syncAcquireNode(int64_t rid) {
SSyncNode *pNode = taosAcquireRef(tsNodeRefId, rid);
if (pNode == NULL) {
sDebug("failed to acquire snode from refId:%" PRId64, rid);
} else {
int32_t refCount = atomic_add_fetch_32(&pNode->refCount, 1);
sTrace("vgId:%d, acquire snode refId:%" PRId64 ", refCount:%d", pNode->vgId, rid, refCount);
}
return pNode;
}
void syncReleaseNode(SSyncNode *pNode) {
int32_t refCount = atomic_sub_fetch_32(&pNode->refCount, 1);
sTrace("vgId:%d, dec snode refId:%" PRId64 " refCount:%d", pNode->vgId, pNode->rid, refCount);
taosReleaseRef(tsNodeRefId, pNode->rid);
}
static void syncFreePeer(void *param) {
SSyncPeer *pPeer = param;
int32_t syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_32(&pPeer->refCount, 1) == 0) {
taosReleaseRef(tsSyncRefId, pPeer->pSyncNode->rid);
int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1);
sDebug("%s, peer is freed, refCount:%d", pPeer->id, refCount);
sDebug("%s, resource is freed", pPeer->id);
syncReleaseNode(pPeer->pSyncNode);
tfree(pPeer);
return 0;
}
SSyncPeer *syncAcquirePeer(int64_t rid) {
SSyncPeer *pPeer = taosAcquireRef(tsPeerRefId, rid);
if (pPeer == NULL) {
sDebug("failed to acquire peer from refId:%" PRId64, rid);
} else {
int32_t refCount = atomic_add_fetch_32(&pPeer->refCount, 1);
sTrace("%s, acquire peer refId:%" PRId64 ", refCount:%d", pPeer->id, rid, refCount);
}
return 1;
return pPeer;
}
void syncReleasePeer(SSyncPeer *pPeer) {
int32_t refCount = atomic_sub_fetch_32(&pPeer->refCount, 1);
sTrace("%s, dec peer refId:%" PRId64 ", refCount:%d", pPeer->id, pPeer->rid, refCount);
taosReleaseRef(tsPeerRefId, pPeer->rid);
}
static void syncClosePeerConn(SSyncPeer *pPeer) {
......@@ -482,7 +535,8 @@ static void syncRemovePeer(SSyncPeer *pPeer) {
pPeer->ip = 0;
syncClosePeerConn(pPeer);
syncDecPeerRef(pPeer);
//taosRemoveRef(tsPeerRefId, pPeer->rid);
syncReleasePeer(pPeer);
}
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
......@@ -508,17 +562,18 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer->role = TAOS_SYNC_ROLE_OFFLINE;
pPeer->pSyncNode = pNode;
pPeer->refCount = 1;
pPeer->rid = taosAddRef(tsPeerRefId, pPeer);
sInfo("%s, it is configured, ep:%s:%u", pPeer->id, pPeer->fqdn, pPeer->port);
sInfo("%s, %p it is configured, ep:%s:%u rid:%" PRId64, pPeer->id, pPeer, pPeer->fqdn, pPeer->port, pPeer->rid);
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (pPeer->nodeId == 0 || (ret > 0) || (ret == 0 && pPeer->port > tsSyncPort)) {
int32_t checkMs = 100 + (pNode->vgId * 10) % 100;
if (pNode->vgId > 1) checkMs = tsStatusInterval * 1000 + checkMs;
sDebug("%s, check peer connection after %d ms", pPeer->id, checkMs);
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, tsSyncTmrCtrl, &pPeer->timer);
taosTmrReset(syncCheckPeerConnection, checkMs, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
}
taosAcquireRef(tsSyncRefId, pNode->rid);
(void)syncAcquireNode(pNode->rid);
return pPeer;
}
......@@ -751,7 +806,7 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
int32_t ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort)) {
sDebug("%s, check peer connection in 1000 ms", pPeer->id);
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
}
}
......@@ -780,25 +835,30 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
}
// start a new thread to retrieve the data
syncAddPeerRef(pPeer);
(void)syncAcquirePeer(pPeer->rid);
pthread_attr_t thattr;
pthread_t thread;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, pPeer);
int32_t ret = pthread_create(&thread, &thattr, syncRetrieveData, (void *)pPeer->rid);
pthread_attr_destroy(&thattr);
if (ret != 0) {
sError("%s, failed to create sync thread since %s", pPeer->id, strerror(errno));
syncDecPeerRef(pPeer);
} else {
pPeer->sstatus = TAOS_SYNC_STATUS_START;
sDebug("%s, thread is created to retrieve data, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
}
syncReleasePeer(pPeer);
}
static void syncNotStarted(void *param, void *tmrId) {
SSyncPeer *pPeer = param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex);
......@@ -807,15 +867,22 @@ static void syncNotStarted(void *param, void *tmrId) {
sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
syncRestartConnection(pPeer);
pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer);
}
static void syncTryRecoverFromMaster(void *param, void *tmrId) {
SSyncPeer *pPeer = param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex);
syncRecoverFromMaster(pPeer);
pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer);
}
static void syncRecoverFromMaster(SSyncPeer *pPeer) {
......@@ -831,7 +898,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
// Ensure the sync of mnode not interrupted
if (pNode->vgId != 1 && tsSyncNum >= SYNC_MAX_NUM) {
sInfo("%s, %d syncs are in process, try later", pPeer->id, tsSyncNum);
taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, pPeer, tsSyncTmrCtrl, &pPeer->timer);
taosTmrReset(syncTryRecoverFromMaster, 500 + (pNode->vgId * 10) % 200, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
return;
}
......@@ -840,7 +907,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
SSyncMsg msg;
syncBuildSyncReqMsg(&msg, pNode->vgId);
taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
taosTmrReset(syncNotStarted, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync-req to peer", pPeer->id);
......@@ -929,7 +996,10 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) {
}
static int32_t syncProcessPeerMsg(void *param, void *buffer) {
SSyncPeer *pPeer = param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return -1;
SSyncHead *pHead = buffer;
SSyncNode *pNode = pPeer->pSyncNode;
......@@ -950,6 +1020,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
}
pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer);
return code;
}
......@@ -1003,7 +1074,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0);
if (connFd < 0) {
sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno));
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
return;
}
......@@ -1015,16 +1086,18 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer);
} else {
sDebug("%s, failed to setup peer connection to server since %s, try later", pPeer->id, strerror(errno));
taosClose(connFd);
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, pPeer, tsSyncTmrCtrl, &pPeer->timer);
taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer);
}
}
static void syncCheckPeerConnection(void *param, void *tmrId) {
SSyncPeer *pPeer = param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return;
SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex);
......@@ -1033,6 +1106,8 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
syncSetupPeerConnection(pPeer);
pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer);
}
static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
......@@ -1043,8 +1118,9 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_DETACHED);
syncAddPeerRef(pPeer);
int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, pPeer);
(void)syncAcquirePeer(pPeer->rid);
int32_t ret = pthread_create(&(thread), &thattr, (void *)syncRestoreData, (void *)pPeer->rid);
pthread_attr_destroy(&thattr);
if (ret < 0) {
......@@ -1052,10 +1128,11 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
nodeSStatus = TAOS_SYNC_STATUS_INIT;
sError("%s, failed to create sync thread, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
taosClose(pPeer->syncFd);
syncDecPeerRef(pPeer);
} else {
sInfo("%s, sync connection is up", pPeer->id);
}
syncReleasePeer(pPeer);
}
static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
......@@ -1087,7 +1164,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return;
}
sDebug("vgId:%d, sync msg is received, tranId:%u", vgId, msg.tranId);
sDebug("vgId:%d, sync connection is incomming, tranId:%u", vgId, msg.tranId);
SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex);
......@@ -1116,7 +1193,6 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
syncClosePeerConn(pPeer);
pPeer->peerFd = connFd;
pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
syncAddPeerRef(pPeer);
sDebug("%s, ready to exchange data", pPeer->id);
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_EXCHANGE_DATA, syncGenTranId());
}
......@@ -1126,22 +1202,21 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
}
static void syncProcessBrokenLink(void *param) {
if (param == NULL) return; // the connection for arbitrator
SSyncPeer *pPeer = param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return;
SSyncNode *pNode = pPeer->pSyncNode;
if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return;
pthread_mutex_lock(&pNode->mutex);
sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d", pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd);
pPeer->peerFd = -1;
if (syncDecPeerRef(pPeer) != 0) {
syncRestartConnection(pPeer);
}
pthread_mutex_unlock(&pNode->mutex);
taosReleaseRef(tsSyncRefId, pNode->rid);
syncReleasePeer(pPeer);
}
static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
......@@ -1212,7 +1287,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
static void syncMonitorNodeRole(void *param, void *tmrId) {
int64_t rid = (int64_t)param;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return;
for (int32_t index = 0; index < pNode->replica; index++) {
......@@ -1229,12 +1304,12 @@ static void syncMonitorNodeRole(void *param, void *tmrId) {
}
pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
taosReleaseRef(tsSyncRefId, rid);
syncReleaseNode(pNode);
}
static void syncMonitorFwdInfos(void *param, void *tmrId) {
int64_t rid = (int64_t)param;
SSyncNode *pNode = taosAcquireRef(tsSyncRefId, rid);
SSyncNode *pNode = syncAcquireNode(rid);
if (pNode == NULL) return;
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
......@@ -1260,7 +1335,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, SYNC_FWD_TIMER, (void *)pNode->rid, tsSyncTmrCtrl);
}
taosReleaseRef(tsSyncRefId, rid);
syncReleaseNode(pNode);
}
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) {
......
......@@ -351,7 +351,10 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
}
void *syncRestoreData(void *param) {
SSyncPeer *pPeer = param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return NULL;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE();
......@@ -382,7 +385,7 @@ void *syncRestoreData(void *param) {
taosClose(pPeer->syncFd);
syncCloseRecvBuffer(pNode);
__sync_fetch_and_sub(&tsSyncNum, 1);
syncDecPeerRef(pPeer);
syncReleasePeer(pPeer);
return NULL;
}
......@@ -464,7 +464,10 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
}
void *syncRetrieveData(void *param) {
SSyncPeer *pPeer = (SSyncPeer *)param;
int64_t rid = (int64_t)param;
SSyncPeer *pPeer = syncAcquirePeer(rid);
if (pPeer == NULL) return NULL;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE();
......@@ -493,7 +496,7 @@ void *syncRetrieveData(void *param) {
pPeer->fileChanged = 0;
taosClose(pPeer->syncFd);
syncDecPeerRef(pPeer);
syncReleasePeer(pPeer);
return NULL;
}
......@@ -130,7 +130,7 @@ void *syncAllocateTcpConn(void *param, void *pPeer, int32_t connFd) {
pConn->fd = connFd;
pConn->pThread = pThread;
pConn->ahandle = pPeer;
pConn->ahandle = (void *)(((SSyncPeer *)pPeer)->rid);
pConn->closedByApp = 0;
event.events = EPOLLIN | EPOLLRDHUP;
......
......@@ -89,7 +89,10 @@ static void vnodeIncRef(void *ptNode) {
}
void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
SVnodeObj **ppVnode = NULL;
if (tsVnodesHash != NULL) {
ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
}
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册