未验证 提交 b12cf2fe 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4330 from taosdata/feature/wal

[TD-2211]<fix>: Invalid sync version while sync data
...@@ -28,19 +28,21 @@ extern "C" { ...@@ -28,19 +28,21 @@ extern "C" {
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
typedef enum { typedef enum {
TAOS_SMSG_SYNC_DATA = 1, TAOS_SMSG_SYNC_DATA = 1,
TAOS_SMSG_FORWARD = 2, TAOS_SMSG_FORWARD = 2,
TAOS_SMSG_FORWARD_RSP = 3, TAOS_SMSG_FORWARD_RSP = 3,
TAOS_SMSG_SYNC_REQ = 4, TAOS_SMSG_SYNC_REQ = 4,
TAOS_SMSG_SYNC_RSP = 5, TAOS_SMSG_SYNC_RSP = 5,
TAOS_SMSG_SYNC_MUST = 6, TAOS_SMSG_SYNC_MUST = 6,
TAOS_SMSG_STATUS = 7 TAOS_SMSG_STATUS = 7,
TAOS_SMSG_SYNC_DATA_RSP = 8,
} ESyncMsgType; } ESyncMsgType;
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_FWD_TIMER 300 #define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000 #define SYNC_ROLE_TIMER 10000
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
...@@ -64,6 +66,10 @@ typedef struct { ...@@ -64,6 +66,10 @@ typedef struct {
int32_t sourceId; // only for arbitrator int32_t sourceId; // only for arbitrator
} SFirstPkt; } SFirstPkt;
typedef struct {
int8_t sync;
} SFirstPktRsp;
typedef struct { typedef struct {
int8_t role; int8_t role;
uint64_t version; uint64_t version;
......
...@@ -626,17 +626,9 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -626,17 +626,9 @@ static void syncChooseMaster(SSyncNode *pNode) {
sInfo("vgId:%d, start to work as master", pNode->vgId); sInfo("vgId:%d, start to work as master", pNode->vgId);
nodeRole = TAOS_SYNC_ROLE_MASTER; nodeRole = TAOS_SYNC_ROLE_MASTER;
#if 0 // Wait for other nodes to receive status to avoid version inconsistency
for (int32_t i = 0; i < pNode->replica; ++i) { taosMsleep(SYNC_WAIT_AFTER_CHOOSE_MASTER);
if (i == index) continue;
pPeer = pNode->peerInfo[i];
if (pPeer->version == nodeVersion) {
pPeer->role = TAOS_SYNC_ROLE_SLAVE;
pPeer->sstatus = TAOS_SYNC_STATUS_CACHE;
sInfo("%s, it shall work as slave", pPeer->id);
}
}
#endif
syncResetFlowCtrl(pNode); syncResetFlowCtrl(pNode);
(*pNode->notifyRole)(pNode->vgId, nodeRole); (*pNode->notifyRole)(pNode->vgId, nodeRole);
} else { } else {
...@@ -761,7 +753,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new ...@@ -761,7 +753,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
sDebug("vgId:%d, choose master", pNode->vgId); sDebug("vgId:%d, choose master", pNode->vgId);
syncChooseMaster(pNode); syncChooseMaster(pNode);
} else { } else {
sDebug("vgId:%d, cannot choose master since roles inconsistent", pNode->vgId); sDebug("vgId:%d, cannot choose master since roles inequality", pNode->vgId);
} }
} }
...@@ -1124,7 +1116,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { ...@@ -1124,7 +1116,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
} }
int32_t vgId = firstPkt.syncHead.vgId; int32_t vgId = firstPkt.syncHead.vgId;
SSyncNode **ppNode = (SSyncNode **)taosHashGet(tsVgIdHash, (const char *)&vgId, sizeof(int32_t)); SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) { if (ppNode == NULL || *ppNode == NULL) {
sError("vgId:%d, vgId could not be found", vgId); sError("vgId:%d, vgId could not be found", vgId);
taosCloseSocket(connFd); taosCloseSocket(connFd);
...@@ -1321,6 +1313,8 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1321,6 +1313,8 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
} }
// always update version // always update version
sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
nodeVersion = pWalHead->version; nodeVersion = pWalHead->version;
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
...@@ -1328,10 +1322,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1328,10 +1322,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
// only pkt from RPC or CQ can be forwarded // only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, // a hacker way to improve the performance
syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
// a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD; pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0; pSyncHead->pversion = 0;
...@@ -1352,9 +1343,11 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1352,9 +1343,11 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen); int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) { if (retLen == fwdLen) {
sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); sTrace("%s, forward is sent, role:%s sstatus:%s hver:%" PRIu64 " contLen:%d", pPeer->id, syncRole[pPeer->role],
syncStatus[pPeer->sstatus], pWalHead->version, pWalHead->len);
} else { } else {
sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); sError("%s, failed to forward, role:%s sstatus:%s hver:%" PRIu64 " retLen:%d", pPeer->id, syncRole[pPeer->role],
syncStatus[pPeer->sstatus], pWalHead->version, retLen);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
} }
......
...@@ -147,12 +147,10 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { ...@@ -147,12 +147,10 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
static int32_t syncRestoreWal(SSyncPeer *pPeer) { static int32_t syncRestoreWal(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int32_t ret, code = -1; int32_t ret, code = -1;
uint64_t lastVer = 0;
void *buffer = calloc(SYNC_MAX_SIZE, 1); // size for one record SWalHead *pHead = calloc(SYNC_MAX_SIZE, 1); // size for one record
if (buffer == NULL) return -1; if (pHead == NULL) return -1;
SWalHead *pHead = (SWalHead *)buffer;
uint64_t lastVer = 0;
while (1) { while (1) {
ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead)); ret = taosReadMsg(pPeer->syncFd, pHead, sizeof(SWalHead));
...@@ -188,7 +186,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ...@@ -188,7 +186,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
sError("%s, failed to restore wal from syncFd:%d since %s", pPeer->id, pPeer->syncFd, strerror(errno)); sError("%s, failed to restore wal from syncFd:%d since %s", pPeer->id, pPeer->syncFd, strerror(errno));
} }
free(buffer); free(pHead);
return code; return code;
} }
...@@ -233,10 +231,13 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) { ...@@ -233,10 +231,13 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode * pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
if (pRecv == NULL) return -1;
int32_t len = pHead->len + sizeof(SWalHead); int32_t len = pHead->len + sizeof(SWalHead);
if (pRecv == NULL) {
sError("%s, recv buffer is not create yet", pPeer->id);
return -1;
}
if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) { if (pRecv->bufferSize - (pRecv->offset - pRecv->buffer) >= len) {
memcpy(pRecv->offset, pHead, len); memcpy(pRecv->offset, pHead, len);
pRecv->offset += len; pRecv->offset += len;
...@@ -284,7 +285,14 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -284,7 +285,14 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
nodeSStatus = TAOS_SYNC_STATUS_FILE; nodeSStatus = TAOS_SYNC_STATUS_FILE;
uint64_t fversion = 0; uint64_t fversion = 0;
sDebug("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
SFirstPktRsp firstPktRsp = {.sync = 1};
if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) {
sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno));
return -1;
}
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
int32_t code = syncRestoreFile(pPeer, &fversion); int32_t code = syncRestoreFile(pPeer, &fversion);
if (code < 0) { if (code < 0) {
sError("%s, failed to restore file", pPeer->id); sError("%s, failed to restore file", pPeer->id);
...@@ -301,14 +309,14 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { ...@@ -301,14 +309,14 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
nodeVersion = fversion; nodeVersion = fversion;
sDebug("%s, start to restore wal", pPeer->id); sInfo("%s, start to restore wal", pPeer->id);
if (syncRestoreWal(pPeer) < 0) { if (syncRestoreWal(pPeer) < 0) {
sError("%s, failed to restore wal", pPeer->id); sError("%s, failed to restore wal", pPeer->id);
return -1; return -1;
} }
nodeSStatus = TAOS_SYNC_STATUS_CACHE; nodeSStatus = TAOS_SYNC_STATUS_CACHE;
sDebug("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]); sInfo("%s, start to insert buffered points, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
if (syncProcessBufferedFwd(pPeer) < 0) { if (syncProcessBufferedFwd(pPeer) < 0) {
sError("%s, failed to insert buffered points", pPeer->id); sError("%s, failed to insert buffered points", pPeer->id);
return -1; return -1;
......
...@@ -448,7 +448,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { ...@@ -448,7 +448,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
return code; return code;
} }
static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt; SFirstPkt firstPkt;
...@@ -458,8 +458,24 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -458,8 +458,24 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
if (taosWriteMsg(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) { if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) < 0) {
sError("%s, failed to send syncCmd", pPeer->id); sError("%s, failed to send sync firstPkt since %s", pPeer->id, strerror(errno));
return -1;
}
SFirstPktRsp firstPktRsp;
if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) < 0) {
sError("%s, failed to read sync firstPkt rsp since %s", pPeer->id, strerror(errno));
return -1;
}
return 0;
}
static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
sInfo("%s, start to retrieve, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
if (syncRetrieveFirstPkt(pPeer) < 0) {
sError("%s, failed to start retrieve", pPeer->id);
return -1; return -1;
} }
......
...@@ -203,16 +203,19 @@ static void *taosProcessTcpData(void *param) { ...@@ -203,16 +203,19 @@ static void *taosProcessTcpData(void *param) {
assert(pConn); assert(pConn);
if (events[i].events & EPOLLERR) { if (events[i].events & EPOLLERR) {
sDebug("conn is broken since EPOLLERR");
taosProcessBrokenLink(pConn); taosProcessBrokenLink(pConn);
continue; continue;
} }
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLHUP) {
sDebug("conn is broken since EPOLLHUP");
taosProcessBrokenLink(pConn); taosProcessBrokenLink(pConn);
continue; continue;
} }
if (events[i].events & EPOLLRDHUP) { if (events[i].events & EPOLLRDHUP) {
sDebug("conn is broken since EPOLLRDHUP");
taosProcessBrokenLink(pConn); taosProcessBrokenLink(pConn);
continue; continue;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册