vnodeSync.c 5.4 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

S
Shengliang Guan 已提交
19 20 21 22 23 24 25
/*
 * Forward declarations of the file-local helpers defined further down.
 * vnodeSyncEqMsg / vnodeSyncSendMsg are installed as the FpEqMsg / FpSendMsg
 * members of SSyncInfo in vnodeSyncOpen(); the remaining callbacks are wired
 * into the SSyncFSM built by vnodeSyncMakeFsm().
 */
static int32_t   vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
static int32_t   vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
static void      vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static void      vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static void      vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static int32_t   vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
M
Minghao Li 已提交
26

S
Shengliang Guan 已提交
27 28 29 30 31 32 33 34 35 36 37 38
/*
 * Open the sync instance for a vnode.
 *
 * Builds an SSyncInfo from the vnode's config and WAL, points the sync
 * directory at "<path><TD_DIRSEP>sync", attaches a freshly built FSM and
 * stores the handle returned by syncOpen() in pVnode->sync. On success the
 * ping / elect / heartbeat timers are set to 3000 / 500 / 100 (the setter
 * names indicate milliseconds).
 *
 * Returns 0 on success, -1 when the FSM cannot be created or syncOpen() fails.
 */
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .vgId = pVnode->config.vgId,
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,  // set later by vnodeSyncStart() via syncSetMsgCb()
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);

  // Never hand a missing FSM to the sync module.
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
  if (syncInfo.pFsm == NULL) {
    vError("vgId:%d, failed to open sync since fsm could not be created", pVnode->config.vgId);
    return -1;
  }

  pVnode->sync = syncOpen(&syncInfo);
  if (pVnode->sync <= 0) {
    // NOTE(review): ownership of syncInfo.pFsm after a failed syncOpen() is not
    // visible from this file, so it is intentionally not freed here -- confirm.
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }

  setPingTimerMS(pVnode->sync, 3000);
  setElectTimerMS(pVnode->sync, 500);
  setHeartbeatTimerMS(pVnode->sync, 100);
  return 0;
}

S
Shengliang Guan 已提交
52 53
/*
 * Start the vnode's sync instance.
 *
 * Installs the vnode's message callbacks (pVnode->msgCb) on the sync handle
 * first, then calls syncStart() on that handle.
 */
void vnodeSyncStart(SVnode *pVnode) {
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
  syncStart(pVnode->sync);
}

S
Shengliang Guan 已提交
57
/* Stop the sync instance whose handle is stored in pVnode->sync. */
void vnodeSyncClose(SVnode *pVnode) {
  syncStop(pVnode->sync);
}
M
Minghao Li 已提交
58

S
Shengliang Guan 已提交
59
/*
 * Enqueue callback handed to the sync module (SSyncInfo.FpEqMsg):
 * puts pMsg on SYNC_QUEUE through msgcb and returns tmsgPutToQueue()'s result.
 */
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  return code;
}
M
Minghao Li 已提交
60

S
Shengliang Guan 已提交
61
/*
 * Send callback handed to the sync module (SSyncInfo.FpSendMsg):
 * forwards pMsg to the endpoint set via tmsgSendReq() and returns its result.
 */
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  return code;
}
M
Minghao Li 已提交
62

S
Shengliang Guan 已提交
63 64
/*
 * FSM snapshot callback (SSyncFSM.FpGetSnapshot).
 *
 * Passes pFsm->data (set to the owning SVnode in vnodeSyncMakeFsm()) and the
 * caller's snapshot buffer to vnodeGetSnapshot(). Always returns 0.
 */
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  vnodeGetSnapshot(pFsm->data, pSnapshot);
  return 0;
}

S
Shengliang Guan 已提交
68
/*
 * FSM commit callback (SSyncFSM.FpCommitCb).
 *
 * Reads lastApplyIndex from the FSM snapshot (when FpGetSnapshot is set) and
 * compares it with cbMeta.index:
 *   - index <= lastApplyIndex: the entry is only logged ("do not execute").
 *   - index >  lastApplyIndex: the message is wrapped into an apply message,
 *     converted back to an SRpcMsg and put on the vnode's APPLY_QUEUE.
 *
 * Before queueing, the response info saved under cbMeta.seqNum is fetched (and
 * removed) with syncGetAndDelRespRpc(). It is attached to the apply message
 * only when that call returns 1 and this node's state is
 * TAOS_SYNC_STATE_LEADER; otherwise handle / ahandle are cleared.
 */
void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SyncIndex beginIndex = SYNC_INDEX_INVALID;
  if (pFsm->FpGetSnapshot != NULL) {
    SSnapshot snapshot = {0};
    pFsm->FpGetSnapshot(pFsm, &snapshot);
    beginIndex = snapshot.lastApplyIndex;
  }

  char logBuf[256];

  if (cbMeta.index <= beginIndex) {
    // Entry is at or below the snapshot's lastApplyIndex: log only.
    // Indexes are cast to long long so the arguments match %lld whatever the
    // underlying width of SyncIndex is on the target platform.
    snprintf(logBuf, sizeof(logBuf),
             "==callback== ==CommitCb== do not execute, pFsm:%p, index:%lld, isWeak:%d, code:%d, state:%d %s, "
             "beginIndex :%lld\n",
             (void *)pFsm, (long long)cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
             syncUtilState2String(cbMeta.state), (long long)beginIndex);
    syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
    return;
  }

  snprintf(
      logBuf, sizeof(logBuf),
      "==callback== ==CommitCb== execute, pFsm:%p, index:%lld, isWeak:%d, code:%d, state:%d %s, beginIndex :%lld\n",
      (void *)pFsm, (long long)cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
      syncUtilState2String(cbMeta.state), (long long)beginIndex);
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);

  SVnode *pVnode = (SVnode *)(pFsm->data);

  // Build the apply message, convert it to an rpc message, then release the
  // intermediate object.
  SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta);
  SRpcMsg       applyMsg = {0};
  syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);
  syncApplyMsgDestroy(pSyncApplyMsg);

  // recover handle for response
  SRpcMsg saveRpcMsg = {0};
  int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
  if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {
    applyMsg.info = saveRpcMsg.info;
  } else {
    applyMsg.info.handle = NULL;
    applyMsg.info.ahandle = NULL;
  }

  // put to applyQ
  // NOTE(review): the result of tmsgPutToQueue() is not checked; whether the
  // message content must be released on failure is not visible here -- confirm.
  tmsgPutToQueue(&(pVnode->msgCb), APPLY_QUEUE, &applyMsg);
}

S
Shengliang Guan 已提交
122
/*
 * FSM pre-commit callback (SSyncFSM.FpPreCommitCb).
 *
 * Only logs the callback metadata together with the message through
 * syncRpcMsgLog2(); no state is modified here.
 */
void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  char logBuf[256];
  // The index is cast to long long so the argument matches %lld whatever the
  // underlying width of the index type is on the target platform.
  snprintf(logBuf, sizeof(logBuf),
           "==callback== ==PreCommitCb== pFsm:%p, index:%lld, isWeak:%d, code:%d, state:%d %s \n", (void *)pFsm,
           (long long)cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}

S
Shengliang Guan 已提交
130
/*
 * FSM rollback callback (SSyncFSM.FpRollBackCb).
 *
 * Only logs the callback metadata together with the message through
 * syncRpcMsgLog2(); no state is modified here.
 */
void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  char logBuf[256];
  // The index is cast to long long so the argument matches %lld whatever the
  // underlying width of the index type is on the target platform.
  snprintf(logBuf, sizeof(logBuf),
           "==callback== ==RollBackCb== pFsm:%p, index:%lld, isWeak:%d, code:%d, state:%d %s \n", (void *)pFsm,
           (long long)cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}

S
Shengliang Guan 已提交
137 138
/*
 * Allocate and populate the SSyncFSM used by the vnode's sync instance.
 *
 * The FSM's data pointer is set to the owning vnode and its commit,
 * pre-commit, rollback and snapshot callbacks are set to the helpers in this
 * file; FpRestoreFinish is left unset (NULL).
 *
 * Returns the new FSM, or NULL when the allocation fails (callers must check).
 */
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Report allocation failure instead of writing through a NULL pointer.
    return NULL;
  }

  pFsm->data = pVnode;
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
  pFsm->FpRestoreFinish = NULL;
  return pFsm;
}