提交 6d2d52bc 编写于 作者: S Shengliang Guan

TD-2428

上级 e8d9017d
...@@ -267,6 +267,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul ...@@ -267,6 +267,11 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_VERSION, 0, 0x0902, "Invalid Sync version")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_CONFIRM_EXPIRED, 0, 0x0903, "Sync confirm expired")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TOO_MANY_FWDINFO, 0, 0x0904, "Too many sync fwd infos")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_PROTOCOL, 0, 0x0905, "Mismatched protocol")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_CLUSTERID, 0, 0x0906, "Mismatched clusterId")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, 0, 0x0907, "Mismatched signature")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, 0, 0x0908, "Invalid msg checksum")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, 0, 0x0909, "Invalid msg length")
// wal // wal
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal")
......
...@@ -48,7 +48,7 @@ typedef struct { ...@@ -48,7 +48,7 @@ typedef struct {
void * pVnode; void * pVnode;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
SRspRet rspRet; SRspRet rspRet;
char reserveForSync[16]; char reserveForSync[24];
SWalHead pHead[]; SWalHead pHead[];
} SVWriteMsg; } SVWriteMsg;
......
...@@ -59,7 +59,7 @@ typedef struct SSdbRow { ...@@ -59,7 +59,7 @@ typedef struct SSdbRow {
SMnodeMsg *pMsg; SMnodeMsg *pMsg;
int32_t (*fpReq)(SMnodeMsg *pMsg); int32_t (*fpReq)(SMnodeMsg *pMsg);
int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code); int32_t (*fpRsp)(SMnodeMsg *pMsg, int32_t code);
char reserveForSync[16]; char reserveForSync[24];
SWalHead pHead[]; SWalHead pHead[];
} SSdbRow; } SSdbRow;
......
...@@ -13,12 +13,14 @@ ...@@ -13,12 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef TDENGINE_SYNCINT_H #ifndef TDENGINE_SYNC_INT_H
#define TDENGINE_SYNCINT_H #define TDENGINE_SYNC_INT_H
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "syncMsg.h"
#include "twal.h"
#define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }} #define sFatal(...) { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", sDebugFlag, __VA_ARGS__); }}
#define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }} #define sError(...) { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", sDebugFlag, __VA_ARGS__); }}
...@@ -27,88 +29,16 @@ extern "C" { ...@@ -27,88 +29,16 @@ extern "C" {
#define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sDebug(...) { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
#define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }} #define sTrace(...) { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); }}
typedef enum {
TAOS_SMSG_SYNC_DATA = 1,
TAOS_SMSG_FORWARD = 2,
TAOS_SMSG_FORWARD_RSP = 3,
TAOS_SMSG_SYNC_REQ = 4,
TAOS_SMSG_SYNC_RSP = 5,
TAOS_SMSG_SYNC_MUST = 6,
TAOS_SMSG_STATUS = 7,
TAOS_SMSG_SYNC_DATA_RSP = 8,
} ESyncMsgType;
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16) #define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024) #define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_FWD_TIMER 300 #define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 10000 #define SYNC_ROLE_TIMER 10000
#define SYNC_WAIT_AFTER_CHOOSE_MASTER 3 #define SYNC_WAIT_AFTER_CHOOSE_MASTER 3
#define SYNC_PROTOCOL_VERSION 0
#define nodeRole pNode->peerInfo[pNode->selfIndex]->role #define nodeRole pNode->peerInfo[pNode->selfIndex]->role
#define nodeVersion pNode->peerInfo[pNode->selfIndex]->version #define nodeVersion pNode->peerInfo[pNode->selfIndex]->version
#define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus #define nodeSStatus pNode->peerInfo[pNode->selfIndex]->sstatus
#pragma pack(push, 1)
typedef struct {
int8_t type; // msg type
int8_t protocol; // protocol version
int8_t reserved[6]; // not used
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
} SSyncHead;
typedef struct {
SSyncHead head;
uint16_t port;
uint16_t tranId;
char fqdn[TSDB_FQDN_LEN];
int32_t sourceId; // only for arbitrator
} SSyncMsg;
typedef struct {
SSyncHead head;
int8_t sync;
int8_t reserved;
uint16_t tranId;
} SSyncRsp;
typedef struct {
int8_t role;
uint64_t version;
} SPeerStatus;
typedef struct {
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[];
} SPeersStatus;
typedef struct {
char name[TSDB_FILENAME_LEN];
uint32_t magic;
uint32_t index;
uint64_t fversion;
int64_t size;
} SFileInfo;
typedef struct {
int8_t sync;
} SFileAck;
typedef struct {
SSyncHead head;
uint64_t version;
int32_t code;
} SFwdRsp;
#pragma pack(pop)
typedef struct { typedef struct {
char * buffer; char * buffer;
int32_t bufferSize; int32_t bufferSize;
...@@ -192,7 +122,6 @@ void syncRestartConnection(SSyncPeer *pPeer); ...@@ -192,7 +122,6 @@ void syncRestartConnection(SSyncPeer *pPeer);
void syncBroadcastStatus(SSyncNode *pNode); void syncBroadcastStatus(SSyncNode *pNode);
void syncAddPeerRef(SSyncPeer *pPeer); void syncAddPeerRef(SSyncPeer *pPeer);
int32_t syncDecPeerRef(SSyncPeer *pPeer); int32_t syncDecPeerRef(SSyncPeer *pPeer);
uint16_t syncGenTranId();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_SYNC_MSG_H
#define TDENGINE_SYNC_MSG_H
#ifdef __cplusplus
extern "C" {
#endif
#include "tsync.h"
typedef enum {
TAOS_SMSG_START = 0,
TAOS_SMSG_SYNC_DATA = 1,
TAOS_SMSG_SYNC_DATA_RSP = 2,
TAOS_SMSG_SYNC_FWD = 3,
TAOS_SMSG_SYNC_FWD_RSP = 4,
TAOS_SMSG_SYNC_REQ = 5,
TAOS_SMSG_SYNC_REQ_RSP = 6,
TAOS_SMSG_SYNC_MUST = 7,
TAOS_SMSG_SYNC_MUST_RSP = 8,
TAOS_SMSG_STATUS = 9,
TAOS_SMSG_STATUS_RSP = 10,
TAOS_SMSG_SETUP = 11,
TAOS_SMSG_SETUP_RSP = 12,
TAOS_SMSG_END = 13,
} ESyncMsgType;
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP,
SYNC_STATUS_CHECK_ROLE,
SYNC_STATUS_CHECK_ROLE_RSP
} ESyncStatusType;
#pragma pack(push, 1)
typedef struct {
int8_t type; // msg type
int8_t protocol; // protocol version
uint16_t signature; // fixed value
int32_t code; //
int32_t cId; // cluster Id
int32_t vgId; // vg ID
int32_t len; // content length, does not include head
uint32_t cksum;
} SSyncHead;
typedef struct {
SSyncHead head;
uint16_t port;
uint16_t tranId;
int32_t sourceId; // only for arbitrator
char fqdn[TSDB_FQDN_LEN];
} SSyncMsg;
typedef struct {
SSyncHead head;
int8_t sync;
int8_t reserved;
uint16_t tranId;
int8_t reserverd[4];
} SSyncRsp;
typedef struct {
int8_t role;
uint64_t version;
} SPeerStatus;
typedef struct {
SSyncHead head;
int8_t role;
int8_t ack;
int8_t type;
int8_t reserved[3];
uint16_t tranId;
uint64_t version;
SPeerStatus peersStatus[TAOS_SYNC_MAX_REPLICA];
} SPeersStatus;
typedef struct {
SSyncHead head;
char name[TSDB_FILENAME_LEN];
uint32_t magic;
uint32_t index;
uint64_t fversion;
int64_t size;
} SFileInfo;
typedef struct {
SSyncHead head;
int8_t sync;
} SFileAck;
typedef struct {
SSyncHead head;
uint64_t version;
int32_t code;
} SFwdRsp;
#pragma pack(pop)
#define SYNC_PROTOCOL_VERSION 0
#define SYNC_SIGNATURE ((uint16_t)(0xCDEF))
extern char *statusType[];
uint16_t syncGenTranId();
int32_t syncCheckHead(SSyncHead *pHead);
void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len);
void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code);
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId);
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_VNODEPEER_H
...@@ -80,32 +80,6 @@ char *syncStatus[] = { ...@@ -80,32 +80,6 @@ char *syncStatus[] = {
"invalid" "invalid"
}; };
typedef enum {
SYNC_STATUS_BROADCAST,
SYNC_STATUS_BROADCAST_RSP,
SYNC_STATUS_SETUP_CONN,
SYNC_STATUS_SETUP_CONN_RSP,
SYNC_STATUS_EXCHANGE_DATA,
SYNC_STATUS_EXCHANGE_DATA_RSP,
SYNC_STATUS_CHECK_ROLE,
SYNC_STATUS_CHECK_ROLE_RSP
} ESyncStatusType;
char *statusType[] = {
"broadcast",
"broadcast-rsp",
"setup-conn",
"setup-conn-rsp",
"exchange-data",
"exchange-data-rsp",
"check-role",
"check-role-rsp"
};
uint16_t syncGenTranId() {
return taosRand() & 0XFFFF;
}
int32_t syncInit() { int32_t syncInit() {
SPoolInfo info = {0}; SPoolInfo info = {0};
...@@ -384,19 +358,13 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { ...@@ -384,19 +358,13 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
SSyncPeer *pPeer = pNode->pMaster; SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) { if (pPeer && pNode->quorum > 1) {
SFwdRsp fwdRsp = {0}; SFwdRsp rsp;
syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code);
fwdRsp.head.type = TAOS_SMSG_FORWARD_RSP;
fwdRsp.head.protocol = SYNC_PROTOCOL_VERSION;
fwdRsp.head.vgId = pNode->vgId;
fwdRsp.head.len = sizeof(SFwdRsp) - sizeof(SSyncHead);
fwdRsp.version = version;
fwdRsp.code = code;
if (taosWriteMsg(pPeer->peerFd, &fwdRsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) { if (taosWriteMsg(pPeer->peerFd, &rsp, sizeof(SFwdRsp)) == sizeof(SFwdRsp)) {
sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); sTrace("%s, forward-rsp is sent, code:0x%x hver:%" PRIu64, pPeer->id, code, version);
} else { } else {
sDebug("%s, failed to send forward ack, restart", pPeer->id); sDebug("%s, failed to send forward-rsp, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
} }
} }
...@@ -864,14 +832,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -864,14 +832,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sDebug("%s, try to sync", pPeer->id); sDebug("%s, try to sync", pPeer->id);
SSyncMsg msg; SSyncMsg msg;
memset(&msg, 0, sizeof(SSyncMsg)); syncBuildSyncReqMsg(&msg, pNode->vgId);
msg.head.type = TAOS_SMSG_SYNC_REQ;
msg.head.protocol = SYNC_PROTOCOL_VERSION;
msg.head.vgId = pNode->vgId;
msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
msg.port = tsSyncPort;
msg.tranId = syncGenTranId();
tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer); taosTmrReset(syncNotStarted, tsSyncTimer * 1000, pPeer, tsSyncTmrCtrl, &pPeer->timer);
...@@ -882,9 +843,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) { ...@@ -882,9 +843,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
} }
} }
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { static void syncProcessFwdResponse(SFwdRsp *pFwdRsp, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFwdRsp * pFwdRsp = (SFwdRsp *)cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo * pFwdInfo; SFwdInfo * pFwdInfo;
...@@ -914,7 +874,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { ...@@ -914,7 +874,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SWalHead * pHead = (SWalHead *)cont; SWalHead * pHead = (SWalHead *)(cont + sizeof(SSyncHead));
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
...@@ -931,9 +891,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { ...@@ -931,9 +891,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
} }
} }
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
SSyncNode * pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", sDebug("%s, status is received, self:%s:%s:%" PRIu64 ", peer:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role], pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeersStatus->role],
...@@ -947,23 +906,22 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) { ...@@ -947,23 +906,22 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
} }
} }
static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead) {
if (pPeer->peerFd < 0) return -1; if (pPeer->peerFd < 0) return -1;
int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead)); int32_t hlen = taosReadMsg(pPeer->peerFd, pHead, sizeof(SSyncHead));
if (hlen != sizeof(SSyncHead)) { if (hlen != sizeof(SSyncHead)) {
sDebug("%s, failed to read msg, hlen:%d", pPeer->id, hlen); sDebug("%s, failed to read msg since %s, hlen:%d", pPeer->id, tstrerror(errno), hlen);
return -1; return -1;
} }
// head.len = htonl(head.len); int32_t code = syncCheckHead(pHead);
if (pHead->len < 0) { if (code != 0) {
sError("%s, invalid msg length, hlen:%d", pPeer->id, pHead->len); sError("%s, failed to check msg head since %s, type:%d", pPeer->id, tstrerror(code), pHead->type);
return -1; return -1;
} }
assert(pHead->len <= TSDB_MAX_WAL_SIZE); int32_t bytes = taosReadMsg(pPeer->peerFd, (char *)pHead + sizeof(SSyncHead), pHead->len);
int32_t bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) { if (bytes != pHead->len) {
sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len); sError("%s, failed to read, bytes:%d len:%d", pPeer->id, bytes, pHead->len);
return -1; return -1;
...@@ -974,23 +932,22 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { ...@@ -974,23 +932,22 @@ static int32_t syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
static int32_t syncProcessPeerMsg(void *param, void *buffer) { static int32_t syncProcessPeerMsg(void *param, void *buffer) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncHead head; SSyncHead *pHead = buffer;
char * cont = buffer;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
int32_t code = syncReadPeerMsg(pPeer, &head, cont); int32_t code = syncReadPeerMsg(pPeer, pHead);
if (code == 0) { if (code == 0) {
if (head.type == TAOS_SMSG_FORWARD) { if (pHead->type == TAOS_SMSG_SYNC_FWD) {
syncProcessForwardFromPeer(cont, pPeer); syncProcessForwardFromPeer(buffer, pPeer);
} else if (head.type == TAOS_SMSG_FORWARD_RSP) { } else if (pHead->type == TAOS_SMSG_SYNC_FWD_RSP) {
syncProcessFwdResponse(cont, pPeer); syncProcessFwdResponse(buffer, pPeer);
} else if (head.type == TAOS_SMSG_SYNC_REQ) { } else if (pHead->type == TAOS_SMSG_SYNC_REQ) {
syncProcessSyncRequest(cont, pPeer); syncProcessSyncRequest(buffer, pPeer);
} else if (head.type == TAOS_SMSG_STATUS) { } else if (pHead->type == TAOS_SMSG_STATUS) {
syncProcessPeersStatusMsg(cont, pPeer); syncProcessPeersStatusMsg(buffer, pPeer);
} }
} }
...@@ -999,36 +956,29 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) { ...@@ -999,36 +956,29 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
return code; return code;
} }
#define statusMsgLen sizeof(SSyncHead) + sizeof(SPeersStatus) + sizeof(SPeerStatus) * TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) { static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId) {
SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0};
if (pPeer->peerFd < 0 || pPeer->ip == 0) return; if (pPeer->peerFd < 0 || pPeer->ip == 0) return;
SSyncHead * pHead = (SSyncHead *)msg; SSyncNode *pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)(msg + sizeof(SSyncHead)); SPeersStatus msg = {0};
pHead->type = TAOS_SMSG_STATUS; syncBuildPeersStatus(&msg, pNode->vgId);
pHead->len = statusMsgLen - sizeof(SSyncHead);
pPeersStatus->version = nodeVersion; msg.role = nodeRole;
pPeersStatus->role = nodeRole; msg.ack = ack;
pPeersStatus->ack = ack; msg.type = type;
pPeersStatus->type = type; msg.tranId = tranId;
pPeersStatus->tranId = tranId; msg.version = nodeVersion;
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t i = 0; i < pNode->replica; ++i) {
pPeersStatus->peersStatus[i].role = pNode->peerInfo[i]->role; msg.peersStatus[i].role = pNode->peerInfo[i]->role;
pPeersStatus->peersStatus[i].version = pNode->peerInfo[i]->version; msg.peersStatus[i].version = pNode->peerInfo[i]->version;
} }
if (taosWriteMsg(pPeer->peerFd, msg, statusMsgLen) == statusMsgLen) { if (taosWriteMsg(pPeer->peerFd, &msg, sizeof(SPeersStatus)) == sizeof(SPeersStatus)) {
sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d", sDebug("%s, status is sent, self:%s:%s:%" PRIu64 ", peer:%s:%s:%" PRIu64 ", ack:%d tranId:%u type:%s pfd:%d",
pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role], pPeer->id, syncRole[nodeRole], syncStatus[nodeSStatus], nodeVersion, syncRole[pPeer->role],
syncStatus[pPeer->sstatus], pPeer->version, pPeersStatus->ack, pPeersStatus->tranId, syncStatus[pPeer->sstatus], pPeer->version, ack, tranId, statusType[type], pPeer->peerFd);
statusType[pPeersStatus->type], pPeer->peerFd);
} else { } else {
sDebug("%s, failed to send status msg, restart", pPeer->id); sDebug("%s, failed to send status msg, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
...@@ -1053,15 +1003,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1053,15 +1003,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
} }
SSyncMsg msg; SSyncMsg msg;
memset(&msg, 0, sizeof(SSyncMsg)); syncBuildSyncSetupMsg(&msg, pPeer->nodeId ? pNode->vgId : 0);
msg.head.type = TAOS_SMSG_STATUS;
msg.head.protocol = SYNC_PROTOCOL_VERSION;
msg.head.vgId = pPeer->nodeId ? pNode->vgId : 0;
msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
msg.port = tsSyncPort;
msg.tranId = syncGenTranId();
msg.sourceId = pNode->vgId; // tell arbitrator its vgId
tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) { if (taosWriteMsg(connFd, &msg, sizeof(SSyncMsg)) == sizeof(SSyncMsg)) {
sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId); sDebug("%s, connection to peer server is setup, pfd:%d sfd:%d tranId:%u", pPeer->id, connFd, pPeer->syncFd, msg.tranId);
...@@ -1341,10 +1283,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1341,10 +1283,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
// a hacker way to improve the performance // a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD; syncBuildSyncFwdMsg(pSyncHead, pNode->vgId, sizeof(SWalHead) + pWalHead->len);
pSyncHead->protocol = SYNC_PROTOCOL_VERSION;
pSyncHead->vgId = pNode->vgId;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&pNode->mutex); pthread_mutex_lock(&pNode->mutex);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "syncInt.h"
char *statusType[] = {
"broadcast",
"broadcast-rsp",
"setup-conn",
"setup-conn-rsp",
"exchange-data",
"exchange-data-rsp",
"check-role",
"check-role-rsp"
};
uint16_t syncGenTranId() {
return taosRand() & 0XFFFF;
}
static void syncBuildHead(SSyncHead *pHead) {
pHead->protocol = SYNC_PROTOCOL_VERSION;
pHead->signature = SYNC_SIGNATURE;
pHead->code = 0;
pHead->cId = 0;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SSyncHead));
}
int32_t syncCheckHead(SSyncHead *pHead) {
if (pHead->protocol != SYNC_PROTOCOL_VERSION) return TSDB_CODE_SYN_MISMATCHED_PROTOCOL;
if (pHead->signature != SYNC_SIGNATURE) return TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
if (pHead->cId != 0) return TSDB_CODE_SYN_MISMATCHED_CLUSTERID;
if (pHead->len <= 0 || pHead->len > TSDB_MAX_WAL_SIZE) return TSDB_CODE_SYN_INVALID_MSGLEN;
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SSyncHead))) return TSDB_CODE_SYN_INVALID_CHECKSUM;
return TSDB_CODE_SUCCESS;
}
void syncBuildSyncFwdMsg(SSyncHead *pHead, int32_t vgId, int32_t len) {
pHead->type = TAOS_SMSG_SYNC_FWD;
pHead->vgId = vgId;
pHead->len = len;
syncBuildHead(pHead);
}
void syncBuildSyncFwdRsp(SFwdRsp *pMsg, int32_t vgId, uint64_t version, int32_t code) {
pMsg->head.type = TAOS_SMSG_SYNC_FWD_RSP;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SFwdRsp) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
pMsg->version = version;
pMsg->code = code;
}
static void syncBuildMsg(SSyncMsg *pMsg, int32_t vgId, ESyncMsgType type) {
pMsg->head.type = type;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
pMsg->port = tsSyncPort;
pMsg->tranId = syncGenTranId();
pMsg->sourceId = vgId;
tstrncpy(pMsg->fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
}
void syncBuildSyncReqMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_REQ); }
void syncBuildSyncDataMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SYNC_DATA); }
void syncBuildSyncSetupMsg(SSyncMsg *pMsg, int32_t vgId) { syncBuildMsg(pMsg, vgId, TAOS_SMSG_SETUP); }
void syncBuildPeersStatus(SPeersStatus *pMsg, int32_t vgId) {
pMsg->head.type = TAOS_SMSG_STATUS;
pMsg->head.vgId = vgId;
pMsg->head.len = sizeof(SPeersStatus) - sizeof(SSyncHead);
syncBuildHead(&pMsg->head);
}
...@@ -406,14 +406,7 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) { ...@@ -406,14 +406,7 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SSyncMsg msg; SSyncMsg msg;
memset(&msg, 0, sizeof(SSyncMsg)); syncBuildSyncDataMsg(&msg, pNode->vgId);
msg.head.type = TAOS_SMSG_SYNC_DATA;
msg.head.protocol = SYNC_PROTOCOL_VERSION;
msg.head.vgId = pNode->vgId;
msg.head.len = sizeof(SSyncMsg) - sizeof(SSyncHead);
msg.port = tsSyncPort;
msg.tranId = syncGenTranId();
tstrncpy(msg.fqdn, tsNodeFqdn, TSDB_FQDN_LEN);
if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) { if (taosWriteMsg(pPeer->syncFd, &msg, sizeof(SSyncMsg)) != sizeof(SSyncMsg)) {
sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId); sError("%s, failed to send sync-data msg since %s, tranId:%u", pPeer->id, strerror(errno), msg.tranId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册