提交 37c2d3eb 编写于 作者: S Shengliang Guan

minor changes

上级 b3a78f8f
...@@ -108,8 +108,7 @@ static void syncModuleInitFunc() { ...@@ -108,8 +108,7 @@ static void syncModuleInitFunc() {
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
} }
void *syncStart(const SSyncInfo *pInfo) void *syncStart(const SSyncInfo *pInfo) {
{
const SSyncCfg *pCfg = &pInfo->syncCfg; const SSyncCfg *pCfg = &pInfo->syncCfg;
SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1); SSyncNode *pNode = (SSyncNode *) calloc(sizeof(SSyncNode), 1);
...@@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo) ...@@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo)
return pNode; return pNode;
} }
void syncStop(void *param) void syncStop(void *param) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
if (pNode == NULL) return; if (pNode == NULL) return;
...@@ -215,9 +213,8 @@ void syncStop(void *param) ...@@ -215,9 +213,8 @@ void syncStop(void *param)
syncDecNodeRef(pNode); syncDecNodeRef(pNode);
} }
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param;
int i, j; int i, j;
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG;
...@@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) ...@@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg)
return 0; return 0;
} }
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
{ SSyncNode * pNode = param;
SSyncNode *pNode = param; SSyncPeer * pPeer;
SSyncPeer *pPeer;
SSyncHead *pSyncHead; SSyncHead *pSyncHead;
SWalHead *pWalHead = data; SWalHead *pWalHead = data;
int fwdLen; int fwdLen;
...@@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) ...@@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype)
return code; return code;
} }
void syncConfirmForward(void *param, uint64_t version, int32_t code) void syncConfirmForward(void *param, uint64_t version, int32_t code) {
{ SSyncNode *pNode = param;
SSyncNode *pNode = param;
if (pNode == NULL) return; if (pNode == NULL) return;
if (pNode->quorum <= 1) return; if (pNode->quorum <= 1) return;
...@@ -387,10 +382,9 @@ void syncRecover(void *param) { ...@@ -387,10 +382,9 @@ void syncRecover(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
{
SSyncNode *pNode = param; SSyncNode *pNode = param;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i=0; i<pNode->replica; ++i) { for (int i=0; i<pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
...@@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole) ...@@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole)
return 0; return 0;
} }
static void syncAddArbitrator(SSyncNode *pNode) static void syncAddArbitrator(SSyncNode *pNode) {
{
SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA]; SSyncPeer *pPeer = pNode->peerInfo[TAOS_SYNC_MAX_REPLICA];
// if not configured, return right away // if not configured, return right away
...@@ -454,13 +447,11 @@ static void syncDecNodeRef(SSyncNode *pNode) ...@@ -454,13 +447,11 @@ static void syncDecNodeRef(SSyncNode *pNode)
} }
} }
void syncAddPeerRef(SSyncPeer *pPeer) void syncAddPeerRef(SSyncPeer *pPeer) {
{
atomic_add_fetch_8(&pPeer->refCount, 1); atomic_add_fetch_8(&pPeer->refCount, 1);
} }
int syncDecPeerRef(SSyncPeer *pPeer) int syncDecPeerRef(SSyncPeer *pPeer) {
{
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
syncDecNodeRef(pPeer->pSyncNode); syncDecNodeRef(pPeer->pSyncNode);
...@@ -473,18 +464,16 @@ int syncDecPeerRef(SSyncPeer *pPeer) ...@@ -473,18 +464,16 @@ int syncDecPeerRef(SSyncPeer *pPeer)
return 1; return 1;
} }
static void syncClosePeerConn(SSyncPeer *pPeer) static void syncClosePeerConn(SSyncPeer *pPeer) {
{
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
taosClose(pPeer->syncFd); taosClose(pPeer->syncFd);
if (pPeer->peerFd >=0) { if (pPeer->peerFd >= 0) {
pPeer->peerFd = -1; pPeer->peerFd = -1;
taosFreeTcpConn(pPeer->pConn); taosFreeTcpConn(pPeer->pConn);
} }
} }
static void syncRemovePeer(SSyncPeer *pPeer) static void syncRemovePeer(SSyncPeer *pPeer) {
{
sInfo("%s, it is removed", pPeer->id); sInfo("%s, it is removed", pPeer->id);
pPeer->ip = 0; pPeer->ip = 0;
...@@ -492,8 +481,7 @@ static void syncRemovePeer(SSyncPeer *pPeer) ...@@ -492,8 +481,7 @@ static void syncRemovePeer(SSyncPeer *pPeer)
syncDecPeerRef(pPeer); syncDecPeerRef(pPeer);
} }
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
{
uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn); uint32_t ip = taosGetIpFromFqdn(pInfo->nodeFqdn);
if (ip == -1) return NULL; if (ip == -1) return NULL;
...@@ -523,25 +511,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) ...@@ -523,25 +511,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo)
return pPeer; return pPeer;
} }
void syncBroadcastStatus(SSyncNode *pNode) void syncBroadcastStatus(SSyncNode *pNode) {
{
SSyncPeer *pPeer; SSyncPeer *pPeer;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
if ( i == pNode->selfIndex ) continue; if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i]; pPeer = pNode->peerInfo[i];
syncSendPeersStatusMsgToPeer(pPeer, 1); syncSendPeersStatusMsgToPeer(pPeer, 1);
} }
} }
static void syncResetFlowCtrl(SSyncNode *pNode) { static void syncResetFlowCtrl(SSyncNode *pNode) {
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pNode->peerInfo[i]->numOfRetrieves = 0; pNode->peerInfo[i]->numOfRetrieves = 0;
} }
if (pNode->notifyFlowCtrl) if (pNode->notifyFlowCtrl) {
(*pNode->notifyFlowCtrl)(pNode->ahandle, 0); (*pNode->notifyFlowCtrl)(pNode->ahandle, 0);
}
} }
static void syncChooseMaster(SSyncNode *pNode) { static void syncChooseMaster(SSyncNode *pNode) {
...@@ -598,9 +585,9 @@ static void syncChooseMaster(SSyncNode *pNode) { ...@@ -598,9 +585,9 @@ static void syncChooseMaster(SSyncNode *pNode) {
} else { } else {
sDebug("vgId:%d, failed to choose master", pNode->vgId); sDebug("vgId:%d, failed to choose master", pNode->vgId);
} }
} }
static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
int onlineNum = 0; int onlineNum = 0;
int index = -1; int index = -1;
int replica = pNode->replica; int replica = pNode->replica;
...@@ -617,7 +604,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { ...@@ -617,7 +604,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
replica = pNode->replica + 1; replica = pNode->replica + 1;
} }
if (onlineNum <= replica*0.5) { if (onlineNum <= replica * 0.5) {
if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole != TAOS_SYNC_ROLE_UNSYNCED) {
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
pNode->peerInfo[pNode->selfIndex]->role = nodeRole; pNode->peerInfo[pNode->selfIndex]->role = nodeRole;
...@@ -625,13 +612,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { ...@@ -625,13 +612,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica); sInfo("vgId:%d, change to unsynced state, online:%d replica:%d", pNode->vgId, onlineNum, replica);
} }
} else { } else {
for (int i=0; i<pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
SSyncPeer *pTemp = pNode->peerInfo[i]; SSyncPeer *pTemp = pNode->peerInfo[i];
if ( pTemp->role != TAOS_SYNC_ROLE_MASTER ) continue; if (pTemp->role != TAOS_SYNC_ROLE_MASTER) continue;
if ( index < 0 ) { if (index < 0) {
index = i; index = i;
} else { // multiple masters, it shall not happen } else { // multiple masters, it shall not happen
if ( i == pNode->selfIndex ) { if (i == pNode->selfIndex) {
sError("%s, peer is master, work as slave instead", pTemp->id); sError("%s, peer is master, work as slave instead", pTemp->id);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
...@@ -640,7 +627,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) { ...@@ -640,7 +627,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
} }
} }
SSyncPeer *pMaster = (index>=0) ? pNode->peerInfo[index]:NULL; SSyncPeer *pMaster = (index >= 0) ? pNode->peerInfo[index] : NULL;
return pMaster; return pMaster;
} }
...@@ -649,7 +636,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) { ...@@ -649,7 +636,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
int code = 0; int code = 0;
if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) { if (nodeRole == TAOS_SYNC_ROLE_MASTER && nodeVersion < pPeer->version) {
sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id); sDebug("%s, slave has higher version, restart all connections!!!", pPeer->id);
nodeRole = TAOS_SYNC_ROLE_UNSYNCED; nodeRole = TAOS_SYNC_ROLE_UNSYNCED;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
code = -1; code = -1;
...@@ -658,13 +645,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) { ...@@ -658,13 +645,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
if ( i == pNode->selfIndex ) continue; if ( i == pNode->selfIndex ) continue;
syncRestartPeer(pNode->peerInfo[i]); syncRestartPeer(pNode->peerInfo[i]);
} }
} }
return code; return code;
} }
static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t newRole) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int8_t peerOldRole = pPeer->role; int8_t peerOldRole = pPeer->role;
int8_t selfOldRole = nodeRole; int8_t selfOldRole = nodeRole;
...@@ -686,14 +672,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne ...@@ -686,14 +672,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
if (syncValidateMaster(pPeer) < 0) return; if (syncValidateMaster(pPeer) < 0) return;
if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) { if (nodeRole == TAOS_SYNC_ROLE_UNSYNCED) {
if ( nodeVersion < pMaster->version) { if (nodeVersion < pMaster->version) {
syncRequired = 1; syncRequired = 1;
} else { } else {
sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version); sInfo("%s is master, work as slave, ver:%" PRIu64, pMaster->id, pMaster->version);
nodeRole = TAOS_SYNC_ROLE_SLAVE; nodeRole = TAOS_SYNC_ROLE_SLAVE;
(*pNode->notifyRole)(pNode->ahandle, nodeRole); (*pNode->notifyRole)(pNode->ahandle, nodeRole);
} }
} else if ( nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) { } else if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pMaster == pPeer) {
// nodeVersion = pMaster->version; // nodeVersion = pMaster->version;
} }
} else { } else {
...@@ -734,20 +720,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) { ...@@ -734,20 +720,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
pPeer->sstatus = TAOS_SYNC_STATUS_INIT; pPeer->sstatus = TAOS_SYNC_STATUS_INIT;
int ret = strcmp(pPeer->fqdn, tsNodeFqdn); int ret = strcmp(pPeer->fqdn, tsNodeFqdn);
if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort) ) if (ret > 0 || (ret == 0 && pPeer->port > tsSyncPort))
taosTmrReset(syncCheckPeerConnection, tsSyncTimer*1000, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, tsSyncTimer * 1000, pPeer, syncTmrCtrl, &pPeer->timer);
} }
void syncRestartConnection(SSyncPeer *pPeer) void syncRestartConnection(SSyncPeer *pPeer) {
{
if (pPeer->ip == 0) return; if (pPeer->ip == 0) return;
syncRestartPeer(pPeer); syncRestartPeer(pPeer);
syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE); syncCheckRole(pPeer, NULL, TAOS_SYNC_ROLE_OFFLINE);
} }
static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
sDebug("%s, sync-req is received", pPeer->id); sDebug("%s, sync-req is received", pPeer->id);
...@@ -782,8 +766,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) ...@@ -782,8 +766,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer)
} }
} }
static void syncNotStarted(void *param, void *tmrId) static void syncNotStarted(void *param, void *tmrId) {
{
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
...@@ -803,14 +786,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) { ...@@ -803,14 +786,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
static void syncRecoverFromMaster(SSyncPeer *pPeer) static void syncRecoverFromMaster(SSyncPeer *pPeer) {
{ SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
if ( nodeSStatus != TAOS_SYNC_STATUS_INIT) { if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus); sDebug("%s, sync is already started, status:%d", pPeer->id, nodeSStatus);
return; return;
} }
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
if (tsSyncNum >= tsMaxSyncNum) { if (tsSyncNum >= tsMaxSyncNum) {
...@@ -840,9 +822,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) ...@@ -840,9 +822,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer)
return; return;
} }
static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFwdRsp *pFwdRsp = (SFwdRsp *) cont; SFwdRsp *pFwdRsp = (SFwdRsp *) cont;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
SFwdInfo *pFwdInfo; SFwdInfo *pFwdInfo;
...@@ -862,10 +843,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) ...@@ -862,10 +843,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer)
} }
} }
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) SSyncNode * pNode = pPeer->pSyncNode;
{
SSyncNode *pNode = pPeer->pSyncNode;
SWalHead *pHead = (SWalHead *)cont; SWalHead *pHead = (SWalHead *)cont;
sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, forward is received, ver:%" PRIu64, pPeer->id, pHead->version);
...@@ -884,9 +863,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) ...@@ -884,9 +863,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer)
return; return;
} }
static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SPeersStatus *pPeersStatus = (SPeersStatus *)cont; SPeersStatus *pPeersStatus = (SPeersStatus *)cont;
sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id, sDebug("%s, status msg received, self:%s ver:%" PRIu64 " peer:%s ver:%" PRIu64 ", ack:%d", pPeer->id,
...@@ -909,10 +887,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { ...@@ -909,10 +887,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
} }
// head.len = htonl(head.len); // head.len = htonl(head.len);
if (pHead->len <0) { if (pHead->len < 0) {
sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len); sError("%s, invalid pkt length, len:%d", pPeer->id, pHead->len);
return -1; return -1;
} }
int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len); int bytes = taosReadMsg(pPeer->peerFd, cont, pHead->len);
if (bytes != pHead->len) { if (bytes != pHead->len) {
...@@ -923,9 +901,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) { ...@@ -923,9 +901,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
return 0; return 0;
} }
static int syncProcessPeerMsg(void *param, void *buffer) static int syncProcessPeerMsg(void *param, void *buffer) {
{ SSyncPeer * pPeer = param;
SSyncPeer *pPeer = param;
SSyncHead head; SSyncHead head;
char *cont = (char *)buffer; char *cont = (char *)buffer;
...@@ -953,8 +930,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) ...@@ -953,8 +930,7 @@ static int syncProcessPeerMsg(void *param, void *buffer)
#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA #define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA
static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
char msg[statusMsgLen] = {0}; char msg[statusMsgLen] = {0};
...@@ -1011,7 +987,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1011,7 +987,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId firstPkt.sourceId = pNode->vgId; // tell arbitrator its vgId
if ( write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) { if (write(connFd, &firstPkt, sizeof(firstPkt)) == sizeof(firstPkt)) {
sDebug("%s, connection to peer server is setup", pPeer->id); sDebug("%s, connection to peer server is setup", pPeer->id);
pPeer->peerFd = connFd; pPeer->peerFd = connFd;
pPeer->role = TAOS_SYNC_ROLE_UNSYNCED; pPeer->role = TAOS_SYNC_ROLE_UNSYNCED;
...@@ -1024,8 +1000,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { ...@@ -1024,8 +1000,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
} }
} }
static void syncCheckPeerConnection(void *param, void *tmrId) static void syncCheckPeerConnection(void *param, void *tmrId) {
{
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
...@@ -1037,8 +1012,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) ...@@ -1037,8 +1012,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId)
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
} }
static void syncCreateRestoreDataThread(SSyncPeer *pPeer) static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
{
taosTmrStopA(&pPeer->timer); taosTmrStopA(&pPeer->timer);
pthread_attr_t thattr; pthread_attr_t thattr;
...@@ -1059,8 +1033,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) ...@@ -1059,8 +1033,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer)
} }
} }
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
{
char ipstr[24]; char ipstr[24];
int i; int i;
...@@ -1137,8 +1110,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1137,8 +1110,7 @@ static void syncProcessBrokenLink(void *param) {
syncDecNodeRef(pNode); syncDecNodeRef(pNode);
} }
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
{
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
...@@ -1160,8 +1132,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) ...@@ -1160,8 +1132,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle)
sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); sDebug("vgId:%d, fwd info is saved, ver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
} }
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
{
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
int fwds = pSyncFwds->fwds; int fwds = pSyncFwds->fwds;
...@@ -1178,8 +1149,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) ...@@ -1178,8 +1149,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode)
} }
} }
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code) {
{
int confirm = 0; int confirm = 0;
if (pFwdInfo->code == 0) pFwdInfo->code = code; if (pFwdInfo->code == 0) pFwdInfo->code = code;
...@@ -1200,8 +1170,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1200,8 +1170,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
} }
} }
static void syncMonitorFwdInfos(void *param, void *tmrId) static void syncMonitorFwdInfos(void *param, void *tmrId) {
{
SSyncNode *pNode = param; SSyncNode *pNode = param;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs(); uint64_t time = taosGetTimestampMs();
...@@ -1220,6 +1189,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) ...@@ -1220,6 +1189,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId)
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
} }
...@@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind ...@@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
} }
} }
static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info SFileInfo minfo; memset(&minfo, 0, sizeof(minfo)); /* = {0}; */ // master file info
SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info SFileInfo sinfo; memset(&sinfo, 0, sizeof(sinfo)); /* = {0}; */ // slave file info
...@@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) ...@@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
return code; return code;
} }
static int syncRestoreWal(SSyncPeer *pPeer) static int syncRestoreWal(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int ret, code = -1; int ret, code = -1;
...@@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) ...@@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
return offset; return offset;
} }
static int syncProcessBufferedFwd(SSyncPeer *pPeer) static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
int forwards = 0; int forwards = 0;
...@@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) ...@@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer)
return pRecv->code; return pRecv->code;
} }
int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
SRecvBuffer *pRecv = pNode->pRecv; SRecvBuffer *pRecv = pNode->pRecv;
...@@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) ...@@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead)
return pRecv->code; return pRecv->code;
} }
static void syncCloseRecvBuffer(SSyncNode *pNode) static void syncCloseRecvBuffer(SSyncNode *pNode) {
{
if (pNode->pRecv) { if (pNode->pRecv) {
taosTFree(pNode->pRecv->buffer); taosTFree(pNode->pRecv->buffer);
} }
...@@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) ...@@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode)
taosTFree(pNode->pRecv); taosTFree(pNode->pRecv);
} }
static int syncOpenRecvBuffer(SSyncNode *pNode) static int syncOpenRecvBuffer(SSyncNode *pNode) {
{
syncCloseRecvBuffer(pNode); syncCloseRecvBuffer(pNode);
SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1); SRecvBuffer *pRecv = calloc(sizeof(SRecvBuffer), 1);
...@@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) ...@@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode)
return 0; return 0;
} }
static int syncRestoreDataStepByStep(SSyncPeer *pPeer) static int syncRestoreDataStepByStep(SSyncPeer *pPeer) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
nodeSStatus = TAOS_SYNC_STATUS_FILE; nodeSStatus = TAOS_SYNC_STATUS_FILE;
uint64_t fversion = 0; uint64_t fversion = 0;
...@@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer) ...@@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer)
return 0; return 0;
} }
void *syncRestoreData(void *param) void *syncRestoreData(void *param) {
{ SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncPeer *pPeer = (SSyncPeer *)param; SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
__sync_fetch_and_add(&tsSyncNum, 1); __sync_fetch_and_add(&tsSyncNum, 1);
...@@ -326,4 +318,3 @@ void *syncRestoreData(void *param) ...@@ -326,4 +318,3 @@ void *syncRestoreData(void *param)
return NULL; return NULL;
} }
...@@ -27,11 +27,10 @@ ...@@ -27,11 +27,10 @@
#include "tsync.h" #include "tsync.h"
#include "syncInt.h" #include "syncInt.h"
static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
{
sDebug("%s, start to monitor:%s", pPeer->id, name); sDebug("%s, start to monitor:%s", pPeer->id, name);
if (pPeer->notifyFd <=0) { if (pPeer->notifyFd <= 0) {
pPeer->watchNum = 0; pPeer->watchNum = 0;
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
if (pPeer->notifyFd < 0) { if (pPeer->notifyFd < 0) {
...@@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) ...@@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name)
return 0; return 0;
} }
static int syncAreFilesModified(SSyncPeer *pPeer) static int syncAreFilesModified(SSyncPeer *pPeer) {
{ if (pPeer->notifyFd <= 0) return 0;
if (pPeer->notifyFd <=0) return 0;
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int len = read(pPeer->notifyFd, buf, sizeof(buf));
...@@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) ...@@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer)
} }
} }
return code; return code;
} }
static int syncRetrieveFile(SSyncPeer *pPeer) static int syncRetrieveFile(SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFileInfo fileInfo; SFileInfo fileInfo;
SFileAck fileAck; SFileAck fileAck;
int code = -1; int code = -1;
...@@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) ...@@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
// wait for the ack from peer // wait for the ack from peer
ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck)); ret = taosReadMsg(pPeer->syncFd, &(fileAck), sizeof(fileAck));
if (ret <0) break; if (ret < 0) break;
// set the peer sync version // set the peer sync version
pPeer->sversion = fileInfo.fversion; pPeer->sversion = fileInfo.fversion;
...@@ -148,11 +145,11 @@ static int syncRetrieveFile(SSyncPeer *pPeer) ...@@ -148,11 +145,11 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
// send the file to peer // send the file to peer
int sfd = open(name, O_RDONLY); int sfd = open(name, O_RDONLY);
if ( sfd < 0 ) break; if (sfd < 0) break;
ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size); ret = taosTSendFile(pPeer->syncFd, sfd, NULL, fileInfo.size);
close(sfd); close(sfd);
if (ret <0) break; if (ret < 0) break;
sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size); sDebug("%s, %s is sent, size:%" PRId64, pPeer->id, name, fileInfo.size);
fileInfo.index++; fileInfo.index++;
...@@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) ...@@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
/* if only a partial record is read out, set the IN_MODIFY flag in event, /* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */ so upper layer will reload the file to get a complete record */
static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) {
{
int ret; int ret;
ret = read(sfd, pHead, sizeof(SWalHead)); ret = read(sfd, pHead, sizeof(SWalHead));
...@@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) ...@@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
} }
ret = read(sfd, pHead->cont, pHead->len); ret = read(sfd, pHead->cont, pHead->len);
if (ret <0) return -1; if (ret < 0) return -1;
if (ret != pHead->len) { if (ret != pHead->len) {
// file is not at end yet, it shall be reloaded // file is not at end yet, it shall be reloaded
...@@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) ...@@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
} }
return sizeof(SWalHead) + pHead->len; return sizeof(SWalHead) + pHead->len;
} }
static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
{
pPeer->watchNum = 0; pPeer->watchNum = 0;
taosClose(pPeer->notifyFd); taosClose(pPeer->notifyFd);
pPeer->notifyFd = inotify_init1(IN_NONBLOCK); pPeer->notifyFd = inotify_init1(IN_NONBLOCK);
...@@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) ...@@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name)
return -1; return -1;
} }
return 0; return 0;
} }
static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
{
char buf[2048]; char buf[2048];
int len = read(pPeer->notifyFd, buf, sizeof(buf)); int len = read(pPeer->notifyFd, buf, sizeof(buf));
if (len <0 && errno != EAGAIN) { if (len < 0 && errno != EAGAIN) {
sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno));
return -1; return -1;
} }
if (len == 0) return 0; if (len == 0) return 0;
struct inotify_event *event; struct inotify_event *event;
...@@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) ...@@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent)
return 0; return 0;
} }
static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, int64_t offset, uint32_t *pEvent) {
{
SWalHead *pHead = (SWalHead *) malloc(640000); SWalHead *pHead = (SWalHead *) malloc(640000);
int code = -1; int code = -1;
int32_t bytes = 0; int32_t bytes = 0;
...@@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, ...@@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion); sDebug("%s, retrieve last wal, offset:%" PRId64 " fversion:%" PRIu64, pPeer->id, offset, fversion);
while (1) { while (1) {
int wsize = syncReadOneWalRecord(sfd, pHead, pEvent); int wsize = syncReadOneWalRecord(sfd, pHead, pEvent);
if (wsize <0) break; if (wsize < 0) break;
if (wsize == 0) { code = 0; break; } if (wsize == 0) {
code = 0;
break;
}
sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version); sDebug("%s, last wal is forwarded, ver:%" PRIu64, pPeer->id, pHead->version);
int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); int ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
...@@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion, ...@@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
return -1; return -1;
} }
static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) {
{
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
int code = -1; int code = -1;
char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file char fname[TSDB_FILENAME_LEN * 2]; // full path to wal file
...@@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) ...@@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
} }
if (code < 0) break; if (code < 0) break;
if (pPeer->sversion >= fversion && fversion > 0) break; if (pPeer->sversion >= fversion && fversion > 0) break;
index++; wname[0] = 0; index++;
wname[0] = 0;
code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index); code = (*pNode->getWalInfo)(pNode->ahandle, wname, &index);
if ( code < 0) break; if (code < 0) break;
if ( wname[0] == 0 ) {code = 0; break;} if (wname[0] == 0) {
code = 0;
break;
}
// current last wal is closed, there is a new one // current last wal is closed, there is a new one
sDebug("%s, last wal is closed, try new one", pPeer->id); sDebug("%s, last wal is closed, try new one", pPeer->id);
...@@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index) ...@@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
return code; return code;
} }
static int syncRetrieveWal(SSyncPeer *pPeer) static int syncRetrieveWal(SSyncPeer *pPeer) {
{ SSyncNode * pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
char fname[TSDB_FILENAME_LEN * 3]; char fname[TSDB_FILENAME_LEN * 3];
char wname[TSDB_FILENAME_LEN * 2]; char wname[TSDB_FILENAME_LEN * 2];
int32_t size; int32_t size;
...@@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) ...@@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
// send wal file, // send wal file,
// inotify is not required, old wal file won't be modified, even remove is ok // inotify is not required, old wal file won't be modified, even remove is ok
if ( stat(fname, &fstat) < 0 ) break; if (stat(fname, &fstat) < 0) break;
size = fstat.st_size; size = fstat.st_size;
sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size); sDebug("%s, retrieve wal:%s size:%d", pPeer->id, fname, size);
...@@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer) ...@@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
return code; return code;
} }
static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
{ SSyncNode *pNode = pPeer->pSyncNode;
SSyncNode *pNode = pPeer->pSyncNode;
SFirstPkt firstPkt; SFirstPkt firstPkt;
memset(&firstPkt, 0, sizeof(firstPkt)); memset(&firstPkt, 0, sizeof(firstPkt));
...@@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer) ...@@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer)
return 0; return 0;
} }
void *syncRetrieveData(void *param) void *syncRetrieveData(void *param) {
{ SSyncPeer * pPeer = (SSyncPeer *)param;
SSyncPeer *pPeer = (SSyncPeer *)param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
......
...@@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param); ...@@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param);
static SThreadObj *taosGetTcpThread(SPoolObj *pPool); static SThreadObj *taosGetTcpThread(SPoolObj *pPool);
static void taosStopPoolThread(SThreadObj* pThread); static void taosStopPoolThread(SThreadObj* pThread);
void *taosOpenTcpThreadPool(SPoolInfo *pInfo) void *taosOpenTcpThreadPool(SPoolInfo *pInfo) {
{
pthread_attr_t thattr; pthread_attr_t thattr;
SPoolObj *pPool = calloc(sizeof(SPoolObj), 1); SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
...@@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo) ...@@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo)
return pPool; return pPool;
} }
void taosCloseTcpThreadPool(void *param) void taosCloseTcpThreadPool(void *param) {
{
SPoolObj *pPool = (SPoolObj *)param; SPoolObj *pPool = (SPoolObj *)param;
SThreadObj *pThread; SThreadObj *pThread;
...@@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param) ...@@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param)
uDebug("%p TCP pool is closed", pPool); uDebug("%p TCP pool is closed", pPool);
} }
void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) {
{
struct epoll_event event; struct epoll_event event;
SPoolObj *pPool = (SPoolObj *)param; SPoolObj *pPool = (SPoolObj *)param;
...@@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd) ...@@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd)
return pConn; return pConn;
} }
void taosFreeTcpConn(void *param) void taosFreeTcpConn(void *param) {
{ SConnObj * pConn = (SConnObj *)param;
SConnObj *pConn = (SConnObj *)param;
SThreadObj *pThread = pConn->pThread; SThreadObj *pThread = pConn->pThread;
uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd); uDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
......
...@@ -156,9 +156,8 @@ static void arbProcessBrokenLink(void *param) { ...@@ -156,9 +156,8 @@ static void arbProcessBrokenLink(void *param) {
taosTFree(pNode); taosTFree(pNode);
} }
static int arbProcessPeerMsg(void *param, void *buffer) static int arbProcessPeerMsg(void *param, void *buffer) {
{ SNodeConn * pNode = param;
SNodeConn *pNode = param;
SSyncHead head; SSyncHead head;
int bytes = 0; int bytes = 0;
char *cont = (char *)buffer; char *cont = (char *)buffer;
...@@ -180,7 +179,6 @@ static int arbProcessPeerMsg(void *param, void *buffer) ...@@ -180,7 +179,6 @@ static int arbProcessPeerMsg(void *param, void *buffer)
} }
static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) { static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) {
struct sigaction act = {{0}}; struct sigaction act = {{0}};
act.sa_handler = SIG_IGN; act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL); sigaction(SIGTERM, &act, NULL);
...@@ -192,4 +190,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context) ...@@ -192,4 +190,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context)
// inform main thread to exit // inform main thread to exit
tsem_post(&tsArbSem); tsem_post(&tsArbSem);
} }
...@@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->sync = syncStart(&syncInfo); pVnode->sync = syncStart(&syncInfo);
if (pVnode->sync == NULL) { if (pVnode->sync == NULL) {
vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return terrno; return terrno;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册