vnodeSync.c 6.8 KB
Newer Older
M
Minghao Li 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
H
Hongze Cheng 已提交
17
#include "vnd.h"
M
Minghao Li 已提交
18

S
Shengliang Guan 已提交
19 20 21 22 23 24 25
static int32_t   vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
static int32_t   vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
static void      vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static void      vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static void      vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
static int32_t   vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
M
Minghao Li 已提交
26

S
Shengliang Guan 已提交
27 28 29
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
  SSyncInfo syncInfo = {
      .vgId = pVnode->config.vgId,
30
      .isStandBy = pVnode->config.standby,
S
Shengliang Guan 已提交
31 32 33 34 35 36 37 38 39
      .syncCfg = pVnode->config.syncCfg,
      .pWal = pVnode->pWal,
      .msgcb = NULL,
      .FpSendMsg = vnodeSyncSendMsg,
      .FpEqMsg = vnodeSyncEqMsg,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
  syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
M
Minghao Li 已提交
40 41

  pVnode->sync = syncOpen(&syncInfo);
S
Shengliang Guan 已提交
42 43 44 45
  if (pVnode->sync <= 0) {
    vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
    return -1;
  }
M
Minghao Li 已提交
46 47 48 49 50 51 52

  setPingTimerMS(pVnode->sync, 3000);
  setElectTimerMS(pVnode->sync, 500);
  setHeartbeatTimerMS(pVnode->sync, 100);
  return 0;
}

S
Shengliang Guan 已提交
53
int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
54 55 56
  SAlterVnodeReq req = {0};
  if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
S
Shengliang Guan 已提交
57
    return TSDB_CODE_INVALID_MSG;
58 59 60 61 62 63 64 65 66 67 68
  }

  vInfo("vgId:%d, start to alter vnode replica to %d", TD_VID(pVnode), req.replica);
  SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
  for (int32_t r = 0; r < req.replica; ++r) {
    SNodeInfo *pNode = &cfg.nodeInfo[r];
    tstrncpy(pNode->nodeFqdn, req.replicas[r].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = req.replicas[r].port;
    vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
  }

S
Shengliang Guan 已提交
69 70 71
  int32_t code = syncReconfig(pVnode->sync, &cfg);
  if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
    // todo refactor
72
    SRpcMsg rsp = {.info = pMsg->info, .code = 0};
73
    tmsgSendRsp(&rsp);
S
Shengliang Guan 已提交
74
    return TSDB_CODE_ACTION_IN_PROGRESS;
75
  }
S
Shengliang Guan 已提交
76 77

  return code;
78 79
}

S
Shengliang Guan 已提交
80 81
void vnodeSyncStart(SVnode *pVnode) {
  syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
82 83 84 85 86
  if (pVnode->config.standby) {
    syncStartStandBy(pVnode->sync);
  } else {
    syncStart(pVnode->sync);
  }
M
Minghao Li 已提交
87 88
}

S
Shengliang Guan 已提交
89
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
M
Minghao Li 已提交
90

91
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
M
Minghao Li 已提交
92 93 94
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
95
    pMsg->pCont = NULL;
M
Minghao Li 已提交
96 97 98
  }
  return code;
}
M
Minghao Li 已提交
99

100 101 102 103 104 105 106 107
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  if (code != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return code;
}
M
Minghao Li 已提交
108

S
Shengliang Guan 已提交
109 110
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  vnodeGetSnapshot(pFsm->data, pSnapshot);
M
Minghao Li 已提交
111 112 113
  return 0;
}

114 115 116 117 118 119 120
void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
  SVnode *pVnode = pFsm->data;
  vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode));

  // todo rpc response here
}

S
Shengliang Guan 已提交
121
void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
M
Minghao Li 已提交
122 123
  SyncIndex beginIndex = SYNC_INDEX_INVALID;
  if (pFsm->FpGetSnapshot != NULL) {
S
Shengliang Guan 已提交
124
    SSnapshot snapshot = {0};
M
Minghao Li 已提交
125 126 127 128 129
    pFsm->FpGetSnapshot(pFsm, &snapshot);
    beginIndex = snapshot.lastApplyIndex;
  }

  if (cbMeta.index > beginIndex) {
130
    char logBuf[256] = {0};
M
Minghao Li 已提交
131 132 133 134 135 136
    snprintf(
        logBuf, sizeof(logBuf),
        "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
        pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
    syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);

137
    SVnode       *pVnode = pFsm->data;
138 139 140 141 142
    SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta);
    SRpcMsg       applyMsg;
    syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);
    syncApplyMsgDestroy(pSyncApplyMsg);

M
Minghao Li 已提交
143 144 145
    // recover handle for response
    SRpcMsg saveRpcMsg;
    int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
M
Minghao Li 已提交
146
    if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {
147
      applyMsg.info = saveRpcMsg.info;
M
Minghao Li 已提交
148
    } else {
S
Shengliang Guan 已提交
149 150
      applyMsg.info.handle = NULL;
      applyMsg.info.ahandle = NULL;
M
Minghao Li 已提交
151 152 153 154 155 156
    }

    // put to applyQ
    tmsgPutToQueue(&(pVnode->msgCb), APPLY_QUEUE, &applyMsg);

  } else {
157
    char logBuf[256] = {0};
M
Minghao Li 已提交
158 159 160 161 162 163 164 165 166
    snprintf(logBuf, sizeof(logBuf),
             "==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, "
             "beginIndex :%ld\n",
             pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
             beginIndex);
    syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
  }
}

S
Shengliang Guan 已提交
167
void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
168
  char logBuf[256] = {0};
M
Minghao Li 已提交
169 170 171 172 173 174
  snprintf(logBuf, sizeof(logBuf),
           "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
           cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}

S
Shengliang Guan 已提交
175
void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
176
  char logBuf[256] = {0};
M
Minghao Li 已提交
177 178 179 180 181
  snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
           pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
  syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}

S
Shengliang Guan 已提交
182 183
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
M
Minghao Li 已提交
184
  pFsm->data = pVnode;
S
Shengliang Guan 已提交
185 186 187 188
  pFsm->FpCommitCb = vnodeSyncCommitMsg;
  pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
  pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
  pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
189
  pFsm->FpRestoreFinishCb = NULL;
190
  pFsm->FpReConfigCb = vnodeSyncReconfig;
191

M
Minghao Li 已提交
192
  return pFsm;
S
Shengliang Guan 已提交
193
}