mndSync.c 5.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndSync.h"
18
#include "mndTrans.h"
19

20
/**
 * Enqueue callback: places pMsg on SYNC_QUEUE via tmsgPutToQueue.
 *
 * @param msgcb message callback set forwarded to tmsgPutToQueue.
 * @param pMsg  message to enqueue.
 * @return the value returned by tmsgPutToQueue.
 */
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  return code;
}
M
Minghao Li 已提交
21

22
/**
 * Send callback: forwards pMsg to the endpoint set pEpSet via tmsgSendReq.
 *
 * @param pEpSet destination endpoint set.
 * @param pMsg   message to send.
 * @return the value returned by tmsgSendReq.
 */
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  return code;
}
M
Minghao Li 已提交
23

24
/**
 * FSM commit callback: applies a committed raw sdb record to the mnode's sdb.
 *
 * The entry is applied only when cbMeta.index is greater than the
 * lastApplyIndex reported by the FSM's FpGetSnapshot callback; otherwise it
 * is only traced as already applied.
 *
 * @param pFsm   FSM whose data field holds the SMnode.
 * @param pMsg   committed message; pCont is used as the SSdbRaw to write.
 * @param cbMeta commit metadata (index and sync state are read here).
 */
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SMnode  *pMnode = pFsm->data;
  SSdbRaw *pRaw = pMsg->pCont;

  SSnapshot snapshot = {0};
  pFsm->FpGetSnapshot(pFsm, &snapshot);

  if (cbMeta.index <= snapshot.lastApplyIndex) {
    // Index is not beyond what sdb already reports as applied: trace only.
    mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, snapshot.lastApplyIndex);
    return;
  }

  mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state));
  sdbWriteWithoutFree(pMnode->pSdb, pRaw);
  sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);

  // On the leader, post syncSem; mndSyncPropose waits on this semaphore
  // after a successful syncPropose.
  if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
    tsem_post(&pMnode->syncMgmt.syncSem);
  }
}

45
/**
 * FSM pre-commit callback. Intentionally a no-op: strict consistency is
 * used, so nothing is done before commit.
 */
static void mndSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  // strict consistent, do nothing
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}

49
/**
 * FSM rollback callback. Intentionally a no-op: strict consistency is
 * used, so there is nothing to roll back.
 */
static void mndSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  // strict consistent, do nothing
  (void)pFsm;
  (void)pMsg;
  (void)cbMeta;
}

53 54 55
/**
 * FSM snapshot callback: fills pSnapshot with the apply index and apply term
 * currently reported by the mnode's sdb.
 *
 * @param pFsm      FSM whose data field holds the SMnode.
 * @param pSnapshot output; lastApplyIndex and lastApplyTerm are written.
 * @return always 0.
 */
static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pOwner = pFsm->data;

  pSnapshot->lastApplyIndex = sdbGetApplyIndex(pOwner->pSdb);
  pSnapshot->lastApplyTerm = sdbGetApplyTerm(pOwner->pSdb);

  return 0;
}

60 61
/**
 * Allocate a zero-initialized SSyncFSM bound to pMnode and install the
 * mnode commit / pre-commit / rollback / get-snapshot callbacks.
 *
 * @param pMnode mnode stored in the FSM's data field.
 * @return the new FSM, or NULL if the allocation fails.
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(*pFsm));
  if (pFsm == NULL) {
    // Allocation failed: report it to the caller instead of dereferencing NULL.
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = mndSyncPreCommitMsg;
  pFsm->FpRollBackCb = mndSyncRollBackMsg;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  return pFsm;
}

/**
 * Initialize the mnode sync subsystem: open the WAL under "<path>/wal",
 * build the sync configuration from the mnode replica list, create the FSM,
 * initialize the propose semaphore and open the sync instance.
 *
 * @param pMnode mnode whose syncMgmt is initialized.
 * @return 0 on success; -1 if the WAL cannot be opened, the FSM cannot be
 *         created, or syncOpen fails. On the later failures the WAL handle
 *         stays in pMgmt->pWal (mndCleanupSync closes a non-NULL pWal).
 */
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  pMgmt->pWal = walOpen(path, &cfg);
  if (pMgmt->pWal == NULL) {
    mError("failed to open wal since %s", terrstr());
    return -1;
  }

  SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pWal = pMgmt->pWal;
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    // Do not hand a NULL FSM to syncOpen; fail initialization instead.
    mError("failed to open sync since fsm could not be created");
    return -1;
  }

  // Copy the replica set (fqdn + port of every replica) into the sync config.
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->replicaNum = pMnode->replica;
  pCfg->myIndex = pMnode->selfIndex;
  for (int32_t i = 0; i < pMnode->replica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = pMnode->replicas[i].port;
  }

  // The semaphore starts at 0: mndSyncPropose waits on it and the commit
  // callback posts it on the leader.
  tsem_init(&pMgmt->syncSem, 0, 0);
  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  return 0;
}

/**
 * Tear down the mnode sync management state: destroy the propose semaphore,
 * close the WAL if one is open, then zero the whole SSyncMgmt.
 *
 * @param pMnode mnode whose syncMgmt is cleaned up.
 */
void mndCleanupSync(SMnode *pMnode) {
  tsem_destroy(&pMnode->syncMgmt.syncSem);

  if (pMnode->syncMgmt.pWal != NULL) {
    walClose(pMnode->syncMgmt.pWal);
  }

  memset(&pMnode->syncMgmt, 0, sizeof(SSyncMgmt));
}
M
Minghao Li 已提交
124

125
/**
 * Propose a raw sdb record through the sync module and, when the proposal is
 * accepted, block on syncSem until it is posted.
 *
 * @param pMnode mnode owning the sync instance.
 * @param pRaw   raw sdb record; sdbGetRawTotalSize(pRaw) bytes are copied
 *               into a buffer obtained from rpcMallocCont.
 * @return -1 if the message buffer cannot be allocated; the non-zero
 *         syncPropose code on proposal failure (terrno is set accordingly);
 *         otherwise pMgmt->errCode after the semaphore wait.
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  pMgmt->errCode = 0;

  // NOTE(review): the message type constant is stored in .code here; confirm
  // this is the field the sync module expects.
  SRpcMsg msg = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  msg.pCont = rpcMallocCont(msg.contLen);
  if (msg.pCont == NULL) {
    return -1;
  }
  memcpy(msg.pCont, pRaw, msg.contLen);

  const bool isWeak = false;
  int32_t    code = syncPropose(pMgmt->sync, &msg, isWeak);

  if (code == 0) {
    // Accepted: wait for syncSem (posted by the commit callback on the leader).
    tsem_wait(&pMgmt->syncSem);
    return pMgmt->errCode;
  }

  // Rejected: translate the propose result into terrno, return the raw code.
  if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
  }
  return code;
}

150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
/**
 * Start the sync instance, pull up transactions, and write the sdb file if
 * the sdb apply index changed across syncStart. Finally marks the sync
 * management state as restored.
 *
 * @param pMnode mnode whose sync instance is started.
 */
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SSdb      *pSdb = pMnode->pSdb;

  // Sample the apply index before starting sync so a change can be detected.
  const int64_t indexBefore = sdbGetApplyIndex(pSdb);

  syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
  syncStart(pMgmt->sync);

  const int64_t indexAfter = sdbGetApplyIndex(pSdb);
  mndTransPullup(pMnode);
  mDebug("pullup trans finished, applyIndex:%" PRId64, indexAfter);

  if (indexAfter != indexBefore) {
    mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", indexBefore, indexAfter);
    sdbWriteFile(pSdb);
  }

  pMgmt->restored = true;
}

/**
 * Stop the mnode's sync instance by calling syncStop on its handle.
 *
 * @param pMnode mnode whose sync instance is stopped.
 */
void mndSyncStop(SMnode *pMnode) {
  syncStop(pMnode->syncMgmt.sync);
}

S
Shengliang Guan 已提交
170
/**
 * Refresh the cached sync role from syncGetMyRole and report whether this
 * mnode is the leader.
 *
 * @param pMnode mnode to query; syncMgmt.state is updated as a side effect.
 * @return true when the refreshed state equals TAOS_SYNC_STATE_LEADER.
 */
bool mndIsMaster(SMnode *pMnode) {
  pMnode->syncMgmt.state = syncGetMyRole(pMnode->syncMgmt.sync);

  bool isLeader = (pMnode->syncMgmt.state == TAOS_SYNC_STATE_LEADER);
  return isLeader;
}
175 176

/**
 * Report the restored flag of the sync management state (set to true at the
 * end of mndSyncStart).
 *
 * @param pMnode mnode to query.
 * @return the current value of syncMgmt.restored.
 */
bool mndIsRestored(SMnode *pMnode) {
  bool restored = pMnode->syncMgmt.restored;
  return restored;
}