mndSync.c 4.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndSync.h"
18
#include "mndTrans.h"
19

S
Shengliang Guan 已提交
20
// Sync callback: places pMsg on SYNC_QUEUE through the given message
// callbacks and hands back whatever tmsgPutToQueue reports.
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  int32_t result = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  return result;
}
M
Minghao Li 已提交
21

22
// Sync callback: sends pMsg as a request to the endpoint set pEpSet and
// hands back whatever tmsgSendReq reports.
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t result = tmsgSendReq(pEpSet, pMsg);
  return result;
}
M
Minghao Li 已提交
23

24
// FSM commit callback (installed as FpCommitCb by mndSyncMakeFsm).
//
// Treats the message content as an SSdbRaw, writes it to the mnode's sdb and
// records the index and term from cbMeta as the sdb apply index/term. When
// cbMeta reports the leader state, the sync semaphore is posted;
// mndSyncPropose waits on that same semaphore after a successful propose.
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SMnode  *pMnode = pFsm->data;
  SSdb    *sdb = pMnode->pSdb;
  SSdbRaw *raw = pMsg->pCont;

  mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", raw, cbMeta.index, syncStr(cbMeta.state));

  sdbWriteWithoutFree(sdb, raw);
  sdbSetApplyIndex(sdb, cbMeta.index);
  sdbSetApplyTerm(sdb, cbMeta.term);

  // Only the leader state triggers the semaphore post.
  if (cbMeta.state != TAOS_SYNC_STATE_LEADER) {
    return;
  }
  tsem_post(&pMnode->syncMgmt.syncSem);
}

39
// FSM snapshot callback (installed as FpGetSnapshot by mndSyncMakeFsm).
//
// Fills pSnapshot->lastApplyIndex and pSnapshot->lastApplyTerm from the
// mnode's sdb. Always returns 0.
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *mnode = pFsm->data;

  pSnapshot->lastApplyIndex = sdbGetApplyIndex(mnode->pSdb);
  pSnapshot->lastApplyTerm = sdbGetApplyTerm(mnode->pSdb);

  return 0;
}

46
void mndRestoreFinish(struct SSyncFSM *pFsm) {
47 48
  SMnode *pMnode = pFsm->data;
  mndTransPullup(pMnode);
49
  pMnode->syncMgmt.restored = true;
50 51
}

52 53
// Allocates a zero-initialized SSyncFSM bound to pMnode and wires up the
// callbacks this file implements (commit, get-snapshot, restore-finish).
// The pre-commit, rollback and restore-snapshot callbacks are left NULL.
//
// Returns the new FSM, or NULL if the allocation fails.
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Without this check the assignments below would dereference NULL.
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpRestoreFinish = mndRestoreFinish;
  pFsm->FpRestoreSnapshot = NULL;
  return pFsm;
}

// Opens the mnode WAL under "<pMnode->path>/wal", builds the sync
// configuration from the mnode's replica list and opens the sync instance
// under "<pMnode->path>/sync".
//
// On success pMgmt->pWal, pMgmt->syncSem and pMgmt->sync are initialized and
// 0 is returned. On failure -1 is returned; anything already stored in
// pMgmt (for example an open WAL) is left in place, and mndCleanupSync
// closes the WAL when pMgmt->pWal is non-NULL.
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  pMgmt->pWal = walOpen(path, &cfg);
  if (pMgmt->pWal == NULL) {
    mError("failed to open wal since %s", terrstr());
    return -1;
  }

  SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pWal = pMgmt->pWal;
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    // Do not hand a NULL FSM to syncOpen. terrno is not set here, so the
    // message does not use terrstr().
    mError("failed to create sync fsm");
    return -1;
  }

  // Describe every replica (fqdn + port) and this node's index to sync.
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->replicaNum = pMnode->replica;
  pCfg->myIndex = pMnode->selfIndex;
  for (int32_t i = 0; i < pMnode->replica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = pMnode->replicas[i].port;
  }

  // The semaphore starts at 0: mndSyncPropose waits on it and
  // mndSyncCommitMsg posts it.
  tsem_init(&pMgmt->syncSem, 0, 0);
  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    // NOTE(review): the FSM allocated above is not released on this path;
    // whether syncOpen takes ownership on failure is not visible here.
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync);
  return 0;
}

// Tears down the mnode's sync state: stops the sync instance, destroys the
// sync semaphore, closes the WAL if one is open, then zeroes the whole
// SSyncMgmt structure.
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *mgmt = &pMnode->syncMgmt;

  syncStop(mgmt->sync);
  mDebug("sync:%" PRId64 " is stopped", mgmt->sync);

  tsem_destroy(&mgmt->syncSem);

  // pWal is NULL when no WAL handle is stored.
  if (mgmt->pWal != NULL) walClose(mgmt->pWal);

  memset(mgmt, 0, sizeof(*mgmt));
}
M
Minghao Li 已提交
122

123
// Proposes the sdb raw record pRaw to the sync instance and, when the
// proposal is accepted, blocks on the sync semaphore (posted by
// mndSyncCommitMsg) before returning.
//
// Returns:
//   -1                if the message buffer cannot be allocated;
//   the propose code  if syncPropose returns non-zero (terrno is set to
//                     TSDB_CODE_APP_NOT_READY, TSDB_CODE_SYN_INTERNAL_ERROR
//                     or TSDB_CODE_APP_ERROR depending on that code);
//   pMgmt->errCode    after the semaphore wait completes.
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
  SSyncMgmt *mgmt = &pMnode->syncMgmt;
  mgmt->errCode = 0;

  // NOTE(review): TDMT_MND_APPLY_MSG looks like a message type but is stored
  // in .code — confirm against the SRpcMsg definition.
  SRpcMsg msg = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  msg.pCont = rpcMallocCont(msg.contLen);
  if (msg.pCont == NULL) {
    return -1;
  }
  memcpy(msg.pCont, pRaw, msg.contLen);

  int32_t proposeCode = syncPropose(mgmt->sync, &msg, false /* isWeak */);
  if (proposeCode != 0) {
    // Proposal rejected: translate the sync code into terrno and bail out.
    if (proposeCode == TAOS_SYNC_PROPOSE_NOT_LEADER) {
      terrno = TSDB_CODE_APP_NOT_READY;
    } else if (proposeCode == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    } else {
      terrno = TSDB_CODE_APP_ERROR;
    }
    rpcFreeCont(msg.pCont);
    return proposeCode;
  }

  // Proposal accepted: wait for the semaphore before releasing the buffer.
  tsem_wait(&mgmt->syncSem);
  rpcFreeCont(msg.pCont);
  return mgmt->errCode;
}

149
// Hands the mnode's message callbacks to the sync instance, then starts it.
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *mgmt = &pMnode->syncMgmt;

  // The callbacks are registered before syncStart is called.
  syncSetMsgCb(mgmt->sync, &pMnode->msgCb);
  syncStart(mgmt->sync);

  mDebug("sync:%" PRId64 " is started", mgmt->sync);
}

156
// Intentionally empty: this function currently does nothing.
// mndCleanupSync is the function in this file that calls syncStop.
void mndSyncStop(SMnode *pMnode) {
  (void)pMnode;  // unused
}
157

S
Shengliang Guan 已提交
158
// Refreshes the cached sync role in syncMgmt.state and reports whether this
// mnode is the master: the role must be TAOS_SYNC_STATE_LEADER and
// syncMgmt.restored (set by mndRestoreFinish) must be true.
bool mndIsMaster(SMnode *pMnode) {
  SSyncMgmt *mgmt = &pMnode->syncMgmt;

  // The role is re-queried from the sync instance on every call.
  mgmt->state = syncGetMyRole(mgmt->sync);
  if (mgmt->state != TAOS_SYNC_STATE_LEADER) {
    return false;
  }

  return pMnode->syncMgmt.restored;
}