提交 e8d9017d 编写于 作者: S Shengliang Guan

TD-2428

上级 966a5e18
......@@ -43,6 +43,7 @@ typedef enum {
#define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3
#define SYNC_PROTOCOL_VERSION 0
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
......@@ -51,27 +52,27 @@ typedef enum {
#pragma pack(push, 1)
typedef struct {
char type; // msg type
char pversion; // protocol version
char reserved[6]; // not used
int8_t type; // msg type
int8_t protocol; // protocol version
int8_t reserved[6]; // not used
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
// char cont[]; // message content starts from here
} SSyncHead;
typedef struct {
SSyncHead syncHead;
SSyncHead head;
uint16_t port;
uint16_t tranId;
char fqdn[TSDB_FQDN_LEN];
int32_t sourceId; // only for arbitrator
} SFirstPkt;
} SSyncMsg;
typedef struct {
int8_t sync;
int8_t reserved;
uint16_t tranId;
} SFirstPktRsp;
SSyncHead head;
int8_t sync;
int8_t reserved;
uint16_t tranId;
} SSyncRsp;
typedef struct {
int8_t role;
......@@ -101,6 +102,7 @@ typedef struct {
} SFileAck;
typedef struct {
SSyncHead head;
uint64_t version;
int32_t code;
} SFwdRsp;
......
......@@ -113,9 +113,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr);
SFirstPkt firstPkt;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno));
SSyncMsg msg;
if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd);
return;
}
......@@ -127,9 +127,9 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return;
}
firstPkt.fqdn[sizeof(firstPkt.fqdn) - 1] = 0;
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", firstPkt.sourceId, firstPkt.fqdn, firstPkt.port);
if (firstPkt.syncHead.vgId) {
msg.fqdn[TSDB_FQDN_LEN - 1] = 0;
snprintf(pNode->id, sizeof(pNode->id), "vgId:%d, peer:%s:%d", msg.sourceId, msg.fqdn, msg.port);
if (msg.head.vgId) {
sDebug("%s, vgId in head is not zero, close the connection", pNode->id);
tfree(pNode);
taosCloseSocket(connFd);
......@@ -156,8 +156,8 @@ static int32_t arbProcessPeerMsg(void *param, void *buffer) {
int32_t bytes = 0;
char * cont = (char *)buffer;
int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(head));
if (hlen != sizeof(head)) {
int32_t hlen = taosReadMsg(pNode->nodeFd, &head, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pNode->id, hlen);
return -1;
}
......
......@@ -384,18 +384,16 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) {
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
SFwdRsp fwdRsp = {0};
SSyncHead *pHead = (SSyncHead *)msg;
pHead->type = TAOS_SMSG_FORWARD_RSP;
pHead->len = sizeof(SFwdRsp);
fwdRsp.head.type = TAOS_SMSG_FORWARD_RSP;
fwdRsp.head.protocol = SYNC_PROTOCOL_VERSION;
fwdRsp.head.vgId = pNode->vgId;
fwdRsp.head.len = sizeof(SFwdRsp) - sizeof(SSyncHead);
fwdRsp.version = version;
fwdRsp.code = code;
SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead));
pFwdRsp->version = version;
pFwdRsp->code = code;
int32_t msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
if (taosWriteMsg(pPeer->peerFd, msg, msgLen) == msgLen) {
if (taosWriteMsg(pPeer->peerFd, &fwdRsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) {
sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
} else {
sDebug("%s, failed to send forward ack, restart", pPeer->id);
......@@ -865,20 +863,22 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sDebug("%s, try to sync", pPeer->id);
SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt));
firstPkt.syncHead.type = TAOS_SMSG_SYNC_REQ;
firstPkt.syncHead.vgId = pNode->vgId;
firstPkt.syncHead.len = sizeof(firstPkt) - sizeof(SSyncHead);
firstPkt.tranId = syncGenTranId();
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
SSyncMsg msg;
memset(&msg, 0, sizeof(SSyncMsg));
msg.head.type = TAOS_SMSG_SYNC_REQ;
msg.head.protocol = SYNC_PROTOCOL_VERSION;
msg.head.vgId = pNode->vgId;
msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
msg.port = tsSyncPort;
msg.tranId = syncGenTranId();
tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
if (taosWriteMsg(pPeer->peerFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync-req to peer", pPeer->id);
} else {
sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, firstPkt.tranId, syncStatus[nodeSStatus]);
sInfo("%s, sync-req is sent to peer, tranId:%u, sstatus:%s", pPeer->id, msg.tranId, syncStatus[nodeSStatus]);
}
}
......@@ -958,7 +958,7 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
// head.len = htonl(head.len);
if (pHead->len < 0) {
sError("%s, invalid pkt length, hlen:%d", pPeer->id, pHead->len);
sError("%s, invalid msg length, hlen:%d", pPeer->id, pHead->len);
return -1;
}
......@@ -1052,17 +1052,19 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
return;
}
SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt));
firstPkt.syncHead.vgId = pPeer->nodeId ? pNode->vgId : 0;
firstPkt.syncHead.type = TAOS_SMSG_STATUS;
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
firstPkt.tranId = syncGenTranId();
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
SSyncMsg msg;
memset(&msg, 0, sizeof(SSyncMsg));
msg.head.type = TAOS_SMSG_STATUS;
msg.head.protocol = SYNC_PROTOCOL_VERSION;
msg.head.vgId = pPeer->nodeId ? pNode->vgId : 0;
msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
msg.port = tsSyncPort;
msg.tranId = syncGenTranId();
msg.sourceId = pNode->vgId; // tell arbitrator its vgId
tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
if (taosWriteMsg(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, firstPkt.tranId);
if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId);
pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
pPeer->pConn = syncAllocateTcpConn(tsTcpPool, pPeer, connFd);
......@@ -1116,14 +1118,14 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
tinet_ntoa(ipstr, sourceIp);
sDebug("peer TCP connection from ip:%s", ipstr);
SFirstPkt firstPkt;
if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno));
SSyncMsg msg;
if (taosReadMsg(connFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("failed to read peer sync msg from ip:%s since %s", ipstr, strerror(errno));
taosCloseSocket(connFd);
return;
}
int32_t vgId = firstPkt.syncHead.vgId;
int32_t vgId = msg.head.vgId;
SSyncNode **ppNode = taosHashGet(tsVgIdHash, &vgId, sizeof(int32_t));
if (ppNode == NULL || *ppNode == NULL) {
sError("vgId:%d, vgId could not be found", vgId);
......@@ -1131,7 +1133,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return;
}
sDebug("vgId:%d, firstPkt is received, tranId:%u", vgId, firstPkt.tranId);
sDebug("vgId:%d, sync msg is received, tranId:%u", vgId, msg.tranId);
SSyncNode *pNode = *ppNode;
pthread_mutex_lock(&pNode->mutex);
......@@ -1139,20 +1141,20 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
SSyncPeer *pPeer;
for (i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer && (strcmp(pPeer->fqdn, firstPkt.fqdn) == 0) && (pPeer->port == firstPkt.port)) break;
if (pPeer && (strcmp(pPeer->fqdn, msg.fqdn) == 0) && (pPeer->port == msg.port)) break;
}
pPeer = (i < pNode->replica) ? pNode->peerInfo[i] : NULL;
if (pPeer == NULL) {
sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, firstPkt.fqdn, firstPkt.port);
sError("vgId:%d, peer:%s:%u not configured", pNode->vgId, msg.fqdn, msg.port);
taosCloseSocket(connFd);
// syncSendVpeerCfgMsg(sync);
} else {
// first packet tells what kind of link
if (firstPkt.syncHead.type == TAOS_SMSG_SYNC_DATA) {
if (msg.head.type == TAOS_SMSG_SYNC_DATA) {
pPeer->syncFd = connFd;
nodeSStatus = TAOS_SYNC_STATUS_START;
sInfo("%s, sync-data pkt from master is received, tranId:%u, set sstatus:%s", pPeer->id, firstPkt.tranId,
sInfo("%s, sync-data msg from master is received, tranId:%u, set sstatus:%s", pPeer->id, msg.tranId,
syncStatus[nodeSStatus]);
syncCreateRestoreDataThread(pPeer);
} else {
......@@ -1334,13 +1336,14 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded
// only msg from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0;
pSyncHead->protocol = SYNC_PROTOCOL_VERSION;
pSyncHead->vgId = pNode->vgId;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
......
......@@ -289,12 +289,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
uint64_t fversion = 0;
sInfo("%s, start to restore, sstatus:%s", pPeer->id, syncStatus[pPeer->sstatus]);
SFirstPktRsp firstPktRsp = {.sync = 1, .tranId = syncGenTranId()};
if (taosWriteMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) {
sError("%s, failed to send sync firstPkt rsp since %s", pPeer->id, strerror(errno));
SSyncRsp rsp = {.sync = 1, .tranId = syncGenTranId()};
if (taosWriteMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
sError("%s, failed to send sync rsp since %s", pPeer->id, strerror(errno));
return -1;
}
sDebug("%s, send firstPktRsp to peer, tranId:%u", pPeer->id, firstPktRsp.tranId);
sDebug("%s, send sync rsp to peer, tranId:%u", pPeer->id, rsp.tranId);
sInfo("%s, start to restore file, set sstatus:%s", pPeer->id, syncStatus[nodeSStatus]);
int32_t code = syncRestoreFile(pPeer, &fversion);
......
......@@ -405,27 +405,29 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt));
firstPkt.syncHead.type = TAOS_SMSG_SYNC_DATA;
firstPkt.syncHead.vgId = pNode->vgId;
firstPkt.tranId = syncGenTranId();
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort;
if (taosWriteMsg(pPeer->syncFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) {
sError("%s, failed to send sync firstPkt since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId);
SSyncMsg msg;
memset(&msg, 0, sizeof(SSyncMsg));
msg.head.type = TAOS_SMSG_SYNC_DATA;
msg.head.protocol = SYNC_PROTOCOL_VERSION;
msg.head.vgId = pNode->vgId;
msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
msg.port = tsSyncPort;
msg.tranId = syncGenTranId();
tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
return -1;
}
sDebug("%s, send sync-data pkt to peer, tranId:%u", pPeer->id, firstPkt.tranId);
sDebug("%s, send sync-data msg to peer, tranId:%u", pPeer->id, msg.tranId);
SFirstPktRsp firstPktRsp;
if (taosReadMsg(pPeer->syncFd, &firstPktRsp, sizeof(SFirstPktRsp)) != sizeof(SFirstPktRsp)) {
sError("%s, failed to read sync firstPkt rsp since %s, tranId:%u", pPeer->id, strerror(errno), firstPkt.tranId);
SSyncRsp rsp;
if (taosReadMsg(pPeer->syncFd, &rsp, sizeof(SSyncRsp)) != sizeof(SSyncRsp)) {
sError("%s, failed to read sync-data rsp since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
return -1;
}
sDebug("%s, recv firstPktRsp from peer, tranId:%u", pPeer->id, firstPkt.tranId);
sDebug("%s, recv sync-data rsp from peer, tranId:%u rsp-tranId:%u", pPeer->id, msg.tranId, rsp.tranId);
return 0;
}
......
......@@ -100,7 +100,7 @@ int processRpcMsg(void *item) {
pHead->msgType = pMsg->msgType;
pHead->len = pMsg->contLen;
uDebug("ver:%" PRIu64 ", pkt from client processed", pHead->version);
uDebug("ver:%" PRIu64 ", rsp from client processed", pHead->version);
writeIntoWal(pHead);
syncForwardToPeer(syncHandle, pHead, item, TAOS_QTYPE_RPC);
......@@ -275,7 +275,7 @@ int getWalInfo(int32_t vgId, char *name, int64_t *index) {
int writeToCache(int32_t vgId, void *data, int type) {
SWalHead *pHead = data;
uDebug("pkt from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
uDebug("rsp from peer is received, ver:%" PRIu64 " len:%d type:%d", pHead->version, pHead->len, type);
int msgSize = pHead->len + sizeof(SWalHead);
void *pMsg = taosAllocateQitem(msgSize);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册