mndSync.c 11.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndSync.h"
18
#include "mndTrans.h"
19

20
// Enqueue callback handed to the sync module (see SSyncInfo.FpEqMsg in mndInitSync).
// Converts contLen and vgId of the message head with htonl, then puts the message on SYNC_QUEUE.
// If enqueuing fails, the message content is freed and pMsg->pCont is cleared.
// Returns the code reported by tmsgPutToQueue.
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  int32_t ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (ret == 0) {
    return 0;
  }

  // the queue did not accept the message, release its content here
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
32

33 34 35 36 37 38 39 40
// Send callback handed to the sync module (see SSyncInfo.FpSendMsg in mndInitSync).
// Sends pMsg to pEpSet through tmsgSendReq; if sending fails, the message content is
// freed and pMsg->pCont is cleared. Returns the code reported by tmsgSendReq.
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) {
    return 0;
  }

  // the request was not sent, release its content here
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
41

42
// FSM commit callback (registered as FpCommitCb in mndSyncMakeFsm).
//
// pFsm   - FSM whose data field holds the SMnode.
// pMsg   - committed message; its pCont is treated as an SSdbRaw.
// cbMeta - commit metadata (code, index, term, lastConfigIndex, state, seqNum).
//
// Steps:
//   1. drop the response handle registered for cbMeta.seqNum;
//   2. record cbMeta.code in pMgmt->errCode and, when it is 0, write the raw record to sdb and
//      update the apply info;
//   3. if the transaction id in the raw record equals the one saved by mndSyncPropose, clear it
//      and post syncSem; otherwise look the transaction up and execute it.
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SMnode    *pMnode = pFsm->data;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SSdbRaw   *pRaw = pMsg->pCont;

  // delete msg handle
  SRpcMsg rpcMsg = {0};
  syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info);

  int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
  pMgmt->errCode = cbMeta.code;
  mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
         " role:%s raw:%p",
         transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state),
         pRaw);

  if (pMgmt->errCode == 0) {
    sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
  }

  // The write latch is required: the matching branch modifies pMgmt->transId and posts syncSem.
  // Doing that after releasing a read latch would let mndBecomeFollower / mndBecomeLeader /
  // mndSyncStop / mndReConfig clear the same transId and post the semaphore a second time.
  taosWLockLatch(&pMgmt->lock);
  if (transId <= 0) {
    taosWUnLockLatch(&pMgmt->lock);
    mError("trans:%d, invalid commit msg", transId);
  } else if (transId == pMgmt->transId) {
    if (pMgmt->errCode != 0) {
      mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
    }
    pMgmt->transId = 0;
    tsem_post(&pMgmt->syncSem);
    taosWUnLockLatch(&pMgmt->lock);
  } else {
    // not the transaction saved by mndSyncPropose: release the latch before executing it
    taosWUnLockLatch(&pMgmt->lock);
    STrans *pTrans = mndAcquireTrans(pMnode, transId);
    if (pTrans != NULL) {
      mDebug("trans:%d, execute in mnode which not leader", transId);
      mndTransExecute(pMnode, pTrans);
      mndReleaseTrans(pMnode, pTrans);
      // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
    } else {
      mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
    }
  }
}

88
// FSM callback (registered as FpGetSnapshot in mndSyncMakeFsm).
// Starts an sdb read and lets sdbStartRead fill pSnapshot->lastApplyIndex, lastApplyTerm and
// lastConfigIndex; the reader is returned through ppReader (cast to SSdbIter **).
//
// pFsm         - FSM whose data field holds the SMnode.
// pSnapshot    - receives the apply index/term and config index.
// pReaderParam - not used by this implementation.
// ppReader     - receives the iterator created by sdbStartRead.
//
// Returns the result of sdbStartRead.
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  mDebug("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}

// FSM callback (registered as FpGetSnapshotInfo in mndSyncMakeFsm).
// Fills pSnapshot's lastApplyIndex, lastApplyTerm and lastConfigIndex from sdbGetCommitInfo.
// Always returns 0.
int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pNode = pFsm->data;

  sdbGetCommitInfo(pNode->pSdb,
                   &pSnapshot->lastApplyIndex,
                   &pSnapshot->lastApplyTerm,
                   &pSnapshot->lastConfigIndex);

  return 0;
}

102
void mndRestoreFinish(struct SSyncFSM *pFsm) {
103
  SMnode *pMnode = pFsm->data;
S
Shengliang Guan 已提交
104

S
Shengliang Guan 已提交
105
  if (!pMnode->deploy) {
106
    mInfo("mnode sync restore finished, and will handle outstanding transactions");
S
Shengliang Guan 已提交
107
    mndTransPullup(pMnode);
S
Shengliang Guan 已提交
108
    mndSetRestore(pMnode, true);
S
Shengliang Guan 已提交
109
  } else {
S
Shengliang Guan 已提交
110
    mInfo("mnode sync restore finished");
S
Shengliang Guan 已提交
111
  }
112 113
}

114
// FSM callback (registered as FpReConfigCb in mndSyncMakeFsm).
// Records cbMeta.code in the sync management errCode and, if the saved transaction id is -1,
// clears it and posts syncSem, all under the write latch.
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
  SMnode    *pNode = pFsm->data;
  SSyncMgmt *pSyncMgmt = &pNode->syncMgmt;

  pSyncMgmt->errCode = cbMeta.code;
  mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pSyncMgmt->transId,
        cbMeta.code, cbMeta.index, cbMeta.term);

  taosWLockLatch(&pSyncMgmt->lock);

  // a saved transaction id of -1 is the only case handled here
  const bool isReconfigSaved = (pSyncMgmt->transId == -1);
  if (isReconfigSaved && pSyncMgmt->errCode != 0) {
    mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pSyncMgmt->errCode));
  }
  if (isReconfigSaved) {
    pSyncMgmt->transId = 0;
    tsem_post(&pSyncMgmt->syncSem);
  }

  taosWUnLockLatch(&pSyncMgmt->lock);
}

133
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
S
Shengliang Guan 已提交
134
  mDebug("start to read snapshot from sdb");
S
Shengliang Guan 已提交
135
  SMnode *pMnode = pFsm->data;
136
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
S
Shengliang Guan 已提交
137 138 139
}

int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
S
Shengliang Guan 已提交
140
  mDebug("stop to read snapshot from sdb");
S
Shengliang Guan 已提交
141 142 143 144 145 146 147 148 149
  SMnode *pMnode = pFsm->data;
  return sdbStopRead(pMnode->pSdb, pReader);
}

// FSM callback (registered as FpSnapshotDoRead in mndSyncMakeFsm).
// Forwards to sdbDoRead with the mnode's sdb; ppBuf and len are passed through unchanged.
// Returns the result of sdbDoRead.
int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  SMnode *pNode = pFsm->data;
  int32_t ret = sdbDoRead(pNode->pSdb, pReader, ppBuf, len);
  return ret;
}

150
int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
S
Shengliang Guan 已提交
151 152 153 154 155
  mInfo("start to apply snapshot to sdb");
  SMnode *pMnode = pFsm->data;
  return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
}

156
// FSM callback (registered as FpSnapshotStopWrite in mndSyncMakeFsm).
// Logs the snapshot's apply index/term and config index, then forwards them together with
// isApply to sdbStopWrite. Returns the result of sdbStopWrite.
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  SMnode *pNode = pFsm->data;
  int32_t ret = sdbStopWrite(pNode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
                             pSnapshot->lastConfigIndex);
  return ret;
}

// FSM callback (registered as FpSnapshotDoWrite in mndSyncMakeFsm).
// Forwards pBuf/len to sdbDoWrite with the mnode's sdb and returns its result.
int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
  SMnode *pNode = pFsm->data;
  int32_t ret = sdbDoWrite(pNode->pSdb, pWriter, pBuf, len);
  return ret;
}

169 170 171
// FSM callback (registered as FpLeaderTransferCb in mndSyncMakeFsm).
// Atomically sets syncMgmt.leaderTransferFinish to 1 and logs; pMsg and cbMeta are not used.
void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SMnode    *pNode = pFsm->data;
  SSyncMgmt *pSyncMgmt = &pNode->syncMgmt;

  atomic_store_8(&(pSyncMgmt->leaderTransferFinish), 1);
  mDebug("vgId:1, mnode leader transfer finish");
}

175 176 177 178
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
  SMnode *pMnode = pFsm->data;
  mDebug("vgId:1, become follower");

179
  taosWLockLatch(&pMnode->syncMgmt.lock);
180 181 182 183
  if (pMnode->syncMgmt.transId != 0) {
    pMnode->syncMgmt.transId = 0;
    tsem_post(&pMnode->syncMgmt.syncSem);
  }
184
  taosWUnLockLatch(&pMnode->syncMgmt.lock);
185 186 187 188
}

static void mndBecomeLeader(struct SSyncFSM *pFsm) {
  mDebug("vgId:1, become leader");
189 190
  SMnode *pMnode = pFsm->data;

191
  taosWLockLatch(&pMnode->syncMgmt.lock);
192 193 194 195
  if (pMnode->syncMgmt.transId != 0) {
    pMnode->syncMgmt.transId = 0;
    tsem_post(&pMnode->syncMgmt.syncSem);
  }
196
  taosWUnLockLatch(&pMnode->syncMgmt.lock);
197 198
}

199 200
// Allocates an SSyncFSM for the mnode and wires every callback to the handlers in this file.
//
// pMnode - stored in the FSM's data field; each callback reads it back from there.
//
// Returns the new FSM, or NULL if the allocation fails.
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // allocation failed: report it to the caller instead of dereferencing NULL below
    return NULL;
  }

  pFsm->data = pMnode;

  // log commit / lifecycle callbacks
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpLeaderTransferCb = mndLeaderTransfer;
  pFsm->FpReConfigCb = mndReConfig;
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;

  // snapshot callbacks
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;
  return pFsm;
}

// Initializes the mnode's sync management state and opens the sync instance.
//
// Sets up the latch and saved transaction id, builds an SSyncInfo (vgId 1, path "<mnode path>/sync",
// the mnode's WAL, an FSM from mndSyncMakeFsm, standard snapshot strategy), fills a single-replica
// config when the mnode is standby or has a replica id > 0, then calls syncOpen and adjusts the
// ping / election / heartbeat timers.
//
// Returns 0 on success, -1 if the FSM cannot be created or syncOpen fails.
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  taosInitRWLatch(&pMgmt->lock);
  pMgmt->transId = 0;

  SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pWal = pMnode->pWal;
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    // do not hand a NULL FSM to syncOpen; nothing else has been acquired yet
    mError("failed to open sync since fsm is not created");
    return -1;
  }
  syncInfo.isStandBy = pMgmt->standby;
  syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;

  mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
  if (pMgmt->standby || pMgmt->replica.id > 0) {
    // single-replica configuration built from this mnode's own endpoint
    SSyncCfg *pCfg = &syncInfo.syncCfg;
    pCfg->replicaNum = 1;
    pCfg->myIndex = 0;
    SNodeInfo *pNode = &pCfg->nodeInfo[0];
    tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = pMgmt->replica.port;
    mInfo("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort);
  }

  tsem_init(&pMgmt->syncSem, 0, 0);
  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  // decrease election timer
  setPingTimerMS(pMgmt->sync, 5000);
  setElectTimerMS(pMgmt->sync, 600);
  setHeartbeatTimerMS(pMgmt->sync, 300);

  mDebug("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
  return 0;
}

// Stops the sync instance, destroys syncSem and zeroes the whole sync management struct.
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  syncStop(pSyncMgmt->sync);
  mDebug("mnode-sync is stopped, id:%" PRId64, pSyncMgmt->sync);

  tsem_destroy(&pSyncMgmt->syncSem);

  // wipe every field, including the sync id and latch
  memset(pSyncMgmt, 0, sizeof(*pSyncMgmt));
}
M
Minghao Li 已提交
268

S
Shengliang Guan 已提交
269
// Proposes a raw sdb record to the sync module and, when the proposal is accepted, blocks on
// syncSem until a callback in this file posts it.
//
// pMnode  - mnode whose sync instance is used.
// pRaw    - raw record; sdbGetRawTotalSize(pRaw) bytes are copied into the request.
// transId - transaction id saved in pMgmt->transId so that callbacks can match it.
//
// Returns -1 if the request buffer cannot be allocated; the non-zero code from syncPropose if the
// proposal is rejected (terrno is remapped: SYN_NOT_LEADER -> APP_NOT_READY, SYN_INTERNAL_ERROR is
// kept, anything else -> APP_ERROR); otherwise pMgmt->errCode as left by the callbacks.
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SRpcMsg    req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) return -1;
  memcpy(req.pCont, pRaw, req.contLen);

  pMgmt->errCode = 0;
  taosWLockLatch(&pMgmt->lock);
  pMgmt->transId = transId;
  taosWUnLockLatch(&pMgmt->lock);
  // log the local id: pMgmt->transId may already have been cleared by a callback
  mTrace("trans:%d, will be proposed", transId);

  const bool isWeak = false;
  int32_t    code = syncPropose(pMgmt->sync, &req, isWeak);

  if (code == 0) {
    tsem_wait(&pMgmt->syncSem);
  } else {
    if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
      terrno = TSDB_CODE_APP_NOT_READY;
    } else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) {
      terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    } else {
      terrno = TSDB_CODE_APP_ERROR;
    }

    // The proposal was rejected, so this thread will not wait on syncSem. Clear the saved id so
    // that later callbacks (role change, stop, reconfig, commit) do not post the semaphore for it.
    taosWLockLatch(&pMgmt->lock);
    pMgmt->transId = 0;
    taosWUnLockLatch(&pMgmt->lock);
  }

  rpcFreeCont(req.pCont);
  if (code != 0) {
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  return pMgmt->errCode;
}

304
// Hands the mnode's message callbacks to the sync instance and starts it.
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  // the callbacks are registered before the instance is started
  syncSetMsgCb(pSyncMgmt->sync, &pMnode->msgCb);
  syncStart(pSyncMgmt->sync);

  mDebug("mnode sync started, id:%" PRId64 " standby:%d", pSyncMgmt->sync, pSyncMgmt->standby);
}

S
Shengliang Guan 已提交
311
// Under the write latch: if a transaction id is saved (non-zero), clears it and posts syncSem,
// which releases a thread blocked in mndSyncPropose.
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  taosWLockLatch(&pSyncMgmt->lock);
  const bool hasSavedTrans = (pSyncMgmt->transId != 0);
  if (hasSavedTrans) {
    pSyncMgmt->transId = 0;
    tsem_post(&pSyncMgmt->syncSem);
  }
  taosWUnLockLatch(&pSyncMgmt->lock);
}
319

S
Shengliang Guan 已提交
320
// Returns true only when syncIsReady reports the sync instance ready and the mnode is restored.
// When sync is not ready, terrno is left as set by syncIsReady; when the mnode is not yet
// restored, terrno is set to TSDB_CODE_APP_NOT_READY.
bool mndIsMaster(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  const bool syncReady = syncIsReady(pSyncMgmt->sync);
  if (!syncReady) {
    // get terrno from syncIsReady
    // terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pMnode->restored) {
    return true;
  }

  terrno = TSDB_CODE_APP_NOT_READY;
  return false;
}