vnodeSync.c 5.5 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

S
Shengliang Guan 已提交
19 20 21 22 23 24 25
static int32_t   vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
static int32_t   vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
static void      vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static void      vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static void      vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static int32_t   vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
M
Minghao Li 已提交
26

S
Shengliang Guan 已提交
27 28 29 30 31 32 33 34 35 36 37 38
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
M
Minghao Li 已提交
39 40

  pVnode->sync = syncOpen(&syncInfo);
S
Shengliang Guan 已提交
41 42 43 44
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }
M
Minghao Li 已提交
45 46 47 48 49 50 51

  setPingTimerMS(pVnode->sync, 3000);
  setElectTimerMS(pVnode->sync, 500);
  setHeartbeatTimerMS(pVnode->sync, 100);
  return 0;
}

S
Shengliang Guan 已提交
52 53
void vnodeSyncStart(SVnode *pVnode) {
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
M
Minghao Li 已提交
54 55 56
  syncStart(pVnode->sync);
}

S
Shengliang Guan 已提交
57
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
M
Minghao Li 已提交
58

M
Minghao Li 已提交
59 60 61 62 63 64 65
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { 
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
  }
  return code;
}
M
Minghao Li 已提交
66

S
Shengliang Guan 已提交
67
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
M
Minghao Li 已提交
68

S
Shengliang Guan 已提交
69 70
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  vnodeGetSnapshot(pFsm->data, pSnapshot);
M
Minghao Li 已提交
71 72 73
  return 0;
}

S
Shengliang Guan 已提交
74
void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
M
Minghao Li 已提交
75 76
  SyncIndex beginIndex = SYNC_INDEX_INVALID;
  if (pFsm->FpGetSnapshot != NULL) {
S
Shengliang Guan 已提交
77
    SSnapshot snapshot = {0};
M
Minghao Li 已提交
78 79 80 81 82 83 84 85 86 87 88 89
    pFsm->FpGetSnapshot(pFsm, &snapshot);
    beginIndex = snapshot.lastApplyIndex;
  }

  if (cbMeta.index > beginIndex) {
    char logBuf[256];
    snprintf(
        logBuf, sizeof(logBuf),
        "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
        pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
    syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);

H
Hongze Cheng 已提交
90
    SVnode       *pVnode = (SVnode *)(pFsm->data);
91 92 93 94 95 96 97 98 99 100 101 102
    SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta);
    SRpcMsg       applyMsg;
    syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);
    syncApplyMsgDestroy(pSyncApplyMsg);

    /*
        SRpcMsg applyMsg;
        applyMsg = *pMsg;
        applyMsg.pCont = rpcMallocCont(applyMsg.contLen);
        assert(applyMsg.contLen == pMsg->contLen);
        memcpy(applyMsg.pCont, pMsg->pCont, applyMsg.contLen);
    */
M
Minghao Li 已提交
103 104 105 106

    // recover handle for response
    SRpcMsg saveRpcMsg;
    int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
M
Minghao Li 已提交
107
    if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {
108
      applyMsg.info = saveRpcMsg.info;
M
Minghao Li 已提交
109
    } else {
S
Shengliang Guan 已提交
110 111
      applyMsg.info.handle = NULL;
      applyMsg.info.ahandle = NULL;
M
Minghao Li 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
    }

    // put to applyQ
    tmsgPutToQueue(&(pVnode->msgCb), APPLY_QUEUE, &applyMsg);

  } else {
    char logBuf[256];
    snprintf(logBuf, sizeof(logBuf),
             "==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, "
             "beginIndex :%ld\n",
             pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
             beginIndex);
    syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
  }
}

S
Shengliang Guan 已提交
128
void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
M
Minghao Li 已提交
129 130 131 132 133 134 135
  char logBuf[256];
  snprintf(logBuf, sizeof(logBuf),
           "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
           cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}

S
Shengliang Guan 已提交
136
void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
M
Minghao Li 已提交
137 138 139 140 141 142
  char logBuf[256];
  snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}

S
Shengliang Guan 已提交
143 144
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
M
Minghao Li 已提交
145
  pFsm->data = pVnode;
S
Shengliang Guan 已提交
146 147 148 149
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
M
Minghao Li 已提交
150
  pFsm->FpRestoreFinish = NULL;
M
Minghao Li 已提交
151
  return pFsm;
S
Shengliang Guan 已提交
152
}