mndSync.c 12.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndSync.h"
18
#include "mndTrans.h"
19

20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
/*
 * Put a sync control message on SYNC_CTRL_QUEUE.
 *
 * The contLen and vgId fields of the message head are passed through htonl()
 * before the message is handed to the queue. If the callback set is unusable
 * or the enqueue fails, the message content is released with rpcFreeCont()
 * and pMsg->pCont is cleared.
 *
 * Returns -1 for a missing message/content/callback, otherwise the value
 * returned by tmsgPutToQueue().
 */
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL) return -1;
  if (pMsg->pCont == NULL) return -1;

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // Stays -1 when there is no usable enqueue callback.
  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}

43
/*
 * Put a sync message on SYNC_QUEUE.
 *
 * The contLen and vgId fields of the message head are passed through htonl()
 * before the message is handed to the queue. If the callback set is unusable
 * or the enqueue fails, the message content is released with rpcFreeCont()
 * and pMsg->pCont is cleared.
 *
 * Returns -1 for a missing message/content/callback, otherwise the value
 * returned by tmsgPutToQueue().
 */
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL) return -1;
  if (pMsg->pCont == NULL) return -1;

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // Stays -1 when there is no usable enqueue callback.
  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
M
Minghao Li 已提交
65

66 67 68 69 70 71 72 73
/*
 * Send a sync message to the given endpoint set via tmsgSendReq().
 *
 * On a non-zero result the message content is released with rpcFreeCont()
 * and pMsg->pCont is cleared. Returns the tmsgSendReq() result.
 */
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  const int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
74

S
Shengliang Guan 已提交
75
void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
S
Shengliang Guan 已提交
76 77 78 79
  SMnode    *pMnode = pFsm->data;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SSdbRaw   *pRaw = pMsg->pCont;

80 81
  // delete msg handle
  SRpcMsg rpcMsg = {0};
S
Shengliang Guan 已提交
82
  rpcMsg.info = pMsg->info;
83

S
Shengliang Guan 已提交
84
  int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
S
Shengliang Guan 已提交
85
  pMgmt->errCode = pMeta->code;
86
  mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
H
Hongze Cheng 已提交
87
        " role:%s raw:%p",
S
Shengliang Guan 已提交
88
        transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
H
Hongze Cheng 已提交
89
        pRaw);
S
Shengliang Guan 已提交
90 91 92

  if (pMgmt->errCode == 0) {
    sdbWriteWithoutFree(pMnode->pSdb, pRaw);
S
Shengliang Guan 已提交
93
    sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
S
Shengliang Guan 已提交
94 95
  }

96
  taosWLockLatch(&pMgmt->lock);
97
  if (transId <= 0) {
98
    taosWUnLockLatch(&pMgmt->lock);
99 100
    mError("trans:%d, invalid commit msg", transId);
  } else if (transId == pMgmt->transId) {
S
Shengliang Guan 已提交
101
    if (pMgmt->errCode != 0) {
102 103
      mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
    } else {
S
Shengliang Guan 已提交
104
      mInfo("trans:%d, is proposed and post sem", transId);
S
Shengliang Guan 已提交
105
    }
S
Shengliang Guan 已提交
106
    pMgmt->transId = 0;
S
Shengliang Guan 已提交
107
    tsem_post(&pMgmt->syncSem);
108
    taosWUnLockLatch(&pMgmt->lock);
109
  } else {
110
    taosWUnLockLatch(&pMgmt->lock);
111 112
    STrans *pTrans = mndAcquireTrans(pMnode, transId);
    if (pTrans != NULL) {
113
      mInfo("trans:%d, execute in mnode which not leader", transId);
114 115
      mndTransExecute(pMnode, pTrans);
      mndReleaseTrans(pMnode, pTrans);
116 117 118
      // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
    } else {
      mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
119
    }
M
Minghao Li 已提交
120 121 122
  }
}

S
Shengliang Guan 已提交
123
/*
 * FSM callback (installed as FpGetSnapshot): start reading the sdb and fill
 * pSnapshot's lastApplyIndex / lastApplyTerm / lastConfigIndex from
 * sdbStartRead().
 *
 * pReaderParam is not used. The reader created by sdbStartRead() is stored
 * in *ppReader. Returns the sdbStartRead() result.
 */
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  mInfo("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}

S
Shengliang Guan 已提交
131
/*
 * FSM callback (installed as FpGetSnapshotInfo): fill pSnapshot's
 * lastApplyIndex / lastApplyTerm / lastConfigIndex from sdbGetCommitInfo().
 * Always returns 0.
 */
int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pNode = pFsm->data;

  sdbGetCommitInfo(pNode->pSdb,
                   &pSnapshot->lastApplyIndex,
                   &pSnapshot->lastApplyTerm,
                   &pSnapshot->lastConfigIndex);

  return 0;
}

S
Shengliang Guan 已提交
137
void mndRestoreFinish(const SSyncFSM *pFsm) {
138
  SMnode *pMnode = pFsm->data;
S
Shengliang Guan 已提交
139

S
Shengliang Guan 已提交
140
  if (!pMnode->deploy) {
141
    mInfo("vgId:1, sync restore finished, and will handle outstanding transactions");
S
Shengliang Guan 已提交
142
    mndTransPullup(pMnode);
143
    mndSetRestored(pMnode, true);
S
Shengliang Guan 已提交
144
  } else {
145
    mInfo("vgId:1, sync restore finished");
S
Shengliang Guan 已提交
146
  }
147 148
}

S
Shengliang Guan 已提交
149
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
150
  mInfo("start to read snapshot from sdb");
S
Shengliang Guan 已提交
151
  SMnode *pMnode = pFsm->data;
152
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
S
Shengliang Guan 已提交
153 154
}

S
Shengliang Guan 已提交
155
int32_t mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
156
  mInfo("stop to read snapshot from sdb");
S
Shengliang Guan 已提交
157 158 159 160
  SMnode *pMnode = pFsm->data;
  return sdbStopRead(pMnode->pSdb, pReader);
}

S
Shengliang Guan 已提交
161
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
S
Shengliang Guan 已提交
162 163 164 165
  SMnode *pMnode = pFsm->data;
  return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
}

S
Shengliang Guan 已提交
166
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
S
Shengliang Guan 已提交
167 168 169 170 171
  mInfo("start to apply snapshot to sdb");
  SMnode *pMnode = pFsm->data;
  return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
}

S
Shengliang Guan 已提交
172
/*
 * FSM callback (installed as FpSnapshotStopWrite): finish a snapshot write.
 *
 * Logs the apply flag and the snapshot's index/term/config index, then passes
 * them with the writer to sdbStopWrite(). Returns the sdbStopWrite() result.
 */
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  SMnode *pNode = pFsm->data;
  int32_t ret = sdbStopWrite(pNode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
                             pSnapshot->lastConfigIndex);
  return ret;
}

S
Shengliang Guan 已提交
180
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
S
Shengliang Guan 已提交
181 182 183 184
  SMnode *pMnode = pFsm->data;
  return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
}

S
Shengliang Guan 已提交
185
static void mndBecomeFollower(const SSyncFSM *pFsm) {
186
  SMnode *pMnode = pFsm->data;
187
  mInfo("vgId:1, become follower");
188

189
  taosWLockLatch(&pMnode->syncMgmt.lock);
190
  if (pMnode->syncMgmt.transId != 0) {
191 192
    mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader",
          pMnode->syncMgmt.transId);
193
    pMnode->syncMgmt.transId = 0;
194
    pMnode->syncMgmt.errCode = TSDB_CODE_SYN_NOT_LEADER;
195 196
    tsem_post(&pMnode->syncMgmt.syncSem);
  }
197
  taosWUnLockLatch(&pMnode->syncMgmt.lock);
198 199
}

S
Shengliang Guan 已提交
200
/*
 * FSM callback (installed as FpBecomeLeaderCb). Only writes a log line;
 * the FSM handle is not needed.
 */
static void mndBecomeLeader(const SSyncFSM *pFsm) {
  (void)pFsm;  // unused: required only by the callback signature
  mInfo("vgId:1, become leader");
}

205 206 207 208 209 210 211
/*
 * FSM callback (installed as FpApplyQueueEmptyCb): returns true when
 * tmsgGetQueueSize() reports 0 items for APPLY_QUEUE (queried with id 1).
 */
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
  SMnode *pNode = pFsm->data;
  return tmsgGetQueueSize(&pNode->msgCb, 1, APPLY_QUEUE) == 0;
}

212 213
/*
 * Allocate an SSyncFSM bound to pMnode and install the mnode callbacks.
 *
 * Returns the new FSM, or NULL when the allocation fails. Callers must check
 * for NULL.
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;
  return pFsm;
}

/*
 * Initialise the mnode sync management state and open the sync instance.
 *
 * Builds an SSyncInfo (vgId 1, standard snapshot strategy, the mnode's WAL and
 * message callbacks, "<mnode path><sep>sync" as the path), copies the replica
 * list from pMgmt into the sync config, and calls syncOpen().
 *
 * Returns 0 on success, -1 when the FSM cannot be allocated or syncOpen()
 * does not return a positive id.
 */
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  taosInitRWLatch(&pMgmt->lock);
  pMgmt->transId = 0;
  // Initialised before any failure path so that mndCleanupSync() never
  // destroys an uninitialised semaphore.
  tsem_init(&pMgmt->syncSem, 0, 0);

  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
      .batchSize = 1,
      .vgId = 1,
      .pWal = pMnode->pWal,
      .msgcb = &pMnode->msgCb,
      .syncSendMSg = mndSyncSendMsg,
      .syncEqMsg = mndSyncEqMsg,
      .syncEqCtrlMsg = mndSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 3000,
      .heartbeatMs = 500,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    mError("failed to open sync since fsm allocation failed");
    return -1;
  }

  mInfo("vgId:1, start to open sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex);
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->replicaNum = pMgmt->numOfReplicas;
  pCfg->myIndex = pMgmt->selfIndex;
  for (int32_t i = 0; i < pMgmt->numOfReplicas; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = pMgmt->replicas[i].port;
    mInfo("vgId:1, index:%d ep:%s:%u", i, pNode->nodeFqdn, pNode->nodePort);
  }

  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    // NOTE(review): ownership of syncInfo.pFsm on this path is not visible here — confirm whether it must be freed.
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
  return 0;
}

/*
 * Stop the sync instance, destroy the proposal semaphore and zero the whole
 * sync management structure.
 */
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  syncStop(pSync->sync);
  mInfo("mnode-sync is stopped, id:%" PRId64, pSync->sync);

  tsem_destroy(&pSync->syncSem);
  memset(pSync, 0, sizeof(*pSync));
}
M
Minghao Li 已提交
287

S
Shengliang Guan 已提交
288
/*
 * Propose a raw sdb record through the sync instance and wait for the result.
 *
 * A copy of pRaw is placed in an RPC message and passed to syncPropose():
 *   - result == 0 : wait on syncSem until another path posts it, then
 *                   return pMgmt->errCode (also stored in terrno);
 *   - result  > 0 : the record is written to the sdb immediately and 0 is
 *                   returned via pMgmt->errCode;
 *   - result  < 0 : terrno is mapped to TSDB_CODE_APP_NOT_READY (when it was
 *                   TSDB_CODE_SYN_NOT_LEADER) or TSDB_CODE_APP_ERROR, and the
 *                   negative code is returned.
 *
 * Only one proposal may be pending at a time; if pMgmt->transId is already
 * set, -1 is returned with terrno = TSDB_CODE_APP_NOT_READY.
 * Returns -1 as well when the raw size is not positive or the copy cannot be
 * allocated.
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  if (req.contLen <= 0) return -1;

  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) return -1;
  memcpy(req.pCont, pRaw, req.contLen);

  taosWLockLatch(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
    taosWUnLockLatch(&pMgmt->lock);
    // The copy was never handed to syncPropose(); release it here to avoid a leak.
    rpcFreeCont(req.pCont);
    terrno = TSDB_CODE_APP_NOT_READY;
    return -1;
  }

  mInfo("trans:%d, will be proposed", transId);
  // Reset the result only once this call owns the pending slot, so the result
  // of a proposal that is still in flight is not overwritten.
  pMgmt->errCode = 0;
  pMgmt->transId = transId;
  taosWUnLockLatch(&pMgmt->lock);

  int32_t code = syncPropose(pMgmt->sync, &req, false);
  if (code == 0) {
    // Log the local id: pMgmt->transId may already have been cleared by another path.
    mInfo("trans:%d, is proposing and wait sem", transId);
    tsem_wait(&pMgmt->syncSem);
  } else if (code > 0) {
    mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
    taosWLockLatch(&pMgmt->lock);
    pMgmt->transId = 0;
    taosWUnLockLatch(&pMgmt->lock);
    sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
    code = 0;
  } else {
    mInfo("trans:%d, failed to proposed since %s", transId, terrstr());
    taosWLockLatch(&pMgmt->lock);
    pMgmt->transId = 0;
    taosWUnLockLatch(&pMgmt->lock);
    if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
      terrno = TSDB_CODE_APP_NOT_READY;
    } else {
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  rpcFreeCont(req.pCont);
  if (code != 0) {
    // pMgmt->transId has been reset above, so report the caller's id.
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  terrno = pMgmt->errCode;
  return pMgmt->errCode;
}

345
/*
 * Start the mnode's sync instance with syncStart() and log its id.
 */
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  syncStart(pSync->sync);
  mInfo("vgId:1, sync started, id:%" PRId64, pSync->sync);
}

S
Shengliang Guan 已提交
351
/*
 * Under the sync management latch: if a transaction id is pending, clear it
 * and post syncSem.
 */
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  taosWLockLatch(&pSync->lock);
  if (pSync->transId != 0) {
    mInfo("vgId:1, is stopped and post sem, trans:%d", pSync->transId);
    pSync->transId = 0;
    tsem_post(&pSync->syncSem);
  }
  taosWUnLockLatch(&pSync->lock);
}
360

361
/*
 * Report whether this mnode can currently act as leader.
 *
 * Returns true only when the sync state is TAOS_SYNC_STATE_LEADER, the sync
 * state reports restored, and mndGetRestored() is true. On false, terrno is
 * set to TSDB_CODE_SYN_NOT_LEADER (state is not leader) or
 * TSDB_CODE_APP_NOT_READY (leader but not restored).
 */
bool mndIsLeader(SMnode *pMnode) {
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);
  const bool isLeader = (syncState.state == TAOS_SYNC_STATE_LEADER);

  if (!isLeader || !syncState.restored) {
    terrno = isLeader ? TSDB_CODE_APP_NOT_READY : TSDB_CODE_SYN_NOT_LEADER;
    mDebug("vgId:1, mnode not ready, state:%s, restore:%d", syncStr(syncState.state), syncState.restored);
    return false;
  }

  if (mndGetRestored(pMnode)) {
    return true;
  }

  terrno = TSDB_CODE_APP_NOT_READY;
  return false;
}