未验证 提交 acbfa5de 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4053 from taosdata/patch/TD-1715

Patch/td 1715
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "tlog.h" #include "tlog.h"
#include "tutil.h" #include "tutil.h"
#include "ttimer.h" #include "ttimer.h"
#include "tref.h"
#include "tsocket.h" #include "tsocket.h"
#include "tglobal.h" #include "tglobal.h"
#include "taoserror.h" #include "taoserror.h"
...@@ -43,6 +44,7 @@ char tsNodeFqdn[TSDB_FQDN_LEN]; ...@@ -43,6 +44,7 @@ char tsNodeFqdn[TSDB_FQDN_LEN];
static ttpool_h tsTcpPool; static ttpool_h tsTcpPool;
static void * syncTmrCtrl = NULL; static void * syncTmrCtrl = NULL;
static void * vgIdHash; static void * vgIdHash;
static int tsSyncRefId = -1;
// local functions // local functions
static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer); static void syncProcessSyncRequest(char *pMsg, SSyncPeer *pPeer);
...@@ -54,13 +56,13 @@ static int syncProcessPeerMsg(void *param, void *buffer); ...@@ -54,13 +56,13 @@ static int syncProcessPeerMsg(void *param, void *buffer);
static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp); static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp);
static void syncRemovePeer(SSyncPeer *pPeer); static void syncRemovePeer(SSyncPeer *pPeer);
static void syncAddArbitrator(SSyncNode *pNode); static void syncAddArbitrator(SSyncNode *pNode);
static void syncAddNodeRef(SSyncNode *pNode); static void syncFreeNode(void *);
static void syncDecNodeRef(SSyncNode *pNode);
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode); static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode);
static void syncMonitorFwdInfos(void *param, void *tmrId); static void syncMonitorFwdInfos(void *param, void *tmrId);
static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code);
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle);
static void syncRestartPeer(SSyncPeer *pPeer); static void syncRestartPeer(SSyncPeer *pPeer);
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtyp);
static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo);
char* syncRole[] = { char* syncRole[] = {
...@@ -106,6 +108,12 @@ int32_t syncInit() { ...@@ -106,6 +108,12 @@ int32_t syncInit() {
return -1; return -1;
} }
tsSyncRefId = taosOpenRef(200, syncFreeNode);
if (tsSyncRefId < 0) {
syncCleanUp();
return -1;
}
tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn)); tstrncpy(tsNodeFqdn, tsLocalFqdn, sizeof(tsNodeFqdn));
sInfo("sync module initialized successfully"); sInfo("sync module initialized successfully");
...@@ -128,6 +136,9 @@ void syncCleanUp() { ...@@ -128,6 +136,9 @@ void syncCleanUp() {
vgIdHash = NULL; vgIdHash = NULL;
} }
taosCloseRef(tsSyncRefId);
tsSyncRefId = -1;
sInfo("sync module is cleaned up"); sInfo("sync module is cleaned up");
} }
...@@ -159,6 +170,12 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -159,6 +170,12 @@ void *syncStart(const SSyncInfo *pInfo) {
pNode->quorum = pCfg->quorum; pNode->quorum = pCfg->quorum;
if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica; if (pNode->quorum > pNode->replica) pNode->quorum = pNode->replica;
int ret = taosAddRef(tsSyncRefId, pNode);
if (ret < 0) {
syncFreeNode(pNode);
return NULL;
}
for (int i = 0; i < pCfg->replica; ++i) { for (int i = 0; i < pCfg->replica; ++i) {
const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i; const SNodeInfo *pNodeInfo = pCfg->nodeInfo + i;
pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo); pNode->peerInfo[i] = syncAddPeer(pNode, pNodeInfo);
...@@ -167,8 +184,6 @@ void *syncStart(const SSyncInfo *pInfo) { ...@@ -167,8 +184,6 @@ void *syncStart(const SSyncInfo *pInfo) {
} }
} }
syncAddNodeRef(pNode);
if (pNode->selfIndex < 0) { if (pNode->selfIndex < 0) {
sInfo("vgId:%d, this node is not configured", pNode->vgId); sInfo("vgId:%d, this node is not configured", pNode->vgId);
terrno = TSDB_CODE_SYN_INVALID_CONFIG; terrno = TSDB_CODE_SYN_INVALID_CONFIG;
...@@ -210,7 +225,9 @@ void syncStop(void *param) { ...@@ -210,7 +225,9 @@ void syncStop(void *param) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
if (pNode == NULL) return; int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
sInfo("vgId:%d, cleanup sync", pNode->vgId); sInfo("vgId:%d, cleanup sync", pNode->vgId);
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
...@@ -228,14 +245,17 @@ void syncStop(void *param) { ...@@ -228,14 +245,17 @@ void syncStop(void *param) {
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode); taosReleaseRef(tsSyncRefId, pNode);
taosRemoveRef(tsSyncRefId, pNode);
} }
int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
int i, j; int i, j;
if (pNode == NULL) return TSDB_CODE_SYN_INVALID_CONFIG; int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return TSDB_CODE_SYN_INVALID_CONFIG;
sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica, sInfo("vgId:%d, reconfig, role:%s replica:%d old:%d", pNode->vgId, syncRole[nodeRole], pNewCfg->replica,
pNode->replica); pNode->replica);
...@@ -298,105 +318,63 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) { ...@@ -298,105 +318,63 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
syncRole[nodeRole]); syncRole[nodeRole]);
syncBroadcastStatus(pNode); syncBroadcastStatus(pNode);
taosReleaseRef(tsSyncRefId, pNode);
return 0; return 0;
} }
int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) { int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
SSyncPeer *pPeer;
SSyncHead *pSyncHead;
SWalHead * pWalHead = data;
int fwdLen;
int code = 0;
if (pNode == NULL) return 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
pWalHead->version, nodeVersion);
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer);
}
return TSDB_CODE_SYN_INVALID_VERSION;
}
// always update version
nodeVersion = pWalHead->version;
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
qtype, pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance int ret = taosAcquireRef(tsSyncRefId, pNode);
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); if (ret < 0) return 0;
pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&(pNode->mutex)); int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype);
for (int i = 0; i < pNode->replica; ++i) { taosReleaseRef(tsSyncRefId, pNode);
pPeer = pNode->peerInfo[i];
if (pPeer == NULL || pPeer->peerFd < 0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
if (pNode->quorum > 1 && code == 0) {
syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
code = 1;
}
int retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) {
sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
} else {
sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
syncRestartConnection(pPeer);
}
}
pthread_mutex_unlock(&(pNode->mutex));
return code; return code;
} }
void syncConfirmForward(void *param, uint64_t version, int32_t code) { void syncConfirmForward(void *param, uint64_t version, int32_t code) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
if (pNode == NULL) return;
if (pNode->quorum <= 1) return;
SSyncPeer *pPeer = pNode->pMaster; int ret = taosAcquireRef(tsSyncRefId, pNode);
if (pPeer == NULL) return; if (ret < 0) return;
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0}; SSyncPeer *pPeer = pNode->pMaster;
if (pPeer && pNode->quorum > 1) {
char msg[sizeof(SSyncHead) + sizeof(SFwdRsp)] = {0};
SSyncHead *pHead = (SSyncHead *)msg; SSyncHead *pHead = (SSyncHead *)msg;
pHead->type = TAOS_SMSG_FORWARD_RSP; pHead->type = TAOS_SMSG_FORWARD_RSP;
pHead->len = sizeof(SFwdRsp); pHead->len = sizeof(SFwdRsp);
SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead)); SFwdRsp *pFwdRsp = (SFwdRsp *)(msg + sizeof(SSyncHead));
pFwdRsp->version = version; pFwdRsp->version = version;
pFwdRsp->code = code; pFwdRsp->code = code;
int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp); int msgLen = sizeof(SSyncHead) + sizeof(SFwdRsp);
int retLen = write(pPeer->peerFd, msg, msgLen); int retLen = write(pPeer->peerFd, msg, msgLen);
if (retLen == msgLen) { if (retLen == msgLen) {
sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version); sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version);
} else { } else {
sDebug("%s, failed to send forward ack, restart", pPeer->id); sDebug("%s, failed to send forward ack, restart", pPeer->id);
syncRestartConnection(pPeer); syncRestartConnection(pPeer);
}
} }
taosReleaseRef(tsSyncRefId, pNode);
} }
void syncRecover(void *param) { void syncRecover(void *param) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
SSyncPeer *pPeer; SSyncPeer *pPeer;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return;
// to do: add a few lines to check if recover is OK // to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work // if take this node to unsync state, the whole system may not work
...@@ -414,17 +392,24 @@ void syncRecover(void *param) { ...@@ -414,17 +392,24 @@ void syncRecover(void *param) {
} }
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
taosReleaseRef(tsSyncRefId, pNode);
} }
int syncGetNodesRole(void *param, SNodesRole *pNodesRole) { int syncGetNodesRole(void *param, SNodesRole *pNodesRole) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if (ret < 0) return -1;
pNodesRole->selfIndex = pNode->selfIndex; pNodesRole->selfIndex = pNode->selfIndex;
for (int i = 0; i < pNode->replica; ++i) { for (int i = 0; i < pNode->replica; ++i) {
pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId; pNodesRole->nodeId[i] = pNode->peerInfo[i]->nodeId;
pNodesRole->role[i] = pNode->peerInfo[i]->role; pNodesRole->role[i] = pNode->peerInfo[i]->role;
} }
taosReleaseRef(tsSyncRefId, pNode);
return 0; return 0;
} }
...@@ -457,22 +442,20 @@ static void syncAddArbitrator(SSyncNode *pNode) { ...@@ -457,22 +442,20 @@ static void syncAddArbitrator(SSyncNode *pNode) {
pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo); pNode->peerInfo[TAOS_SYNC_MAX_REPLICA] = syncAddPeer(pNode, &nodeInfo);
} }
static void syncAddNodeRef(SSyncNode *pNode) { atomic_add_fetch_8(&pNode->refCount, 1); } static void syncFreeNode(void *param) {
SSyncNode *pNode = param;
static void syncDecNodeRef(SSyncNode *pNode) { pthread_mutex_destroy(&pNode->mutex);
if (atomic_sub_fetch_8(&pNode->refCount, 1) == 0) { taosTFree(pNode->pRecv);
pthread_mutex_destroy(&pNode->mutex); taosTFree(pNode->pSyncFwds);
taosTFree(pNode->pRecv); taosTFree(pNode);
taosTFree(pNode->pSyncFwds);
taosTFree(pNode);
}
} }
void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); } void syncAddPeerRef(SSyncPeer *pPeer) { atomic_add_fetch_8(&pPeer->refCount, 1); }
int syncDecPeerRef(SSyncPeer *pPeer) { int syncDecPeerRef(SSyncPeer *pPeer) {
if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) { if (atomic_sub_fetch_8(&pPeer->refCount, 1) == 0) {
syncDecNodeRef(pPeer->pSyncNode); taosReleaseRef(tsSyncRefId, pPeer->pSyncNode);
sDebug("%s, resource is freed", pPeer->id); sDebug("%s, resource is freed", pPeer->id);
taosTFree(pPeer->watchFd); taosTFree(pPeer->watchFd);
...@@ -529,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -529,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer); taosTmrReset(syncCheckPeerConnection, checkMs, pPeer, syncTmrCtrl, &pPeer->timer);
} }
syncAddNodeRef(pNode); taosAcquireRef(tsSyncRefId, pNode);
return pPeer; return pPeer;
} }
...@@ -1122,7 +1105,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1122,7 +1105,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncPeer *pPeer = param; SSyncPeer *pPeer = param;
SSyncNode *pNode = pPeer->pSyncNode; SSyncNode *pNode = pPeer->pSyncNode;
syncAddNodeRef(pNode); if (taosAcquireRef(tsSyncRefId, pNode) < 0) return;
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno));
...@@ -1133,7 +1116,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1133,7 +1116,7 @@ static void syncProcessBrokenLink(void *param) {
} }
pthread_mutex_unlock(&(pNode->mutex)); pthread_mutex_unlock(&(pNode->mutex));
syncDecNodeRef(pNode); taosReleaseRef(tsSyncRefId, pNode);
} }
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
...@@ -1202,22 +1185,90 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code ...@@ -1202,22 +1185,90 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
static void syncMonitorFwdInfos(void *param, void *tmrId) { static void syncMonitorFwdInfos(void *param, void *tmrId) {
SSyncNode *pNode = param; SSyncNode *pNode = param;
int ret = taosAcquireRef(tsSyncRefId, pNode);
if ( ret < 0) return;
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
if (pSyncFwds == NULL) return;
uint64_t time = taosGetTimestampMs(); if (pSyncFwds) {;
uint64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds > 0) { if (pSyncFwds->fwds > 0) {
pthread_mutex_lock(&(pNode->mutex)); pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pSyncFwds->fwds; ++i) { for (int i = 0; i < pSyncFwds->fwds; ++i) {
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo; SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + (pSyncFwds->first + i) % tsMaxFwdInfo;
if (time - pFwdInfo->time < 2000) break; if (time - pFwdInfo->time < 2000) break;
syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL); syncProcessFwdAck(pNode, pFwdInfo, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
syncRemoveConfirmedFwdInfo(pNode);
pthread_mutex_unlock(&(pNode->mutex));
} }
syncRemoveConfirmedFwdInfo(pNode); pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl);
pthread_mutex_unlock(&(pNode->mutex));
} }
pNode->pFwdTimer = taosTmrStart(syncMonitorFwdInfos, 300, pNode, syncTmrCtrl); taosReleaseRef(tsSyncRefId, pNode);
} }
static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int qtype) {
SSyncPeer *pPeer;
SSyncHead *pSyncHead;
SWalHead * pWalHead = data;
int fwdLen;
int32_t code = 0;
if (nodeRole == TAOS_SYNC_ROLE_SLAVE && pWalHead->version != nodeVersion + 1) {
sError("vgId:%d, received ver:%" PRIu64 ", inconsistent with last ver:%" PRIu64 ", restart connection", pNode->vgId,
pWalHead->version, nodeVersion);
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
syncRestartConnection(pPeer);
}
return TSDB_CODE_SYN_INVALID_VERSION;
}
// always update version
nodeVersion = pWalHead->version;
sDebug("vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole],
qtype, pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
// a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD;
pSyncHead->pversion = 0;
pSyncHead->len = sizeof(SWalHead) + pWalHead->len;
fwdLen = pSyncHead->len + sizeof(SSyncHead); // include the WAL and SYNC head
pthread_mutex_lock(&(pNode->mutex));
for (int i = 0; i < pNode->replica; ++i) {
pPeer = pNode->peerInfo[i];
if (pPeer == NULL || pPeer->peerFd < 0) continue;
if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue;
if (pNode->quorum > 1 && code == 0) {
syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
code = 1;
}
int retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) {
sDebug("%s, forward is sent, ver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
} else {
sError("%s, failed to forward, ver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
syncRestartConnection(pPeer);
}
}
pthread_mutex_unlock(&(pNode->mutex));
return code;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册