mndTrans.c 33.5 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndTrans.h"
S
Shengliang Guan 已提交
18
#include "mndSync.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
// Soft version written into every encoded transaction row; the decoder rejects any other value.
#define MND_TRANS_VER_NUMBER 1
// First argument passed to taosArrayInit() for each log/action array
// (presumably the initial capacity — confirm against tarray.h).
#define MND_TRANS_ARRAY_SIZE 8
// Number of reserved bytes appended to every encoded transaction row.
#define MND_TRANS_RESERVE_SIZE 64
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25 26
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t  mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
S
Shengliang Guan 已提交
27
static int32_t  mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
S
Shengliang Guan 已提交
28 29
static int32_t  mndTransActionDelete(SSdb *pSdb, STrans *pTrans);

S
Shengliang Guan 已提交
30
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
S
Shengliang Guan 已提交
32 33
static void    mndTransDropLogs(SArray *pArray);
static void    mndTransDropActions(SArray *pArray);
34
static void    mndTransDropData(STrans *pTrans);
S
Shengliang Guan 已提交
35
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
36
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
S
Shengliang Guan 已提交
37 38 39 40
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
S
Shengliang Guan 已提交
41 42 43 44 45 46 47 48 49 50 51
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);

S
Shengliang Guan 已提交
52
static void    mndTransExecute(SMnode *pMnode, STrans *pTrans);
S
Shengliang Guan 已提交
53
static void    mndTransSendRpcRsp(STrans *pTrans);
S
Shengliang Guan 已提交
54
static int32_t mndProcessTransReq(SMnodeMsg *pMsg);
S
Shengliang Guan 已提交
55

S
Shengliang Guan 已提交
56 57 58 59 60 61 62 63 64
// Registers the TDMT_MND_TRANS request handler and installs the SDB_TRANS table
// callbacks (encode/decode/insert/update/delete) into the mnode's sdb.
// Returns the result of sdbSetTable().
int32_t mndInitTrans(SMnode *pMnode) {
  SSdbTable transTable = {0};

  transTable.sdbType = SDB_TRANS;
  transTable.keyType = SDB_KEY_INT32;
  transTable.encodeFp = (SdbEncodeFp)mndTransActionEncode;
  transTable.decodeFp = (SdbDecodeFp)mndTransActionDecode;
  transTable.insertFp = (SdbInsertFp)mndTransActionInsert;
  transTable.updateFp = (SdbUpdateFp)mndTransActionUpdate;
  transTable.deleteFp = (SdbDeleteFp)mndTransActionDelete;

  mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq);

  return sdbSetTable(pMnode->pSdb, transTable);
}

// Counterpart of mndInitTrans(); currently has nothing to release.
void mndCleanupTrans(SMnode *pMnode) {
}

// Serializes a transaction into a freshly allocated SSdbRaw.
//
// Written in order: id, policy, stage, the five element counts, every redo/undo/commit
// log as (length, raw bytes), every redo/undo action as (epSet, msgType, acceptableCode,
// contLen, content), then MND_TRANS_RESERVE_SIZE reserved bytes and the data length.
//
// terrno is preset to TSDB_CODE_OUT_OF_MEMORY and reset to 0 only after every field
// has been written. Returns NULL (and frees the raw) if anything fails before that.
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SArray *logArrays[3] = {pTrans->redoLogs, pTrans->undoLogs, pTrans->commitLogs};
  SArray *actionArrays[2] = {pTrans->redoActions, pTrans->undoActions};
  int32_t logNums[3] = {0};
  int32_t actionNums[2] = {0};

  // First pass: work out how large the raw buffer has to be.
  int32_t totalLen = sizeof(STrans) + MND_TRANS_RESERVE_SIZE;

  for (int32_t k = 0; k < 3; ++k) {
    logNums[k] = taosArrayGetSize(logArrays[k]);
    for (int32_t i = 0; i < logNums[k]; ++i) {
      SSdbRaw *pLog = taosArrayGetP(logArrays[k], i);
      totalLen += (sdbGetRawTotalSize(pLog) + sizeof(int32_t));
    }
  }

  for (int32_t k = 0; k < 2; ++k) {
    actionNums[k] = taosArrayGetSize(actionArrays[k]);
    for (int32_t i = 0; i < actionNums[k]; ++i) {
      STransAction *pAct = taosArrayGet(actionArrays[k], i);
      totalLen += (sizeof(STransAction) + pAct->contLen);
    }
  }

  SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, MND_TRANS_VER_NUMBER, totalLen);
  if (pRaw == NULL) {
    mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
    return NULL;
  }

  // Second pass: write the header followed by the variable-length sections.
  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, logNums[0], TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, logNums[1], TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, logNums[2], TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, actionNums[0], TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, actionNums[1], TRANS_ENCODE_OVER)

  for (int32_t k = 0; k < 3; ++k) {
    for (int32_t i = 0; i < logNums[k]; ++i) {
      SSdbRaw *pLog = taosArrayGetP(logArrays[k], i);
      int32_t  logLen = sdbGetRawTotalSize(pLog);
      SDB_SET_INT32(pRaw, dataPos, logLen, TRANS_ENCODE_OVER)
      SDB_SET_BINARY(pRaw, dataPos, (void *)pLog, logLen, TRANS_ENCODE_OVER)
    }
  }

  for (int32_t k = 0; k < 2; ++k) {
    for (int32_t i = 0; i < actionNums[k]; ++i) {
      STransAction *pAct = taosArrayGet(actionArrays[k], i);
      SDB_SET_BINARY(pRaw, dataPos, (void *)&pAct->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
      SDB_SET_INT16(pRaw, dataPos, pAct->msgType, TRANS_ENCODE_OVER)
      SDB_SET_INT32(pRaw, dataPos, pAct->acceptableCode, TRANS_ENCODE_OVER)
      SDB_SET_INT32(pRaw, dataPos, pAct->contLen, TRANS_ENCODE_OVER)
      SDB_SET_BINARY(pRaw, dataPos, (void *)pAct->pCont, pAct->contLen, TRANS_ENCODE_OVER)
    }
  }

  SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER)
  SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER)

  terrno = 0;

TRANS_ENCODE_OVER:
  if (terrno != 0) {
    mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("trans:%d, encode to raw:%p, row:%p len:%d", pTrans->id, pRaw, pTrans, dataPos);
  return pRaw;
}

S
Shengliang Guan 已提交
177
// Rebuilds a transaction row from the raw produced by mndTransActionEncode().
//
// Fields are read back in the order they were written: id, policy, stage, the five
// element counts, the redo/undo/commit logs as (length, bytes), the redo/undo actions
// as (epSet, msgType, acceptableCode, contLen, content), then the reserved bytes.
//
// terrno is preset to TSDB_CODE_OUT_OF_MEMORY and reset to 0 only after the whole raw
// has been consumed. On failure everything allocated so far is released and NULL is
// returned.
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRow     *pRow = NULL;
  STrans      *pTrans = NULL;
  char        *pData = NULL;
  int32_t      dataLen = 0;
  int8_t       sver = 0;
  int32_t      redoLogNum = 0;
  int32_t      undoLogNum = 0;
  int32_t      commitLogNum = 0;
  int32_t      redoActionNum = 0;
  int32_t      undoActionNum = 0;
  int32_t      dataPos = 0;
  STransAction action = {0};

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TRANS_DECODE_OVER;

  if (sver != MND_TRANS_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TRANS_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(STrans));
  if (pRow == NULL) goto TRANS_DECODE_OVER;

  pTrans = sdbGetRowObj(pRow);
  if (pTrans == NULL) goto TRANS_DECODE_OVER;

  pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));

  if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->commitLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->stage, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER)

  // pData owns the buffer until it has been pushed into its array; it is reset to NULL
  // afterwards so the failure path below never frees a buffer the array already holds.
  for (int32_t i = 0; i < redoLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  for (int32_t i = 0; i < undoLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  for (int32_t i = 0; i < commitLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  // Same ownership rule for action.pCont: it is cleared once the action was copied
  // into the array.
  for (int32_t i = 0; i < redoActionNum; ++i) {
    SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
    SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
    action.pCont = malloc(action.contLen);
    if (action.pCont == NULL) goto TRANS_DECODE_OVER;
    SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto TRANS_DECODE_OVER;
    action.pCont = NULL;
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
    SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
    SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
    action.pCont = malloc(action.contLen);
    if (action.pCont == NULL) goto TRANS_DECODE_OVER;
    SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto TRANS_DECODE_OVER;
    action.pCont = NULL;
  }

  SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER)

  terrno = 0;

TRANS_DECODE_OVER:
  if (terrno != 0) {
    // pTrans is still NULL when the failure happened before the row object was
    // obtained (bad version, row allocation failure), so it must not be dereferenced.
    int32_t transId = (pTrans != NULL) ? pTrans->id : 0;
    mError("trans:%d, failed to parse from raw:%p since %s", transId, pRaw, terrstr());
    if (pTrans != NULL) {
      mndTransDropData(pTrans);
    }
    tfree(pRow);
    tfree(pData);
    tfree(action.pCont);
    return NULL;
  }

  mTrace("trans:%d, decode from raw:%p, row:%p", pTrans->id, pRaw, pTrans);
  return pRow;
}

S
Shengliang Guan 已提交
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
// Maps a transaction stage to the short name used in log messages.
// Any stage not listed here yields "invalid".
static const char *mndTransStr(ETrnStage stage) {
  const char *name = "invalid";

  switch (stage) {
    case TRN_STAGE_PREPARE:
      name = "prepare";
      break;
    case TRN_STAGE_REDO_LOG:
      name = "redoLog";
      break;
    case TRN_STAGE_REDO_ACTION:
      name = "redoAction";
      break;
    case TRN_STAGE_COMMIT:
      name = "commit";
      break;
    case TRN_STAGE_COMMIT_LOG:
      name = "commitLog";
      break;
    case TRN_STAGE_UNDO_ACTION:
      name = "undoAction";
      break;
    case TRN_STAGE_UNDO_LOG:
      name = "undoLog";
      break;
    case TRN_STAGE_ROLLBACK:
      name = "rollback";
      break;
    case TRN_STAGE_FINISHED:
      name = "finished";
      break;
    default:
      break;
  }

  return name;
}

S
Shengliang Guan 已提交
324
// sdb insert callback: only traces the event, the row itself is left untouched.
// Always returns 0.
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
  const char *stageName = mndTransStr(pTrans->stage);
  mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, stageName);
  return 0;
}

330
// Releases everything a transaction owns (its three log arrays, its two action arrays
// and a pending rpc response buffer) without freeing the STrans itself.
static void mndTransDropData(STrans *pTrans) {
  mndTransDropLogs(pTrans->redoLogs);
  mndTransDropLogs(pTrans->undoLogs);
  mndTransDropLogs(pTrans->commitLogs);

  mndTransDropActions(pTrans->redoActions);
  mndTransDropActions(pTrans->undoActions);

  if (pTrans->rpcRsp == NULL) return;

  // NOTE(review): mndTransSendRpcRsp() releases this same buffer with free(), here it is
  // rpcFreeCont(). Only one of the two can match the allocator used by the caller of
  // mndTransSetRpcRsp() — confirm which and align them.
  rpcFreeCont(pTrans->rpcRsp);
  pTrans->rpcRsp = NULL;
  pTrans->rpcRspLen = 0;
}
S
Shengliang Guan 已提交
342

343
// sdb delete callback: traces the event and releases the data owned by the row.
// Always returns 0.
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
  const char *stageName = mndTransStr(pTrans->stage);
  mTrace("trans:%d, perform delete action, row:%p stage:%s", pTrans->id, pTrans, stageName);

  mndTransDropData(pTrans);
  return 0;
}

S
Shengliang Guan 已提交
349
// sdb update callback: copies the stage of the incoming row (pNew) onto the stored row
// (pOld). An incoming row in the commit stage is first moved on to the commitLog stage.
// Always returns 0.
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
  const bool arrivedInCommit = (pNew->stage == TRN_STAGE_COMMIT);

  if (arrivedInCommit) {
    pNew->stage = TRN_STAGE_COMMIT_LOG;
    mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
  }

  const char *oldStageName = mndTransStr(pOld->stage);
  const char *newStageName = mndTransStr(pNew->stage);
  mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
         oldStageName, pNew, newStageName);

  pOld->stage = pNew->stage;
  return 0;
}

S
Shengliang Guan 已提交
361
// Looks up a transaction by id through sdbAcquire().
// Returns NULL and sets terrno to TSDB_CODE_MND_TRANS_NOT_EXIST when the lookup fails.
static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
  STrans *pFound = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
  return NULL;
}

S
Shengliang Guan 已提交
370
// Hands a transaction obtained from mndAcquireTrans() back to the sdb via sdbRelease().
static void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
  sdbRelease(pMnode->pSdb, pTrans);
}

S
Shengliang Guan 已提交
375
// Allocates a new transaction in the prepare stage.
//
// pMnode  mnode whose sdb supplies the transaction id (sdbGetMaxId on SDB_TRANS).
// policy  policy stored on the transaction.
// pReq    originating request; its handle/ahandle are remembered for the later response.
//
// Returns the new transaction, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the
// object or any of its arrays cannot be allocated. The caller releases the result with
// mndTransDrop().
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
  STrans *pTrans = calloc(1, sizeof(STrans));
  if (pTrans == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

  pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
  pTrans->stage = TRN_STAGE_PREPARE;
  pTrans->policy = policy;
  pTrans->rpcHandle = pReq->handle;
  pTrans->rpcAHandle = pReq->ahandle;
  pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
    // Release whatever was allocated before the failure; the arrays are still empty,
    // so destroying the containers is enough.
    if (pTrans->redoLogs != NULL) taosArrayDestroy(pTrans->redoLogs);
    if (pTrans->undoLogs != NULL) taosArrayDestroy(pTrans->undoLogs);
    if (pTrans->commitLogs != NULL) taosArrayDestroy(pTrans->commitLogs);
    if (pTrans->redoActions != NULL) taosArrayDestroy(pTrans->redoActions);
    if (pTrans->undoActions != NULL) taosArrayDestroy(pTrans->undoActions);
    free(pTrans);

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

  mDebug("trans:%d, is created, data:%p", pTrans->id, pTrans);
  return pTrans;
}

S
Shengliang Guan 已提交
405
// Frees every raw stored in a log array with sdbFreeRaw(), then destroys the array.
static void mndTransDropLogs(SArray *pArray) {
  const int32_t numOfLogs = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numOfLogs; ++idx) {
    sdbFreeRaw((SSdbRaw *)taosArrayGetP(pArray, idx));
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
415
// Frees the message content of every action in the array, then destroys the array.
static void mndTransDropActions(SArray *pArray) {
  const int32_t numOfActions = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numOfActions; ++idx) {
    STransAction *pItem = taosArrayGet(pArray, idx);
    tfree(pItem->pCont);
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
425
// Destroys a transaction together with everything it owns. Passing NULL is a no-op.
void mndTransDrop(STrans *pTrans) {
  if (pTrans == NULL) return;

  mndTransDropData(pTrans);
  mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
  tfree(pTrans);
}

S
Shengliang Guan 已提交
433
// Appends the pointer pRaw to a log array.
// Returns 0 on success; returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when either
// argument is NULL or the push fails.
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
  const bool argsValid = (pArray != NULL) && (pRaw != NULL);

  if (!argsValid || taosArrayPush(pArray, &pRaw) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
448
// Adds pRaw to the transaction's redo logs; result comes from mndTransAppendLog().
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
  SArray *pRedoLogs = pTrans->redoLogs;
  return mndTransAppendLog(pRedoLogs, pRaw);
}
S
Shengliang Guan 已提交
449

S
Shengliang Guan 已提交
450
// Adds pRaw to the transaction's undo logs; result comes from mndTransAppendLog().
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
  SArray *pUndoLogs = pTrans->undoLogs;
  return mndTransAppendLog(pUndoLogs, pRaw);
}
S
Shengliang Guan 已提交
451

S
Shengliang Guan 已提交
452
// Adds pRaw to the transaction's commit logs; result comes from mndTransAppendLog().
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
  SArray *pCommitLogs = pTrans->commitLogs;
  return mndTransAppendLog(pCommitLogs, pRaw);
}
S
Shengliang Guan 已提交
453

S
Shengliang Guan 已提交
454
// Appends a copy of *pAction to an action array.
// Returns 0 on success; returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when either
// argument is NULL or the push fails (same contract as mndTransAppendLog()).
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
  // Reject NULL arguments up front instead of handing them to taosArrayPush().
  if (pArray == NULL || pAction == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  void *ptr = taosArrayPush(pArray, pAction);
  if (ptr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
464
// Adds an action to the transaction's redo actions; result comes from mndTransAppendAction().
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
  SArray *pRedoActions = pTrans->redoActions;
  return mndTransAppendAction(pRedoActions, pAction);
}

S
Shengliang Guan 已提交
468
// Adds an action to the transaction's undo actions; result comes from mndTransAppendAction().
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
  SArray *pUndoActions = pTrans->undoActions;
  return mndTransAppendAction(pUndoActions, pAction);
}

S
Shengliang Guan 已提交
472 473 474 475 476
// Stores the response payload (pointer and length) on the transaction.
// The pointer is stored as-is; no copy is made here.
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
  pTrans->rpcRspLen = contLen;
  pTrans->rpcRsp = pCont;
}

S
Shengliang Guan 已提交
477
// Encodes the transaction, proposes the raw through mndSyncPropose() and then writes it
// to the local sdb. Returns 0 on success and -1 if encoding, proposing or writing fails.
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
  SSdbRaw *pEncoded = mndTransActionEncode(pTrans);
  if (pEncoded == NULL) {
    mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
    return -1;
  }

  sdbSetRawStatus(pEncoded, SDB_STATUS_READY);
  mDebug("trans:%d, sync to other nodes", pTrans->id);

  if (mndSyncPropose(pMnode, pEncoded) != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pEncoded);
    return -1;
  }

  mDebug("trans:%d, sync finished", pTrans->id);

  // The raw is not freed after sdbWrite(), on success or failure — presumably sdbWrite()
  // takes ownership of it; confirm against the sdb implementation.
  if (sdbWrite(pMnode->pSdb, pEncoded) != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  return 0;
}

// Persists a freshly built transaction and starts executing it.
//
// After mndTransSync() the transaction is re-read from the sdb; the rpc handles and the
// pending response buffer are moved from pTrans onto that stored instance (pTrans gives
// up the buffer), which is then executed and released.
// Returns 0 on success, -1 if syncing or re-reading fails.
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, prepare transaction", pTrans->id);

  int32_t syncCode = mndTransSync(pMnode, pTrans);
  if (syncCode != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, prepare finished", pTrans->id);

  STrans *pStored = mndAcquireTrans(pMnode, pTrans->id);
  if (pStored == NULL) {
    mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  // Transfer the reply context to the stored copy.
  pStored->rpcHandle = pTrans->rpcHandle;
  pStored->rpcAHandle = pTrans->rpcAHandle;
  pStored->rpcRsp = pTrans->rpcRsp;
  pStored->rpcRspLen = pTrans->rpcRspLen;

  // The caller's copy no longer owns the response buffer.
  pTrans->rpcRsp = NULL;
  pTrans->rpcRspLen = 0;

  mndTransExecute(pMnode, pStored);
  mndReleaseTrans(pMnode, pStored);
  return 0;
}

S
Shengliang Guan 已提交
530 531
// Syncs the transaction for commit. Nothing is synced (and 0 is returned) when the
// transaction has neither commit logs nor redo actions.
// Returns 0 on success, -1 when mndTransSync() fails.
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
  const bool hasCommitLogs = taosArrayGetSize(pTrans->commitLogs) != 0;
  const bool hasRedoActions = taosArrayGetSize(pTrans->redoActions) != 0;
  if (!hasCommitLogs && !hasRedoActions) {
    return 0;
  }

  mDebug("trans:%d, commit transaction", pTrans->id);

  int32_t syncCode = mndTransSync(pMnode, pTrans);
  if (syncCode != 0) {
    mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, commit finished", pTrans->id);
  return 0;
}

S
Shengliang Guan 已提交
542
// Syncs the transaction for rollback.
// Returns 0 on success, -1 when mndTransSync() fails.
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, rollback transaction", pTrans->id);

  int32_t syncCode = mndTransSync(pMnode, pTrans);
  if (syncCode != 0) {
    mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, rollback finished", pTrans->id);
  return 0;
}
S
Shengliang Guan 已提交
551

S
Shengliang Guan 已提交
552
// Sends the rpc response for a transaction, at most once.
//
// A response is due when the transaction is finished, when a rollback-policy
// transaction has entered an undo/rollback stage, or when a retry-policy transaction is
// in the redo-action stage with at least one failure. Nothing is sent if rpcHandle is
// NULL. After sending, rpcHandle is cleared and the stored response buffer is released.
static void mndTransSendRpcRsp(STrans *pTrans) {
  bool sendRsp = false;

  if (pTrans->stage == TRN_STAGE_FINISHED) {
    sendRsp = true;
  }

  if (pTrans->policy == TRN_POLICY_ROLLBACK) {
    if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION ||
        pTrans->stage == TRN_STAGE_ROLLBACK) {
      sendRsp = true;
    }
  }

  if (pTrans->policy == TRN_POLICY_RETRY) {
    if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) {
      sendRsp = true;
    }
  }

  if (!sendRsp || pTrans->rpcHandle == NULL) {
    return;
  }

  // Only a real payload is copied: never memcpy from a NULL buffer, and never advertise
  // a length for which no rpc buffer could be allocated.
  int32_t contLen = (pTrans->rpcRsp != NULL && pTrans->rpcRspLen > 0) ? pTrans->rpcRspLen : 0;
  void   *rpcCont = rpcMallocCont(contLen);
  if (rpcCont == NULL) {
    contLen = 0;
  } else if (contLen > 0) {
    memcpy(rpcCont, pTrans->rpcRsp, contLen);
  }

  // NOTE(review): mndTransDropData() releases this buffer with rpcFreeCont() instead of
  // free() — confirm which allocator the callers of mndTransSetRpcRsp() use.
  free(pTrans->rpcRsp);

  mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
         pTrans->rpcAHandle);
  SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
                    .code = pTrans->code,
                    .ahandle = pTrans->rpcAHandle,
                    .pCont = rpcCont,
                    .contLen = contLen};
  rpcSendResponse(&rspMsg);

  // Clearing the handle is what prevents a second response for the same request.
  pTrans->rpcHandle = NULL;
  pTrans->rpcRsp = NULL;
  pTrans->rpcRspLen = 0;
}

S
Shengliang Guan 已提交
593 594 595
// Handles the response to one transaction action.
//
// The rpc ahandle carries a 64-bit signature: transaction id in the upper 32 bits,
// action index in the lower 32 bits. The matching action of the current stage (redo or
// undo) is marked as received with the response code, then the transaction is executed
// again.
void mndTransProcessRsp(SMnodeMsg *pRsp) {
  SMnode *pMnode = pRsp->pMnode;
  int64_t signature = (int64_t)(pRsp->rpcMsg.ahandle);
  int32_t transId = (int32_t)(signature >> 32);
  // Mask the low half instead of shifting left: a left shift that overflows a signed
  // value is undefined behaviour.
  int32_t action = (int32_t)(signature & 0xFFFFFFFF);

  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    // Nothing was acquired, so there is nothing to release.
    mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr());
    return;
  }

  SArray *pArray = NULL;
  if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
    pArray = pTrans->redoActions;
  } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
    pArray = pTrans->undoActions;
  } else {
    mError("trans:%d, invalid trans stage:%d while recv action rsp", pTrans->id, pTrans->stage);
    goto HANDLE_ACTION_RSP_OVER;
  }

  if (pArray == NULL) {
    mError("trans:%d, invalid trans stage:%d", transId, pTrans->stage);
    goto HANDLE_ACTION_RSP_OVER;
  }

  // Bounds-check against the array of the current stage, not always the redo actions.
  int32_t actionNum = taosArrayGetSize(pArray);
  if (action < 0 || action >= actionNum) {
    mError("trans:%d, invalid action:%d", transId, action);
    goto HANDLE_ACTION_RSP_OVER;
  }

  STransAction *pAction = taosArrayGet(pArray, action);
  if (pAction != NULL) {
    pAction->msgReceived = 1;
    pAction->errCode = pRsp->rpcMsg.code;
    // Logged inside the NULL check because the message reads pAction->acceptableCode.
    mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code,
           pAction->acceptableCode);
  }

  mndTransExecute(pMnode, pTrans);

HANDLE_ACTION_RSP_OVER:
  mndReleaseTrans(pMnode, pTrans);
}

S
Shengliang Guan 已提交
640
// Writes every raw of a log array to the sdb with sdbWriteNotFree(), in array order.
// Stops at the first failure and returns its code; returns 0 when all writes succeed
// or the array is empty.
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
  const int32_t numOfLogs = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numOfLogs; ++idx) {
    SSdbRaw *pLog = taosArrayGetP(pArray, idx);
    int32_t  writeCode = sdbWriteNotFree(pMnode->pSdb, pLog);
    if (writeCode != 0) {
      return writeCode;
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
657
// Applies the transaction's redo logs; result comes from mndTransExecuteLogs().
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pRedoLogs = pTrans->redoLogs;
  return mndTransExecuteLogs(pMnode, pRedoLogs);
}

// Applies the transaction's undo logs; result comes from mndTransExecuteLogs().
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pUndoLogs = pTrans->undoLogs;
  return mndTransExecuteLogs(pMnode, pUndoLogs);
}

// Applies the transaction's commit logs; result comes from mndTransExecuteLogs().
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pCommitLogs = pTrans->commitLogs;
  return mndTransExecuteLogs(pMnode, pCommitLogs);
}
S
Shengliang Guan 已提交
668

S
Shengliang Guan 已提交
669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684
// Clears the sent/received/error state of every action that has not completed
// successfully, so it is sent again on the next execution. Actions that were sent,
// answered and returned error code 0 are left as they are.
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  const int32_t total = taosArrayGetSize(pArray);

  for (int32_t action = 0; action < total; ++action) {
    STransAction *pItem = taosArrayGet(pArray, action);
    if (pItem == NULL) {
      continue;
    }

    const bool succeeded = pItem->msgSent && pItem->msgReceived && pItem->errCode == 0;
    if (succeeded) {
      continue;
    }

    pItem->msgSent = 0;
    pItem->msgReceived = 0;
    pItem->errCode = 0;
    mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action);
  }
}

// Sends the request of every not-yet-sent action in pArray to its dnode.
//
// For each pending action a fresh rpc buffer is allocated and filled with a
// copy of the action's payload. The rpc handle carries a 64-bit signature:
// the transaction id in the upper 32 bits plus the action's array index
// (presumably decoded by the response handler to locate the action - confirm).
//
// Returns 0 when every pending action was handed to mndSendReqToDnode
// successfully. Returns -1 on the first failure: terrno is set to
// TSDB_CODE_OUT_OF_MEMORY if the buffer cannot be allocated; on a send failure
// terrno is whatever mndSendReqToDnode left behind. Actions sent before the
// failure stay marked as sent.
// NOTE(review): on send failure the rpc buffer is not released here; verify
// that mndSendReqToDnode takes ownership of pCont on every path.
static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  const int32_t total = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < total; ++idx) {
    STransAction *pAction = taosArrayGet(pArray, idx);
    if (pAction == NULL || pAction->msgSent) continue;

    int64_t signature = ((int64_t)pTrans->id << 32) + idx;

    void *pCont = rpcMallocCont(pAction->contLen);
    if (pCont == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    memcpy(pCont, pAction->pCont, pAction->contLen);

    SRpcMsg rpcMsg = {
        .msgType = pAction->msgType,
        .pCont = pCont,
        .contLen = pAction->contLen,
        .ahandle = (void *)signature,
    };

    if (mndSendReqToDnode(pMnode, &pAction->epSet, &rpcMsg) != 0) {
      mDebug("trans:%d, action:%d not send since %s", pTrans->id, idx, terrstr());
      return -1;
    }

    mDebug("trans:%d, action:%d is sent", pTrans->id, idx);
    pAction->msgSent = 1;
    pAction->msgReceived = 0;
    pAction->errCode = 0;
  }

  return 0;
}

// Drives one pass over an action array: sends whatever is still unsent, then
// tallies the responses received so far.
//
// Returns:
//   0                                 - the array is empty, or every action has
//                                       been answered without an unacceptable error;
//   -1                                - sending failed (see mndTransSendActionMsg);
//   TSDB_CODE_MND_ACTION_IN_PROGRESS  - some actions are still unanswered;
//   <errCode>                         - all actions answered but at least one
//                                       reported an error other than its
//                                       acceptableCode. In that case the
//                                       unsuccessful actions are reset for a
//                                       later retry and terrno is set to the
//                                       same code.
// When several actions fail, the code of the last failing one in array order
// is reported.
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  const int32_t total = taosArrayGetSize(pArray);
  if (total == 0) return 0;

  if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) return -1;

  int32_t answered = 0;
  int32_t failure = 0;
  for (int32_t idx = 0; idx < total; ++idx) {
    STransAction *pAction = taosArrayGet(pArray, idx);
    if (pAction == NULL) continue;
    if (!pAction->msgSent || !pAction->msgReceived) continue;

    answered++;
    if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
      failure = pAction->errCode;
    }
  }

  if (answered != total) {
    mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, answered, total, failure);
    return TSDB_CODE_MND_ACTION_IN_PROGRESS;
  }

  if (failure == 0) {
    mDebug("trans:%d, all %d actions execute successfully", pTrans->id, total);
    return 0;
  }

  mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, total, failure);
  mndTransResetActions(pMnode, pTrans, pArray);
  terrno = failure;
  return failure;
}

S
Shengliang Guan 已提交
755
// Runs one send/tally pass over the transaction's redo actions.
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
  SArray *pActions = pTrans->redoActions;
  return mndTransExecuteActions(pMnode, pTrans, pActions);
}
S
Shengliang Guan 已提交
758

S
Shengliang Guan 已提交
759
// Runs one send/tally pass over the transaction's undo actions.
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
  SArray *pActions = pTrans->undoActions;
  return mndTransExecuteActions(pMnode, pTrans, pActions);
}
S
Shengliang Guan 已提交
762

S
Shengliang Guan 已提交
763 764 765 766 767 768 769 770 771
// Prepare stage: does no work of its own, just advances the transaction to
// the redo-log stage. Always returns true (keep executing). pMnode is unused.
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
  pTrans->stage = TRN_STAGE_REDO_LOG;
  mDebug("trans:%d, stage from prepare to redoLog", pTrans->id);
  return true;
}

// Redo-log stage: writes the redo logs and picks the next stage.
//
// Success -> code cleared, stage becomes TRN_STAGE_REDO_ACTION.
// Failure -> code taken from terrno, stage becomes TRN_STAGE_UNDO_LOG.
// Always returns true, so the caller proceeds straight to the next stage.
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
  if (mndTransExecuteRedoLogs(pMnode, pTrans) != 0) {
    pTrans->code = terrno;
    pTrans->stage = TRN_STAGE_UNDO_LOG;
    mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr());
    return true;
  }

  pTrans->code = 0;
  pTrans->stage = TRN_STAGE_REDO_ACTION;
  mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id);
  return true;
}

S
Shengliang Guan 已提交
787 788
// Redo-action stage: sends/tallies the redo actions and picks the next stage.
//
// Returns true when the state machine should continue immediately:
//   - all actions succeeded            -> stage TRN_STAGE_COMMIT, returns true;
//   - actions still in progress        -> stage unchanged, returns false;
//   - failure, policy is ROLLBACK      -> stage TRN_STAGE_UNDO_ACTION, returns true;
//   - failure, any other policy        -> failedTimes bumped, stage unchanged,
//                                         returns false.
// On failure pTrans->code is taken from terrno.
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);

  if (code == 0) {
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_COMMIT;
    mDebug("trans:%d, stage from redoAction to commit", pTrans->id);
    return true;
  }

  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
    return false;
  }

  pTrans->code = terrno;
  if (pTrans->policy != TRN_POLICY_ROLLBACK) {
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
    return false;
  }

  pTrans->stage = TRN_STAGE_UNDO_ACTION;
  mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr());
  return true;
}

S
Shengliang Guan 已提交
815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838
// Commit stage: calls mndTransCommit and picks the next stage.
//
// Returns true when the state machine should continue immediately:
//   - commit succeeded                 -> stage TRN_STAGE_COMMIT_LOG, returns true;
//   - failure, policy is ROLLBACK      -> stage TRN_STAGE_UNDO_ACTION, returns true;
//   - failure, any other policy        -> failedTimes bumped, stage unchanged,
//                                         returns false.
// On failure pTrans->code is taken from terrno.
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransCommit(pMnode, pTrans);

  if (code == 0) {
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_COMMIT_LOG;
    mDebug("trans:%d, stage from commit to commitLog", pTrans->id);
    continueExec = true;
  } else {
    pTrans->code = terrno;
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
      // Roll back, exactly as the redo-action stage does for this policy.
      // Going back to the redo-action stage instead would make no progress:
      // every redo action is already acknowledged, so that stage would succeed
      // at once and re-enter commit, retrying in a loop for as long as the
      // commit keeps failing.
      pTrans->stage = TRN_STAGE_UNDO_ACTION;
      mError("trans:%d, stage from commit to undoAction since %s, failedTimes:%d", pTrans->id, terrstr(),
             pTrans->failedTimes);
      continueExec = true;
    } else {
      pTrans->failedTimes++;
      mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
      continueExec = false;
    }
  }

  return continueExec;
}

S
Shengliang Guan 已提交
841 842 843
// Commit-log stage: writes the commit logs and picks the next stage.
//
// Success -> code cleared, stage becomes TRN_STAGE_FINISHED, returns true.
// Failure -> code taken from terrno, failedTimes bumped, stage unchanged,
//            returns false so the caller stops for now.
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
  if (mndTransExecuteCommitLogs(pMnode, pTrans) != 0) {
    pTrans->code = terrno;
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->code = 0;
  pTrans->stage = TRN_STAGE_FINISHED;
  mDebug("trans:%d, stage from commitLog to finished", pTrans->id);
  return true;
}

// Undo-log stage: writes the undo logs and picks the next stage.
//
// Success -> stage becomes TRN_STAGE_ROLLBACK, returns true.
// Failure -> stage unchanged, returns false. pTrans->code and failedTimes are
//            left as they were in both cases.
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
  if (mndTransExecuteUndoLogs(pMnode, pTrans) != 0) {
    mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->stage = TRN_STAGE_ROLLBACK;
  mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
  return true;
}

// Undo-action stage: sends/tallies the undo actions and picks the next stage.
//
//   - all actions succeeded     -> stage TRN_STAGE_UNDO_LOG, returns true;
//   - actions still in progress -> stage unchanged, returns false;
//   - any other result          -> failedTimes bumped, stage unchanged,
//                                  returns false.
// pTrans->code is not modified here.
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);

  if (code == 0) {
    pTrans->stage = TRN_STAGE_UNDO_LOG;
    mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
    return true;
  }

  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
    return false;
  }

  pTrans->failedTimes++;
  mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr());
  return false;
}

// Rollback stage: calls mndTransRollback and picks the next stage.
//
// Success -> stage becomes TRN_STAGE_FINISHED, returns true.
// Failure -> failedTimes bumped, stage unchanged, returns false.
// pTrans->code is not modified here.
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
  if (mndTransRollback(pMnode, pTrans) != 0) {
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->stage = TRN_STAGE_FINISHED;
  mDebug("trans:%d, stage from rollback to finished", pTrans->id);
  return true;
}

// Finished stage: encodes the transaction, marks the encoded row as
// SDB_STATUS_DROPPED and writes it to sdb.
//
// Always returns false: this is the terminal stage, so the caller's loop stops.
// An encode failure is logged and the stage returns without touching sdb; a
// write failure is logged only.
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
  bool continueExec = false;

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr());
    // Nothing to write: bail out instead of handing a NULL raw to sdb.
    return continueExec;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  int32_t code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
  }

  mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
  return continueExec;
}
S
Shengliang Guan 已提交
930

S
Shengliang Guan 已提交
931
// Runs the transaction state machine: repeatedly dispatches on pTrans->stage
// to the matching stage handler until a handler returns false (or the stage is
// not one of the known values), then calls mndTransSendRpcRsp.
//
// At least one stage handler (or the default branch) is always executed.
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
  bool keepGoing;

  do {
    switch (pTrans->stage) {
      case TRN_STAGE_PREPARE:
        keepGoing = mndTransPerformPrepareStage(pMnode, pTrans);
        break;
      case TRN_STAGE_REDO_LOG:
        keepGoing = mndTransPerformRedoLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_REDO_ACTION:
        keepGoing = mndTransPerformRedoActionStage(pMnode, pTrans);
        break;
      case TRN_STAGE_UNDO_LOG:
        keepGoing = mndTransPerformUndoLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_UNDO_ACTION:
        keepGoing = mndTransPerformUndoActionStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT_LOG:
        keepGoing = mndTransPerformCommitLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT:
        keepGoing = mndTransPerformCommitStage(pMnode, pTrans);
        break;
      case TRN_STAGE_ROLLBACK:
        keepGoing = mndTransPerformRollbackStage(pMnode, pTrans);
        break;
      case TRN_STAGE_FINISHED:
        keepGoing = mndTransPerfromFinishedStage(pMnode, pTrans);
        break;
      default:
        // Unknown stage: stop rather than spin.
        keepGoing = false;
        break;
    }
  } while (keepGoing);

  mndTransSendRpcRsp(pTrans);
}
S
Shengliang Guan 已提交
971

S
Shengliang Guan 已提交
972 973
// Message handler: pulls up every stored transaction on the request's mnode.
// Always returns 0.
static int32_t mndProcessTransReq(SMnodeMsg *pReq) {
  SMnode *pMnode = pReq->pMnode;
  mndTransPullup(pMnode);
  return 0;
}

// Iterates over every SDB_TRANS row in the mnode's sdb, runs the transaction
// state machine on it and releases the row, then calls sdbWriteFile once the
// iteration is done.
void mndTransPullup(SMnode *pMnode) {
  SSdb   *pSdb = pMnode->pSdb;
  STrans *pTrans = NULL;

  for (void *pIter = sdbFetch(pSdb, SDB_TRANS, NULL, (void **)&pTrans); pIter != NULL;
       pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans)) {
    mndTransExecute(pMnode, pTrans);
    sdbRelease(pSdb, pTrans);
  }

  sdbWriteFile(pSdb);
}