提交 2f5fe68f 编写于 作者: S Shengliang Guan

TD-2415

上级 498df689
...@@ -62,12 +62,15 @@ typedef struct { ...@@ -62,12 +62,15 @@ typedef struct {
typedef struct { typedef struct {
SSyncHead syncHead; SSyncHead syncHead;
uint16_t port; uint16_t port;
uint16_t tranId;
char fqdn[TSDB_FQDN_LEN]; char fqdn[TSDB_FQDN_LEN];
int32_t sourceId; // only for arbitrator int32_t sourceId; // only for arbitrator
} SFirstPkt; } SFirstPkt;
typedef struct { typedef struct {
int8_t sync; int8_t sync;
int8_t reserved;
uint16_t tranId;
} SFirstPktRsp; } SFirstPktRsp;
typedef struct { typedef struct {
...@@ -187,6 +190,7 @@ void syncRestartConnection(SSyncPeer *pPeer); ...@@ -187,6 +190,7 @@ void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode); void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer); void syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer); int32_t syncDecPeerRef(SSyncPeer *pPeer);
uint16_t syncGenTranId();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -396,9 +396,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { ...@@ -396,9 +396,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
pFwdRsp->code = code; pFwdRsp->code = code;
int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen); if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) {
if (retLen == msgLen) {
sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
} else { } else {
sDebug("%s, failed to send forward ack, restart", pPeer->id); sDebug("%s, failed to send forward ack, restart", pPeer->id);
...@@ -873,6 +871,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -873,6 +871,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ; firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
firstPkt.syncHead.vgId = pNode->vgId; firstPkt.syncHead.vgId = pNode->vgId;
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead); firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
firstPkt.tranId = syncGenTranId();
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
...@@ -881,7 +880,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -881,7 +880,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sError("%s, failed to send sync-req to peer", pPeer->id); sError("%s, failed to send sync-req to peer", pPeer->id);
} else { } else {
nodeSStatus = TAOS_SYNC_STATUS_START; nodeSStatus = TAOS_SYNC_STATUS_START;
sInfo("%s, sync-req is sent to peer, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, sync-req is sent to peer, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]);
} }
} }
...@@ -1018,8 +1017,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type ...@@ -1018,8 +1017,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version;
} }
int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, statusMsgLen); if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) {
if (retLen == statusMsgLen) {
sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId, syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId,
...@@ -1053,10 +1051,11 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1053,10 +1051,11 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
firstPkt.syncHead.type = TAOS_SMSG_STATUS; firstPkt.syncHead.type = TAOS_SMSG_STATUS;
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
firstPkt.tranId = syncGenTranId();
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d", pPeer->id, connFd, pPeer->syncFd); sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd); pPeer->pConn = taosAllocateTcpConn(tsTcpPool, pPeer, connFd);
...@@ -1123,6 +1122,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1123,6 +1122,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return; return;
} }
sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId);
SSyncNode *pNode = *ppNode; SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
......
...@@ -64,8 +64,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -64,8 +64,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
sinfo.index = 0; sinfo.index = 0;
while (1) { while (1) {
// read file info // read file info
int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(minfo)); int32_t ret = taosReadMsg(pPeer->syncFd, &(minfo), sizeof(SFileInfo));
if (ret < 0) { if (ret != sizeof(SFileInfo)) {
sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno)); sError("%s, failed to read file info while restore file since %s", pPeer->id, strerror(errno));
break; break;
} }
...@@ -96,7 +96,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -96,7 +96,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// send file ack // send file ack
ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); ret = taosWriteMsg(pPeer->syncFd, &fileAck, sizeof(fileAck));
if (ret < 0) { if (ret != sizeof(fileAck)) {
sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno)); sError("%s, failed to write file:%s ack while restore file since %s", pPeer->id, minfo.name, strerror(errno));
break; break;
} }
...@@ -154,7 +154,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -154,7 +154,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
while (1) { while (1) {
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
if (ret < 0) { if (ret != sizeof(SWalHead)) {
sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno)); sError("%s, failed to read walhead while restore wal since %s", pPeer->id, strerror(errno));
break; break;
} }
...@@ -166,7 +166,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -166,7 +166,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
} // wal sync over } // wal sync over
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
if (ret < 0) { if (ret != pHead->len) {
sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno)); sError("%s, failed to read walcont, len:%d while restore wal since %s", pPeer->id, pHead->len, strerror(errno));
break; break;
} }
...@@ -286,11 +286,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -286,11 +286,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
uint64_t fversion = 0; uint64_t fversion = 0;
sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]); sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
SFirstPktRsp firstPktRsp = {.sync = 1}; SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()};
if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) { if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) {
sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno)); sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno));
return -1; return -1;
} }
sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId);
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
int32_t code = syncRestoreFile(pPeer, &fversion); int32_t code = syncRestoreFile(pPeer, &fversion);
......
...@@ -58,7 +58,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) { ...@@ -58,7 +58,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t fver, wver; uint64_t fver, wver;
int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver); int32_t code = (*pNode->getVersion)(pNode->vgId, &fver, &wver);
if (code != 0) { if (code != 0) {
sDebug("%s, vnode is commiting while retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer); sDebug("%s, vnode is commiting while get fver for retrieve, last fver:%" PRIu64, pPeer->id, pPeer->lastFileVer);
return -1; return -1;
} }
...@@ -92,7 +92,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -92,7 +92,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
int32_t code = -1; int32_t code = -1;
char name[TSDB_FILENAME_LEN * 2] = {0}; char name[TSDB_FILENAME_LEN * 2] = {0};
if (syncGetFileVersion(pNode, pPeer) < 0) return -1; if (syncGetFileVersion(pNode, pPeer) < 0) {
pPeer->fileChanged = 1;
return -1;
}
while (1) { while (1) {
// retrieve file info // retrieve file info
...@@ -100,12 +103,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -100,12 +103,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo.size = 0; fileInfo.size = 0;
fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX, fileInfo.magic = (*pNode->getFileInfo)(pNode->vgId, fileInfo.name, &fileInfo.index, TAOS_SYNC_MAX_INDEX,
&fileInfo.size, &fileInfo.fversion); &fileInfo.size, &fileInfo.fversion);
// fileInfo.size = htonl(size);
sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size); sDebug("%s, file:%s info is sent, size:%" PRId64, pPeer->id, fileInfo.name, fileInfo.size);
// send the file info // send the file info
int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo)); int32_t ret = taosWriteMsg(pPeer->syncFd, &(fileInfo), sizeof(fileInfo));
if (ret < 0) { if (ret != sizeof(fileInfo)) {
code = -1; code = -1;
sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); sError("%s, failed to write file:%s info while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break; break;
...@@ -119,8 +121,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -119,8 +121,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
} }
// wait for the ack from peer // wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(fileAck)); ret = taosReadMsg(pPeer->syncFd, &fileAck, sizeof(SFileAck));
if (ret < 0) { if (ret != sizeof(SFileAck)) {
code = -1; code = -1;
sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno)); sError("%s, failed to read file:%s ack while retrieve file since %s", pPeer->id, fileInfo.name, strerror(errno));
break; break;
...@@ -384,12 +386,15 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -384,12 +386,15 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
} }
if (code == 0) { if (code == 0) {
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
SWalHead walHead; SWalHead walHead;
memset(&walHead, 0, sizeof(walHead)); memset(&walHead, 0, sizeof(walHead));
taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); if (taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)) == sizeof(walHead)) {
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
sInfo("%s, wal retrieve is finished, set sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
} else {
sError("%s, failed to send last wal record since %s", pPeer->id, strerror(errno));
code = -1;
}
} else { } else {
sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code); sError("%s, failed to send wal since %s, code:0x%x", pPeer->id, strerror(errno), code);
} }
...@@ -404,20 +409,23 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { ...@@ -404,20 +409,23 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
memset(&firstPkt, 0, sizeof(firstPkt)); memset(&firstPkt, 0, sizeof(firstPkt));
firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA; firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA;
firstPkt.syncHead.vgId = pNode->vgId; firstPkt.syncHead.vgId = pNode->vgId;
firstPkt.tranId = syncGenTranId();
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) < 0) { if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("%s, failed to send sync firstPkt since %s", pPeer->id, strerror(errno)); sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId);
return -1; return -1;
} }
sDebug("%s, send firstPkt to peer, tranId:%u", pPeer->id, firstPkt.tranId);
SFirstPktRsp firstPktRsp; SFirstPktRsp firstPktRsp;
if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) { if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) {
sError("%s, failed to read sync firstPkt rsp since %s", pPeer->id, strerror(errno)); sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId);
return -1; return -1;
} }
sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册