未验证 提交 3ef0c27a 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #15521 from shiyi23/hsy_dev_newest

fix: Cover Scan
...@@ -41,7 +41,7 @@ typedef enum { ...@@ -41,7 +41,7 @@ typedef enum {
typedef struct { typedef struct {
uint32_t nodeId; // node ID assigned by TDengine uint32_t nodeId; // node ID assigned by TDengine
uint16_t nodePort; // node sync Port uint16_t nodePort; // node sync Port
char nodeFqdn[TSDB_FQDN_LEN]; // node FQDN char nodeFqdn[TSDB_FQDN_LEN * 2]; // node FQDN
} SNodeInfo; } SNodeInfo;
typedef struct { typedef struct {
......
...@@ -418,8 +418,12 @@ void syncRecover(int64_t rid) { ...@@ -418,8 +418,12 @@ void syncRecover(int64_t rid) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRoleFp)(pNode->vgId, nodeRole); (*pNode->notifyRoleFp)(pNode->vgId, nodeRole);
pthread_mutex_lock(&pNode->mutex); // pthread_mutex_lock(&pNode->mutex);
int err = pthread_mutex_trylock(&pNode->mutex);
if(EBUSY == err)
{
sDebug("pthread_mutex_trylock failed!");
}
nodeVersion = 0; nodeVersion = 0;
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
...@@ -431,7 +435,7 @@ void syncRecover(int64_t rid) { ...@@ -431,7 +435,7 @@ void syncRecover(int64_t rid) {
} }
} }
pthread_mutex_unlock(&pNode->mutex); if(0 == err) pthread_mutex_unlock(&pNode->mutex);
syncReleaseNode(pNode); syncReleaseNode(pNode);
} }
...@@ -987,12 +991,17 @@ static void syncNotStarted(void *param, void *tmrId) { ...@@ -987,12 +991,17 @@ static void syncNotStarted(void *param, void *tmrId) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex); // pthread_mutex_lock(&pNode->mutex);
int err = pthread_mutex_trylock(&pNode->mutex);
if(EBUSY == err)
{
sDebug("pthread_mutex_trylock failed!");
}
pPeer->timer = NULL; pPeer->timer = NULL;
pPeer->sstatus = TAOS_SYNC_STATUS_INIT; pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sInfo("%s, sync conn is still not up, restart and set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
pthread_mutex_unlock(&pNode->mutex); if(0 == err) pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
} }
...@@ -1136,7 +1145,12 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { ...@@ -1136,7 +1145,12 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
SSyncHead *pHead = buffer; SSyncHead *pHead = buffer;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex); // pthread_mutex_lock(&pNode->mutex);
int err = pthread_mutex_trylock(&pNode->mutex);
if(EBUSY == err)
{
sDebug("pthread_mutex_trylock failed!");
}
int32_t code = syncReadPeerMsg(pPeer, pHead); int32_t code = syncReadPeerMsg(pPeer, pHead);
...@@ -1152,7 +1166,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) { ...@@ -1152,7 +1166,7 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
} }
} }
pthread_mutex_unlock(&pNode->mutex); if(0 == err) pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
return code; return code;
...@@ -1245,12 +1259,17 @@ static void syncCheckPeerConnection(void *param, void *tmrId) { ...@@ -1245,12 +1259,17 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex); // pthread_mutex_lock(&pNode->mutex);
int err = pthread_mutex_trylock(&pNode->mutex);
if(EBUSY == err)
{
sDebug("pthread_mutex_trylock failed!");
}
sDebug("%s, check peer connection", pPeer->id); sDebug("%s, check peer connection", pPeer->id);
syncSetupPeerConnection(pPeer); syncSetupPeerConnection(pPeer);
pthread_mutex_unlock(&pNode->mutex); if(0 == err) pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
} }
...@@ -1331,7 +1350,12 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { ...@@ -1331,7 +1350,12 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
sDebug("vgId:%d, sync connection is incoming, tranId:%u", vgId, msg.tranId); sDebug("vgId:%d, sync connection is incoming, tranId:%u", vgId, msg.tranId);
SSyncNode *pNode = *ppNode; SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex); // pthread_mutex_lock(&pNode->mutex);
int err = pthread_mutex_trylock(&pNode->mutex);
if(EBUSY == err)
{
sDebug("pthread_mutex_trylock failed!");
}
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) { for (i = 0; i < pNode->replica; ++i) {
...@@ -1362,7 +1386,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { ...@@ -1362,7 +1386,7 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
} }
} }
pthread_mutex_unlock(&pNode->mutex); if(0 == err) pthread_mutex_unlock(&pNode->mutex);
} }
static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
...@@ -1371,7 +1395,12 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { ...@@ -1371,7 +1395,12 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex); // pthread_mutex_lock(&pNode->mutex);
int err = pthread_mutex_trylock(&pNode->mutex);
if(EBUSY == err)
{
sDebug("pthread_mutex_trylock failed!");
}
sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d", sDebug("%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d",
pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd, closedByApp); pPeer->id, strerror(errno), pPeer->peerFd, pPeer->syncFd, closedByApp);
...@@ -1381,7 +1410,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) { ...@@ -1381,7 +1410,7 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
} }
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
pthread_mutex_unlock(&pNode->mutex); if (0 == err) pthread_mutex_unlock(&pNode->mutex);
syncReleasePeer(pPeer); syncReleasePeer(pPeer);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册