/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndSync.h"
#include "mndCluster.h"
#include "mndTrans.h"

/*
 * Enqueue a sync control message onto SYNC_RD_QUEUE.
 *
 * The contLen and vgId fields of the message head are rewritten with htonl()
 * before the message is handed to the queue.
 *
 * Returns 0 on success. Returns -1 when pMsg or its content is NULL (nothing is
 * freed in that case) or when no queue callback is available; otherwise returns
 * the code reported by tmsgPutToQueue(). On every failure that happens after the
 * head has been rewritten, the content buffer is released with rpcFreeCont() and
 * pMsg->pCont is cleared.
 */
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL) return -1;
  if (pMsg->pCont == NULL) return -1;

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // A missing callback is reported as -1 and shares the cleanup path below.
  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_RD_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}

44
/*
 * Enqueue a sync message onto SYNC_QUEUE.
 *
 * The contLen and vgId fields of the message head are rewritten with htonl()
 * before the message is handed to the queue.
 *
 * Returns 0 on success. Returns -1 when pMsg or its content is NULL (nothing is
 * freed in that case) or when no queue callback is available; otherwise returns
 * the code reported by tmsgPutToQueue(). On every failure that happens after the
 * head has been rewritten, the content buffer is released with rpcFreeCont() and
 * pMsg->pCont is cleared.
 */
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL) return -1;
  if (pMsg->pCont == NULL) return -1;

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // A missing callback is reported as -1 and shares the cleanup path below.
  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
M
Minghao Li 已提交
66

67 68 69 70 71 72 73 74
/*
 * Send a sync message to the given endpoint set through tmsgSendReq().
 *
 * Returns the code from tmsgSendReq(). When that code is non-zero the message
 * content is released with rpcFreeCont() and pMsg->pCont is cleared.
 */
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  const int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) {
    return 0;
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
M
Minghao Li 已提交
75

76
/*
 * Handle an sdb raw entry delivered by the sync FSM.
 *
 * Steps, in order:
 *   1. When pMeta->code is 0, write the raw entry into sdb and record the apply
 *      index/term/config index. If the sdb write fails the function logs and
 *      returns immediately.
 *   2. Under pMgmt->lock, publish pMeta->code as pMgmt->errCode and compare the
 *      transaction id found in the raw entry with the one cached in pMgmt:
 *        - id <= 0: only logged as invalid;
 *        - id equals the cached one: the cached trans state is cleared and
 *          syncSem is posted;
 *        - otherwise: the transaction is looked up and executed via
 *          mndTransExecute().
 *   3. sdbWriteFile() is called with tsMndSdbWriteDelta.
 *
 * The cached transId/transSeq are copied while the lock is held so the log
 * statements that run after unlocking do not read pMgmt fields unprotected.
 *
 * Always returns 0.
 */
int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SMnode    *pMnode = pFsm->data;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SSdbRaw   *pRaw = pMsg->pCont;

  int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
  // NOTE(review): this diagnostic reads pMgmt fields without holding pMgmt->lock; left as-is.
  mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
        " role:%s raw:%p sec:%d seq:%" PRId64,
        transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
        pRaw, pMgmt->transSec, pMgmt->transSeq);

  if (pMeta->code == 0) {
    int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    if (code != 0) {
      mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
      return 0;
    }
    sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
  }

  taosThreadMutexLock(&pMgmt->lock);
  pMgmt->errCode = pMeta->code;

  // Copies taken under the lock; used by the log statements issued after unlocking.
  int32_t savedTransId = pMgmt->transId;
  int64_t savedTransSeq = pMgmt->transSeq;

  if (transId <= 0) {
    taosThreadMutexUnlock(&pMgmt->lock);
    mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, savedTransId, savedTransSeq);
  } else if (transId == savedTransId) {
    if (pMgmt->errCode != 0) {
      mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
    } else {
      mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, savedTransSeq);
    }
    // Clear the cached trans state and post syncSem while still holding the lock.
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    tsem_post(&pMgmt->syncSem);
    taosThreadMutexUnlock(&pMgmt->lock);
  } else {
    taosThreadMutexUnlock(&pMgmt->lock);
    STrans *pTrans = mndAcquireTrans(pMnode, transId);
    if (pTrans != NULL) {
      mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
            transId, pTrans->createdTime, savedTransId);
      mndTransExecute(pMnode, pTrans, false);
      mndReleaseTrans(pMnode, pTrans);
    } else {
      mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
    }
  }

  sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
  return 0;
}

130
/*
 * FSM commit callback.
 *
 * Copies the commit index/term from pMeta into pMsg->info.conn, stores the apply
 * index into pMnode->applied when pMsg->code is 0, and forwards the message to
 * mndProcessWriteMsg() when syncUtilUserCommit() accepts its type.
 *
 * The message content is always released with rpcFreeCont() before returning.
 * Returns the result of mndProcessWriteMsg(), or 0 when the message was not
 * forwarded.
 */
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  pMsg->info.conn.applyIndex = pMeta->index;
  pMsg->info.conn.applyTerm = pMeta->term;

  if (pMsg->code == 0) {
    SMnode *pNode = pFsm->data;
    atomic_store_64(&pNode->applied, pMsg->info.conn.applyIndex);
  }

  int32_t ret = 0;
  if (syncUtilUserCommit(pMsg->msgType)) {
    ret = mndProcessWriteMsg(pFsm, pMsg, pMeta);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}

151 152 153 154 155
/* FSM callback: return the value of pMnode->applied, read with atomic_load_64(). */
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
  SMnode   *pNode = pFSM->data;
  SyncIndex appliedIdx = atomic_load_64(&pNode->applied);
  return appliedIdx;
}

S
Shengliang Guan 已提交
156
/*
 * FSM callback: begin reading a snapshot from sdb.
 *
 * Calls sdbStartRead(), which receives *ppReader (cast to SSdbIter **) together
 * with the addresses of pSnapshot's lastApplyIndex, lastApplyTerm and
 * lastConfigIndex. pReaderParam is not used.
 *
 * Returns the result of sdbStartRead().
 */
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  (void)pReaderParam;
  mInfo("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}

164
/*
 * FSM callback: fill pSnapshot's lastApplyIndex, lastApplyTerm and
 * lastConfigIndex through sdbGetCommitInfo().
 */
static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pNode = pFsm->data;
  sdbGetCommitInfo(pNode->pSdb,
                   &pSnapshot->lastApplyIndex,
                   &pSnapshot->lastApplyTerm,
                   &pSnapshot->lastConfigIndex);
}

169
void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) {
170
  SMnode *pMnode = pFsm->data;
S
Shengliang Guan 已提交
171

S
Shengliang Guan 已提交
172
  if (!pMnode->deploy) {
S
Shengliang Guan 已提交
173
    if (!pMnode->restored) {
174 175 176 177 178 179
      mInfo("vgId:1, sync restore finished, and will handle outstanding transactions");
      mndTransPullup(pMnode);
      mndSetRestored(pMnode, true);
    } else {
      mInfo("vgId:1, sync restore finished, repeat call");
    }
S
Shengliang Guan 已提交
180
  } else {
181
    mInfo("vgId:1, sync restore finished");
S
Shengliang Guan 已提交
182
  }
183 184

  ASSERT(commitIdx == mndSyncAppliedIndex(pFsm));
185 186
}

S
Shengliang Guan 已提交
187
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
188
  mInfo("start to read snapshot from sdb");
S
Shengliang Guan 已提交
189
  SMnode *pMnode = pFsm->data;
190
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
S
Shengliang Guan 已提交
191 192
}

193
static void mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
194
  mInfo("stop to read snapshot from sdb");
S
Shengliang Guan 已提交
195
  SMnode *pMnode = pFsm->data;
196
  sdbStopRead(pMnode->pSdb, pReader);
S
Shengliang Guan 已提交
197 198
}

S
Shengliang Guan 已提交
199
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
S
Shengliang Guan 已提交
200 201 202 203
  SMnode *pMnode = pFsm->data;
  return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
}

S
Shengliang Guan 已提交
204
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
S
Shengliang Guan 已提交
205 206 207 208 209
  mInfo("start to apply snapshot to sdb");
  SMnode *pMnode = pFsm->data;
  return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
}

S
Shengliang Guan 已提交
210
/*
 * FSM callback: finish applying a snapshot to sdb.
 *
 * Logs the apply flag and the snapshot's index/term/config index, then forwards
 * them together with pWriter to sdbStopWrite() and returns its result.
 */
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  SMnode *pNode = pFsm->data;
  int32_t ret = sdbStopWrite(pNode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
                             pSnapshot->lastConfigIndex);
  return ret;
}

S
Shengliang Guan 已提交
218
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
S
Shengliang Guan 已提交
219 220 221 222
  SMnode *pMnode = pFsm->data;
  return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
}

S
Shengliang Guan 已提交
223
static void mndBecomeFollower(const SSyncFSM *pFsm) {
224 225
  SMnode    *pMnode = pFsm->data;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
226
  mInfo("vgId:1, become follower");
227

228 229 230 231
  taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
    pMgmt->transId = 0;
232
    pMgmt->transSec = 0;
S
Shengliang Guan 已提交
233
    pMgmt->transSeq = 0;
234 235
    pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
    tsem_post(&pMgmt->syncSem);
236
  }
237
  taosThreadMutexUnlock(&pMgmt->lock);
238 239
}

C
cadem 已提交
240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
/*
 * FSM callback invoked when this node becomes a learner.
 *
 * If a transaction id is cached in the sync management state (transId != 0),
 * the cached trans state is cleared, errCode is set to
 * TSDB_CODE_SYN_NOT_LEADER and syncSem is posted. All of this happens while
 * holding the management lock.
 */
static void mndBecomeLearner(const SSyncFSM *pFsm) {
  SMnode    *pNode = pFsm->data;
  SSyncMgmt *pSync = &pNode->syncMgmt;
  mInfo("vgId:1, become learner");

  taosThreadMutexLock(&pSync->lock);
  if (pSync->transId != 0) {
    mInfo("vgId:1, become learner and post sem, trans:%d, failed to propose since not leader", pSync->transId);

    pSync->transId = 0;
    pSync->transSec = 0;
    pSync->transSeq = 0;
    pSync->errCode = TSDB_CODE_SYN_NOT_LEADER;

    tsem_post(&pSync->syncSem);
  }
  taosThreadMutexUnlock(&pSync->lock);
}

S
Shengliang Guan 已提交
257
/*
 * FSM callback invoked when this node becomes leader.
 * Only writes a log line; pFsm is not used.
 */
static void mndBecomeLeader(const SSyncFSM *pFsm) {
  (void)pFsm;
  mInfo("vgId:1, become leader");
}

262 263 264
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
  SMnode *pMnode = pFsm->data;

265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
  if (pMnode != NULL && pMnode->msgCb.qsizeFp != NULL) {
    int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
    return (itemSize == 0);
  } else {
    return true;
  }
}

/*
 * FSM callback: return the number of items in APPLY_QUEUE as reported by
 * tmsgGetQueueSize(), or -1 when the mnode pointer or its qsizeFp callback is
 * NULL.
 */
static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) {
  SMnode *pNode = pFsm->data;

  if (pNode == NULL || pNode->msgCb.qsizeFp == NULL) {
    return -1;
  }

  int32_t numItems = tmsgGetQueueSize(&pNode->msgCb, 1, APPLY_QUEUE);
  return numItems;
}

284 285
/*
 * Allocate an SSyncFSM and wire it to the mnode callbacks defined in this file.
 *
 * pFsm->data is set to pMnode. Callbacks that the mnode does not provide
 * (pre-commit, rollback, leader transfer, reconfig) are set to NULL.
 *
 * Returns the new FSM, or NULL when the allocation fails.
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Allocation failed: report it to the caller instead of dereferencing NULL below.
    return NULL;
  }

  pFsm->data = pMnode;
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
  pFsm->FpApplyQueueItems = mndApplyQueueItems;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;
  pFsm->FpBecomeLearnerCb = mndBecomeLearner;
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;
  return pFsm;
}

/*
 * Initialize the mnode sync management state and open the sync instance.
 *
 * - Initializes pMgmt->lock and clears the cached trans state.
 * - Builds an SSyncInfo for vgId 1 (standard snapshot strategy, batch size 1,
 *   the message callbacks defined in this file, and a "<mnode path><sep>sync"
 *   directory) and attaches an FSM created by mndSyncMakeFsm().
 * - Copies the replica configuration from pMgmt into the sync config.
 * - Initializes syncSem and calls syncOpen(); on success the handle is stored in
 *   both pMgmt->sync and pMnode->pSdb->sync.
 *
 * Returns 0 on success, -1 when the FSM could not be created or syncOpen()
 * did not return a positive handle.
 */
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  taosThreadMutexInit(&pMgmt->lock, NULL);
  taosThreadMutexLock(&pMgmt->lock);
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  taosThreadMutexUnlock(&pMgmt->lock);

  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
      .batchSize = 1,
      .vgId = 1,
      .pWal = pMnode->pWal,
      .msgcb = &pMnode->msgCb,
      .syncSendMSg = mndSyncSendMsg,
      .syncEqMsg = mndSyncEqMsg,
      .syncEqCtrlMsg = mndSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 3000,
      .heartbeatMs = 500,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);

  mInfo("vgId:1, start to open sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex);
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->totalReplicaNum = pMgmt->numOfTotalReplicas;
  pCfg->replicaNum = pMgmt->numOfReplicas;
  pCfg->myIndex = pMgmt->selfIndex;
  pCfg->lastIndex = pMgmt->lastIndex;
  for (int32_t i = 0; i < pMgmt->numOfTotalReplicas; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pMgmt->replicas[i].id;
    pNode->nodePort = pMgmt->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodeRole = pMgmt->nodeRoles[i];
    // The result of the dnode-info update is deliberately ignored.
    (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId,
          pNode->clusterId);
  }

  tsem_init(&pMgmt->syncSem, 0, 0);

  // Checked only after the semaphore is initialized, so that both failure exits
  // below leave the lock and the semaphore in the same (initialized) state.
  if (syncInfo.pFsm == NULL) {
    mError("vgId:1, failed to create sync fsm");
    return -1;
  }

  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    return -1;
  }
  pMnode->pSdb->sync = pMgmt->sync;

  mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
  return 0;
}

/*
 * Stop the sync instance and tear down the sync management state: the
 * semaphore and the mutex are destroyed and the whole SSyncMgmt is zeroed.
 */
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  syncStop(pSync->sync);
  mInfo("mnode-sync is stopped, id:%" PRId64, pSync->sync);

  tsem_destroy(&pSync->syncSem);
  taosThreadMutexDestroy(&pSync->lock);

  memset(pSync, 0, sizeof(*pSync));
}
M
Minghao Li 已提交
374

375
/*
 * Check whether the transaction currently waiting for sync confirmation has
 * been pending for more than MNODE_TIMEOUT_SEC seconds.
 *
 * On timeout the cached trans state is cleared, both terrno and pMgmt->errCode
 * are set to TSDB_CODE_SYN_TIMEOUT and syncSem is posted. Otherwise only a debug
 * line is written. Nothing happens when no transaction is cached. The whole
 * check runs under the management lock.
 */
void mndSyncCheckTimeout(SMnode *pMnode) {
  mTrace("check sync timeout");
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  taosThreadMutexLock(&pSync->lock);
  if (pSync->transId != 0) {
    int32_t nowSec = taosGetTimestampSec();
    int32_t elapsed = nowSec - pSync->transSec;

    if (elapsed <= MNODE_TIMEOUT_SEC) {
      mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pSync->transId,
             pSync->transSec, nowSec, elapsed, pSync->transSeq);
    } else {
      mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pSync->transId,
             pSync->transSec, nowSec, elapsed, pSync->transSeq);
      pSync->transId = 0;
      pSync->transSec = 0;
      pSync->transSeq = 0;
      terrno = TSDB_CODE_SYN_TIMEOUT;
      pSync->errCode = TSDB_CODE_SYN_TIMEOUT;
      tsem_post(&pSync->syncSem);
    }
  }
  taosThreadMutexUnlock(&pSync->lock);
}

S
Shengliang Guan 已提交
401
/*
 * Propose an sdb raw entry for transaction transId through the sync module.
 *
 * A copy of pRaw is placed in an rpc buffer and passed to syncPropose(). Only
 * one proposal may be outstanding: if pMgmt->transId is already set, the call
 * fails with TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED.
 *
 * syncPropose() result handling:
 *   == 0  the caller blocks on syncSem until another path in this file posts it;
 *   >  0  the entry is written to sdb directly and treated as success;
 *   <  0  the cached trans state is cleared and the error is returned.
 *
 * Returns -1 when the raw size is not positive or the rpc buffer cannot be
 * allocated, the negative syncPropose() code on proposal failure, and otherwise
 * pMgmt->errCode (also stored in terrno).
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  if (req.contLen <= 0) return -1;

  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) return -1;
  memcpy(req.pCont, pRaw, req.contLen);

  taosThreadMutexLock(&pMgmt->lock);
  pMgmt->errCode = 0;

  if (pMgmt->transId != 0 /* && pMgmt->transId != transId*/) {
    mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
    taosThreadMutexUnlock(&pMgmt->lock);
    rpcFreeCont(req.pCont);
    terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED;
    return terrno;
  }

  mInfo("trans:%d, will be proposed", transId);
  pMgmt->transId = transId;
  pMgmt->transSec = taosGetTimestampSec();

  int64_t seq = 0;
  int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
  if (code == 0) {
    mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
    pMgmt->transSeq = seq;
    // Release the lock before blocking so the paths that post syncSem can take it.
    taosThreadMutexUnlock(&pMgmt->lock);
    tsem_wait(&pMgmt->syncSem);
  } else if (code > 0) {
    mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    taosThreadMutexUnlock(&pMgmt->lock);
    // NOTE(review): the result of sdbWriteWithoutFree() is not checked on this path.
    sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
    code = 0;
  } else {
    mError("trans:%d, failed to proposed since %s", transId, terrstr());
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    taosThreadMutexUnlock(&pMgmt->lock);
    if (terrno == 0) {
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  rpcFreeCont(req.pCont);
  req.pCont = NULL;
  if (code != 0) {
    // Log the caller's transId: pMgmt->transId has already been reset to 0 above.
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  terrno = pMgmt->errCode;
  return terrno;
}

464
/*
 * Start the mnode sync instance via syncStart() and log the outcome; a negative
 * result from syncStart() is logged as a failure.
 */
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  if (syncStart(pSync->sync) >= 0) {
    mInfo("vgId:1, sync started, id:%" PRId64, pSync->sync);
  } else {
    mError("vgId:1, failed to start sync, id:%" PRId64, pSync->sync);
  }
}

S
Shengliang Guan 已提交
473
/*
 * Release a proposer that is still waiting when the mnode is stopping.
 *
 * If a transaction id is cached (transId != 0), the cached trans state is
 * cleared, errCode is set to TSDB_CODE_APP_IS_STOPPING and syncSem is posted,
 * all under the management lock.
 */
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    // Reset the sequence too, matching every other place in this file that clears the cached trans.
    pMgmt->transSeq = 0;
    pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
    tsem_post(&pMgmt->syncSem);
  }
  taosThreadMutexUnlock(&pMgmt->lock);
}
486

487
/*
 * Report whether this mnode is a fully restored sync leader.
 *
 * terrno is cleared before querying syncGetState(). Returns false when:
 *   - terrno is non-zero after the query (terrno is left as set by the query);
 *   - the sync state is not TAOS_SYNC_STATE_LEADER (terrno is set to
 *     TSDB_CODE_SYN_NOT_LEADER);
 *   - either the sync state or the mnode is not yet restored (terrno is set to
 *     TSDB_CODE_SYN_RESTORING).
 * Returns true otherwise.
 */
bool mndIsLeader(SMnode *pMnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);

  if (terrno != 0) {
    mDebug("vgId:1, mnode is stopping");
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    mDebug("vgId:1, mnode not leader, state:%s", syncStr(syncState.state));
    return false;
  }

  if (!(syncState.restored && pMnode->restored)) {
    terrno = TSDB_CODE_SYN_RESTORING;
    mDebug("vgId:1, mnode not restored:%d:%d", syncState.restored, pMnode->restored);
    return false;
  }

  return true;
}