未验证 提交 dcd389e1 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4318 from taosdata/feature/wal

TD-2153
...@@ -211,7 +211,7 @@ void sdbUpdateMnodeRoles() { ...@@ -211,7 +211,7 @@ void sdbUpdateMnodeRoles() {
if (tsSdbMgmt.sync <= 0) return; if (tsSdbMgmt.sync <= 0) return;
SNodesRole roles = {0}; SNodesRole roles = {0};
syncGetNodesRole(tsSdbMgmt.sync, &roles); if (syncGetNodesRole(tsSdbMgmt.sync, &roles) != 0) return;
sdbInfo("vgId:1, update mnodes role, replica:%d", tsSdbMgmt.cfg.replica); sdbInfo("vgId:1, update mnodes role, replica:%d", tsSdbMgmt.cfg.replica);
for (int32_t i = 0; i < tsSdbMgmt.cfg.replica; ++i) { for (int32_t i = 0; i < tsSdbMgmt.cfg.replica; ++i) {
......
...@@ -159,14 +159,14 @@ typedef struct SSyncNode { ...@@ -159,14 +159,14 @@ typedef struct SSyncNode {
SSyncFwds * pSyncFwds; // saved forward info if quorum >1 SSyncFwds * pSyncFwds; // saved forward info if quorum >1
void * pFwdTimer; void * pFwdTimer;
void * pRoleTimer; void * pRoleTimer;
FGetFileInfo getFileInfo; FGetFileInfo getFileInfo;
FGetWalInfo getWalInfo; FGetWalInfo getWalInfo;
FWriteToCache writeToCache; FWriteToCache writeToCache;
FConfirmForward confirmForward; FConfirmForward confirmForward;
FNotifyRole notifyRole; FNotifyRole notifyRole;
FNotifyFlowCtrl notifyFlowCtrl; FNotifyFlowCtrl notifyFlowCtrl;
FNotifyFileSynced notifyFileSynced; FNotifyFileSynced notifyFileSynced;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SSyncNode; } SSyncNode;
// sync module global // sync module global
......
...@@ -552,18 +552,16 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) { ...@@ -552,18 +552,16 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
} }
void syncBroadcastStatus(SSyncNode *pNode) { void syncBroadcastStatus(SSyncNode *pNode) {
SSyncPeer *pPeer; for (int32_t index = 0; index < pNode->replica; ++index) {
if (index == pNode->selfIndex) continue;
for (int32_t i = 0; i < pNode->replica; ++i) { SSyncPeer *pPeer = pNode->peerInfo[index];
if (i == pNode->selfIndex) continue;
pPeer = pNode->peerInfo[i];
syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_BROADCAST, syncGenTranId()); syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_BROADCAST, syncGenTranId());
} }
} }
static void syncResetFlowCtrl(SSyncNode *pNode) { static void syncResetFlowCtrl(SSyncNode *pNode) {
for (int32_t i = 0; i < pNode->replica; ++i) { for (int32_t index = 0; index < pNode->replica; ++index) {
pNode->peerInfo[i]->numOfRetrieves = 0; pNode->peerInfo[index]->numOfRetrieves = 0;
} }
if (pNode->notifyFlowCtrl) { if (pNode->notifyFlowCtrl) {
...@@ -1171,7 +1169,7 @@ static void syncProcessBrokenLink(void *param) { ...@@ -1171,7 +1169,7 @@ static void syncProcessBrokenLink(void *param) {
static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
SSyncFwds *pSyncFwds = pNode->pSyncFwds; SSyncFwds *pSyncFwds = pNode->pSyncFwds;
uint64_t time = taosGetTimestampMs(); int64_t time = taosGetTimestampMs();
if (pSyncFwds->fwds >= tsMaxFwdInfo) { if (pSyncFwds->fwds >= tsMaxFwdInfo) {
pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo; pSyncFwds->first = (pSyncFwds->first + 1) % tsMaxFwdInfo;
...@@ -1289,7 +1287,6 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1289,7 +1287,6 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
int32_t fwdLen; int32_t fwdLen;
int32_t code = 0; int32_t code = 0;
if (pWalHead->version > nodeVersion + 1) { if (pWalHead->version > nodeVersion + 1) {
sError("vgId:%d, hver:%" PRIu64 ", inconsistent with sver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion); sError("vgId:%d, hver:%" PRIu64 ", inconsistent with sver:%" PRIu64, pNode->vgId, pWalHead->version, nodeVersion);
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
...@@ -1305,14 +1302,15 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1305,14 +1302,15 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
// always update version // always update version
nodeVersion = pWalHead->version; nodeVersion = pWalHead->version;
sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0;
// only pkt from RPC or CQ can be forwarded // only pkt from RPC or CQ can be forwarded
if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0; if (qtype != TAOS_QTYPE_RPC && qtype != TAOS_QTYPE_CQ) return 0;
sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica,
syncRole[nodeRole], qtypeStr[qtype], pWalHead->version);
// a hacker way to improve the performance // a hacker way to improve the performance
pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead)); pSyncHead = (SSyncHead *)(((char *)pWalHead) - sizeof(SSyncHead));
pSyncHead->type = TAOS_SMSG_FORWARD; pSyncHead->type = TAOS_SMSG_FORWARD;
...@@ -1332,7 +1330,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle ...@@ -1332,7 +1330,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
code = 1; code = 1;
} }
int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen); int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
if (retLen == fwdLen) { if (retLen == fwdLen) {
sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
} else { } else {
......
...@@ -438,7 +438,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) { ...@@ -438,7 +438,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn)); tstrncpy(firstPkt.fqdn, tsNodeFqdn, sizeof(firstPkt.fqdn));
firstPkt.port = tsSyncPort; firstPkt.port = tsSyncPort;
if (write(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) { if (taosWriteMsg(pPeer->syncFd, (char *)&firstPkt, sizeof(firstPkt)) < 0) {
sError("%s, failed to send syncCmd", pPeer->id); sError("%s, failed to send syncCmd", pPeer->id);
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册