mndSync.c 7.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndSync.h"
18
#include "mndTrans.h"
19

S
Shengliang Guan 已提交
20
/*
 * Enqueue a message produced by the sync layer onto the mnode SYNC_QUEUE.
 *
 * Before the hand-off, the SMsgHead at the start of the payload has its
 * vgId and contLen fields passed through htonl().
 *
 * Returns whatever tmsgPutToQueue() returns.
 */
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  SMsgHead *head = pMsg->pCont;

  // Convert the header fields with htonl() before queueing the message.
  head->vgId = htonl(head->vgId);
  head->contLen = htonl(head->contLen);

  int32_t ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  return ret;
}
M
Minghao Li 已提交
27

28
/* Forward an outgoing sync-layer request to pEpSet via tmsgSendReq(); returns its result. */
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendReq(pEpSet, pMsg);
  return ret;
}
M
Minghao Li 已提交
29

30
/*
 * FSM commit callback for the mnode.
 *
 * The message payload is treated as an SSdbRaw record. cbMeta.code is always
 * stored in pMgmt->errCode. When that code is 0, the record is written to the
 * sdb and the sdb apply index/term are advanced to cbMeta.index/cbMeta.term.
 * If the record's transaction id equals the saved pMgmt->transId (set by
 * mndSyncPropose()), a failure is logged and syncSem is posted to release
 * the proposer.
 */
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SMnode    *mnode = pFsm->data;
  SSyncMgmt *mgmt = &mnode->syncMgmt;
  SSdbRaw   *raw = pMsg->pCont;

  const int32_t rawTransId = sdbGetIdFromRaw(mnode->pSdb, raw);
  mgmt->errCode = cbMeta.code;
  mTrace("trans:%d, is proposed, savedTransId:%d code:0x%x, ver:%" PRId64 " term:%" PRId64 " role:%s raw:%p", rawTransId,
         mgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), raw);

  if (mgmt->errCode == 0) {
    // NOTE(review): the result of sdbWriteWithoutFree() is ignored, so the apply
    // index/term advance even if the write fails — confirm this is intended.
    sdbWriteWithoutFree(mnode->pSdb, raw);
    sdbSetApplyIndex(mnode->pSdb, cbMeta.index);
    sdbSetApplyTerm(mnode->pSdb, cbMeta.term);
  }

  // Only the proposer of this particular transaction is waiting on syncSem.
  if (mgmt->transId != rawTransId) {
    return;
  }

  if (mgmt->errCode != 0) {
    mError("trans:%d, failed to propose since %s", rawTransId, tstrerror(mgmt->errCode));
  }
  tsem_post(&mgmt->syncSem);
}

54
/*
 * FSM snapshot-info callback: fills pSnapshot with the sdb's last applied
 * index and term. Always returns 0.
 */
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *mnode = pFsm->data;

  pSnapshot->lastApplyIndex = sdbGetApplyIndex(mnode->pSdb);
  pSnapshot->lastApplyTerm = sdbGetApplyTerm(mnode->pSdb);

  return 0;
}

61
void mndRestoreFinish(struct SSyncFSM *pFsm) {
62
  SMnode *pMnode = pFsm->data;
S
Shengliang Guan 已提交
63
  if (!pMnode->deploy) {
S
Shengliang Guan 已提交
64
    mInfo("mnode sync restore finished");
S
Shengliang Guan 已提交
65
    mndTransPullup(pMnode);
S
Shengliang Guan 已提交
66
    mndSetRestore(pMnode, true);
S
Shengliang Guan 已提交
67
  }
68 69
}

S
Shengliang Guan 已提交
70
int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void **ppIter, char **ppBuf, int32_t *len) {
71
  SMnode *pMnode = pFsm->data;
S
Shengliang Guan 已提交
72 73
  mInfo("start to read snapshot from sdb");

S
Shengliang Guan 已提交
74 75 76
  // sdbStartRead
  // sdbDoRead
  // sdbStopRead
77

S
Shengliang Guan 已提交
78
  return 0;
79 80
}

S
Shengliang Guan 已提交
81
int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) {
82
  SMnode *pMnode = pFsm->data;
S
Shengliang Guan 已提交
83 84 85 86

  // sdbStartWrite
  // sdbDoWrite

S
Shengliang Guan 已提交
87
  mndSetRestore(pMnode, false);
S
Shengliang Guan 已提交
88
  mInfo("start to apply snapshot to sdb");
S
Shengliang Guan 已提交
89

S
Shengliang Guan 已提交
90 91 92
  // sdbStopWrite
  mInfo("successfully to apply snapshot to sdb");
  mndSetRestore(pMnode, true);
S
Shengliang Guan 已提交
93 94

  // taosMemoryFree(pBuf);
S
Shengliang Guan 已提交
95
  return 0;
96
}
S
Shengliang Guan 已提交
97 98

/*
 * FSM reconfig callback: records the outcome of a sync reconfiguration proposal.
 *
 * cbMeta.code is stored in pMgmt->errCode. Reconfig proposals use the
 * transaction id -1. When -1 is the saved pMgmt->transId, a failure is logged
 * and syncSem is posted. Presumably this releases whoever proposed the
 * reconfig — confirm against the caller.
 */
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
  SMnode    *pMnode = pFsm->data;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMgmt->errCode = cbMeta.code;
  // cbMeta.index is the log index, not a term; label it accordingly so the
  // line does not report two different "terms".
  mInfo("trans:-1, sync reconfig is proposed, savedTransId:%d code:0x%x, index:%" PRId64 " term:%" PRId64,
        pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);

  if (pMgmt->transId == -1) {
    if (pMgmt->errCode != 0) {
      mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
    }
    tsem_post(&pMgmt->syncSem);
  }
}

114 115
/*
 * Allocate and populate the sync state-machine callback table for the mnode.
 *
 * pFsm->data is set to pMnode, and every callback defined in this file is
 * registered. Pre-commit and rollback are left unset.
 *
 * Returns the new table, or NULL if the allocation fails.
 * NOTE(review): who frees the table (caller vs. syncOpen) is not visible here.
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(*pFsm));
  if (pFsm == NULL) {
    // Report allocation failure instead of dereferencing a NULL pointer below.
    return NULL;
  }

  pFsm->data = pMnode;

  pFsm->FpCommitCb = mndSyncCommitMsg;
  // The mnode registers no pre-commit or rollback handling.
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;

  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpSnapshotRead = mndSnapshotRead;
  pFsm->FpSnapshotApply = mndSnapshotApply;
  pFsm->FpReConfigCb = mndReConfig;

  return pFsm;
}

/*
 * Open the mnode WAL and the mnode sync node.
 *
 * Paths and configuration:
 *   - The WAL is opened at "<pMnode->path><TD_DIRSEP>wal" with vgId 1.
 *   - Sync metadata goes under "<pMnode->path><TD_DIRSEP>sync".
 *   - The replica list is copied from pMnode->replicas / pMnode->replica, and
 *     pMnode->selfIndex is used as this node's index.
 *
 * Returns 0 on success, with pMgmt->pWal and pMgmt->sync set.
 * Returns -1 on failure. If the WAL was already opened, it is closed again and
 * pMgmt->pWal is reset to NULL, so a later mndCleanupSync() will not close it
 * twice.
 */
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  pMgmt->pWal = walOpen(path, &cfg);
  if (pMgmt->pWal == NULL) {
    mError("failed to open wal since %s", terrstr());
    return -1;
  }

  SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pWal = pMgmt->pWal;
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    mError("failed to create mnode sync fsm");
    // Do not leak the WAL opened above.
    walClose(pMgmt->pWal);
    pMgmt->pWal = NULL;
    return -1;
  }
  syncInfo.isStandBy = pMgmt->standby;

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->replicaNum = pMnode->replica;
  pCfg->myIndex = pMnode->selfIndex;
  mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, pMgmt->standby);
  for (int32_t i = 0; i < pMnode->replica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = pMnode->replicas[i].port;
    mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
  }

  tsem_init(&pMgmt->syncSem, 0, 0);
  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    // Do not leak the WAL opened above. syncSem is left for mndCleanupSync()
    // to destroy, as before.
    // NOTE(review): whether syncOpen() releases syncInfo.pFsm on failure is not
    // visible here — confirm to rule out a leak of the FSM table.
    walClose(pMgmt->pWal);
    pMgmt->pWal = NULL;
    return -1;
  }

  mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync);
  return 0;
}

/*
 * Tear down the mnode sync state.
 *
 * Stops the sync node, destroys syncSem, closes the WAL if one is open, and
 * finally zeroes the whole SSyncMgmt.
 */
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *mgmt = &pMnode->syncMgmt;

  syncStop(mgmt->sync);
  mDebug("sync:%" PRId64 " is stopped", mgmt->sync);
  tsem_destroy(&mgmt->syncSem);

  if (mgmt->pWal != NULL) {
    walClose(mgmt->pWal);
  }

  memset(mgmt, 0, sizeof(*mgmt));
}
M
Minghao Li 已提交
192

S
Shengliang Guan 已提交
193
/*
 * Propose a raw sdb record through the sync layer and wait for the outcome.
 *
 * Steps:
 *   1. Copy the record into an rpc buffer. The buffer is freed here after
 *      syncPropose() returns.
 *   2. Reset pMgmt->errCode and store transId in pMgmt->transId, so that
 *      mndSyncCommitMsg() can recognise the entry.
 *   3. If syncPropose() accepts the proposal, block on syncSem until it is
 *      posted.
 *
 * Returns:
 *   - -1 if the rpc buffer cannot be allocated.
 *   - The non-zero syncPropose() code on submission failure, with terrno set.
 *   - Otherwise, pMgmt->errCode as left by the commit callback.
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *mgmt = &pMnode->syncMgmt;

  // NOTE(review): TDMT_MND_APPLY_MSG is stored in .code rather than .msgType —
  // confirm this is what the sync layer expects.
  SRpcMsg req = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) {
    return -1;
  }
  memcpy(req.pCont, pRaw, req.contLen);

  mgmt->errCode = 0;
  mgmt->transId = transId;
  mTrace("trans:%d, will be proposed", mgmt->transId);

  const int32_t code = syncPropose(mgmt->sync, &req, false);
  if (code == 0) {
    tsem_wait(&mgmt->syncSem);
  } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
  }

  rpcFreeCont(req.pCont);

  if (code == 0) {
    return mgmt->errCode;
  }

  mError("trans:%d, failed to propose, code:0x%x", mgmt->transId, code);
  return code;
}

225
/*
 * Register the mnode message callbacks with the sync node and start it.
 * The node is started in standby mode when pMgmt->standby is set.
 */
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *mgmt = &pMnode->syncMgmt;
  syncSetMsgCb(mgmt->sync, &pMnode->msgCb);

  if (!mgmt->standby) {
    syncStart(mgmt->sync);
  } else {
    syncStartStandBy(mgmt->sync);
  }

  mDebug("sync:%" PRId64 " is started, standby:%d", mgmt->sync, mgmt->standby);
}

237
/* Currently a no-op; the sync node is stopped via syncStop() in mndCleanupSync(). */
void mndSyncStop(SMnode *pMnode) {
  (void)pMnode;
}
238

S
Shengliang Guan 已提交
239
/*
 * Report whether this mnode may act as master.
 *
 * Two conditions must hold: the sync node must be the leader, and the mnode
 * must have finished restoring. On a false return, terrno is set to:
 *   - TSDB_CODE_SYN_NOT_LEADER when the node is not the leader;
 *   - TSDB_CODE_APP_NOT_READY when restore has not finished.
 */
bool mndIsMaster(SMnode *pMnode) {
  const SSyncMgmt *mgmt = &pMnode->syncMgmt;

  if (syncGetMyRole(mgmt->sync) != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (!pMnode->restored) {
    terrno = TSDB_CODE_APP_NOT_READY;
    return false;
  }

  return true;
}