mndTrans.c 30.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndTrans.h"
S
Shengliang Guan 已提交
18
#include "mndSync.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
#define MND_TRANS_VER_NUMBER 1
#define MND_TRANS_ARRAY_SIZE 8
#define MND_TRANS_RESERVE_SIZE 64
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25 26
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t  mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
S
Shengliang Guan 已提交
27
static int32_t  mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans);
S
Shengliang Guan 已提交
28 29
static int32_t  mndTransActionDelete(SSdb *pSdb, STrans *pTrans);

S
Shengliang Guan 已提交
30
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
S
Shengliang Guan 已提交
32 33
static void    mndTransDropLogs(SArray *pArray);
static void    mndTransDropActions(SArray *pArray);
34
static void    mndTransDropData(STrans *pTrans);
S
Shengliang Guan 已提交
35
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
36
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
S
Shengliang Guan 已提交
37 38 39 40
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
S
Shengliang Guan 已提交
41 42 43 44 45 46 47 48 49 50 51
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);

S
Shengliang Guan 已提交
52
static void    mndTransExecute(SMnode *pMnode, STrans *pTrans);
S
Shengliang Guan 已提交
53 54
static void    mndTransSendRpcRsp(STrans *pTrans);
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg);
S
Shengliang Guan 已提交
55

S
Shengliang Guan 已提交
56 57 58 59 60 61 62 63 64
int32_t mndInitTrans(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TRANS,
                     .keyType = SDB_KEY_INT32,
                     .encodeFp = (SdbEncodeFp)mndTransActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTransActionDecode,
                     .insertFp = (SdbInsertFp)mndTransActionInsert,
                     .updateFp = (SdbUpdateFp)mndTransActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTransActionDelete};

S
Shengliang Guan 已提交
65
  mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg);
S
Shengliang Guan 已提交
66 67 68 69 70 71
  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTrans(SMnode *pMnode) {}

static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
72 73
  terrno = TSDB_CODE_OUT_OF_MEMORY;

S
Shengliang Guan 已提交
74
  int32_t rawDataLen = sizeof(STrans) + MND_TRANS_RESERVE_SIZE;
S
Shengliang Guan 已提交
75 76 77 78 79 80
  int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
  int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
  int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
  int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
  int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);

81
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
82
    SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
83
    rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
S
Shengliang Guan 已提交
84 85
  }

86
  for (int32_t i = 0; i < undoLogNum; ++i) {
S
Shengliang Guan 已提交
87
    SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
88
    rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
S
Shengliang Guan 已提交
89 90
  }

91
  for (int32_t i = 0; i < commitLogNum; ++i) {
S
Shengliang Guan 已提交
92
    SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
93
    rawDataLen += (sdbGetRawTotalSize(pTmp) + 4);
S
Shengliang Guan 已提交
94 95
  }

S
Shengliang Guan 已提交
96 97
  for (int32_t i = 0; i < redoActionNum; ++i) {
    STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
S
Shengliang Guan 已提交
98
    rawDataLen += (sizeof(STransAction) + pAction->contLen);
S
Shengliang Guan 已提交
99 100 101 102
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
    STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
S
Shengliang Guan 已提交
103
    rawDataLen += (sizeof(STransAction) + pAction->contLen);
S
Shengliang Guan 已提交
104 105
  }

S
Shengliang Guan 已提交
106
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, MND_TRANS_VER_NUMBER, rawDataLen);
S
Shengliang Guan 已提交
107
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
108
    mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
109 110 111 112
    return NULL;
  }

  int32_t dataPos = 0;
113 114 115 116 117 118 119
  SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, redoActionNum, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, undoActionNum, TRANS_ENCODE_OVER)
S
Shengliang Guan 已提交
120

121
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
122
    SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
S
Shengliang Guan 已提交
123
    int32_t  len = sdbGetRawTotalSize(pTmp);
124 125
    SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
S
Shengliang Guan 已提交
126 127
  }

128
  for (int32_t i = 0; i < undoLogNum; ++i) {
S
Shengliang Guan 已提交
129
    SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
S
Shengliang Guan 已提交
130
    int32_t  len = sdbGetRawTotalSize(pTmp);
131 132
    SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
S
Shengliang Guan 已提交
133 134
  }

135
  for (int32_t i = 0; i < commitLogNum; ++i) {
S
Shengliang Guan 已提交
136
    SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
S
Shengliang Guan 已提交
137
    int32_t  len = sdbGetRawTotalSize(pTmp);
138 139
    SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
S
Shengliang Guan 已提交
140 141
  }

S
Shengliang Guan 已提交
142 143
  for (int32_t i = 0; i < redoActionNum; ++i) {
    STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
144 145 146 147
    SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
    SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
    SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
    SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
S
Shengliang Guan 已提交
148 149 150 151
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
    STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
    SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
    SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
    SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
  }

  SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER)
  SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER)

  terrno = 0;

TRANS_ENCODE_OVER:
  if (terrno != 0) {
    mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
S
Shengliang Guan 已提交
168 169
  }

S
Shengliang Guan 已提交
170
  mTrace("trans:%d, encode to raw:%p, row:%p len:%d", pTrans->id, pRaw, pTrans, dataPos);
S
Shengliang Guan 已提交
171 172 173
  return pRaw;
}

S
Shengliang Guan 已提交
174
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRow     *pRow = NULL;
  STrans      *pTrans = NULL;
  char        *pData = NULL;
  int32_t      dataLen = 0;
  int8_t       sver = 0;
  int32_t      redoLogNum = 0;
  int32_t      undoLogNum = 0;
  int32_t      commitLogNum = 0;
  int32_t      redoActionNum = 0;
  int32_t      undoActionNum = 0;
  int32_t      dataPos = 0;
  STransAction action = {0};

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
191

S
Shengliang Guan 已提交
192
  if (sver != MND_TRANS_VER_NUMBER) {
S
Shengliang Guan 已提交
193
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
194
    goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
195 196
  }

197 198 199 200 201
  pRow = sdbAllocRow(sizeof(STrans));
  if (pRow == NULL) goto TRANS_DECODE_OVER;

  pTrans = sdbGetRowObj(pRow);
  if (pTrans == NULL) goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
202

S
Shengliang Guan 已提交
203 204 205 206 207
  pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
S
Shengliang Guan 已提交
208

209 210 211 212 213
  if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->commitLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
214

215 216 217 218 219 220 221
  SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER)
S
Shengliang Guan 已提交
222

223
  for (int32_t i = 0; i < redoLogNum; ++i) {
224 225 226
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
227
    mTrace("raw:%p, is created", pData);
228 229 230
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
S
Shengliang Guan 已提交
231 232
  }

S
Shengliang Guan 已提交
233
  for (int32_t i = 0; i < undoLogNum; ++i) {
234 235 236
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
237
    mTrace("raw:%p, is created", pData);
238 239 240
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
S
Shengliang Guan 已提交
241 242 243
  }

  for (int32_t i = 0; i < commitLogNum; ++i) {
244 245
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
S
Shengliang Guan 已提交
246 247
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
248 249 250
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
S
Shengliang Guan 已提交
251 252 253
  }

  for (int32_t i = 0; i < redoActionNum; ++i) {
254 255 256
    SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
    SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
S
Shengliang Guan 已提交
257
    action.pCont = malloc(action.contLen);
258 259 260 261
    if (action.pCont == NULL) goto TRANS_DECODE_OVER;
    SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto TRANS_DECODE_OVER;
    action.pCont = NULL;
S
Shengliang Guan 已提交
262 263 264
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
265 266 267
    SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
    SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
S
Shengliang Guan 已提交
268
    action.pCont = malloc(action.contLen);
269 270 271 272
    if (action.pCont == NULL) goto TRANS_DECODE_OVER;
    SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto TRANS_DECODE_OVER;
    action.pCont = NULL;
S
Shengliang Guan 已提交
273 274
  }

275 276 277
  SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER)

  terrno = 0;
S
Shengliang Guan 已提交
278

S
Shengliang Guan 已提交
279
TRANS_DECODE_OVER:
280 281 282 283 284 285
  if (terrno != 0) {
    mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
    mndTransDropData(pTrans);
    tfree(pRow);
    tfree(pData);
    tfree(action.pCont);
S
Shengliang Guan 已提交
286 287 288
    return NULL;
  }

S
Shengliang Guan 已提交
289
  mTrace("trans:%d, decode from raw:%p, row:%p", pTrans->id, pRaw, pTrans);
S
Shengliang Guan 已提交
290 291 292
  return pRow;
}

S
Shengliang Guan 已提交
293
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
S
Shengliang Guan 已提交
294
  pTrans->stage = TRN_STAGE_PREPARE;
S
Shengliang Guan 已提交
295
  mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans);
S
Shengliang Guan 已提交
296 297 298
  return 0;
}

299
static void mndTransDropData(STrans *pTrans) {
S
Shengliang Guan 已提交
300 301 302 303 304
  mndTransDropLogs(pTrans->redoLogs);
  mndTransDropLogs(pTrans->undoLogs);
  mndTransDropLogs(pTrans->commitLogs);
  mndTransDropActions(pTrans->redoActions);
  mndTransDropActions(pTrans->undoActions);
305
}
S
Shengliang Guan 已提交
306

307
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
S
Shengliang Guan 已提交
308
  mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans);
309
  mndTransDropData(pTrans);
S
Shengliang Guan 已提交
310 311 312
  return 0;
}

S
Shengliang Guan 已提交
313
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
S
Shengliang Guan 已提交
314
  mTrace("trans:%d, perform update action, old_row:%p new_row:%p", pOldTrans->id, pOldTrans, pNewTrans);
S
Shengliang Guan 已提交
315
  pOldTrans->stage = pNewTrans->stage;
S
Shengliang Guan 已提交
316 317 318
  return 0;
}

S
Shengliang Guan 已提交
319
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
S
Shengliang Guan 已提交
320 321 322 323 324 325
  SSdb   *pSdb = pMnode->pSdb;
  STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId);
  if (pTrans == NULL) {
    terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
  }
  return pTrans;
S
Shengliang Guan 已提交
326 327 328 329 330 331 332
}

void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTrans);
}

S
Shengliang Guan 已提交
333
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
S
Shengliang Guan 已提交
334 335 336 337 338 339 340
  STrans *pTrans = calloc(1, sizeof(STrans));
  if (pTrans == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
341
  pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
S
Shengliang Guan 已提交
342 343
  pTrans->stage = TRN_STAGE_PREPARE;
  pTrans->policy = policy;
S
Shengliang Guan 已提交
344 345
  pTrans->rpcHandle = pMsg->handle;
  pTrans->rpcAHandle = pMsg->ahandle;
S
Shengliang Guan 已提交
346 347 348 349 350
  pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
S
Shengliang Guan 已提交
351 352 353 354 355 356 357 358

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
359
  mDebug("trans:%d, is created, data:%p", pTrans->id, pTrans);
S
Shengliang Guan 已提交
360 361 362
  return pTrans;
}

S
Shengliang Guan 已提交
363
static void mndTransDropLogs(SArray *pArray) {
364
  if (pArray == NULL) return;
365
  for (int32_t i = 0; i < pArray->size; ++i) {
S
Shengliang Guan 已提交
366
    SSdbRaw *pRaw = taosArrayGetP(pArray, i);
S
Shengliang Guan 已提交
367
    sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
368 369 370 371 372
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
373
static void mndTransDropActions(SArray *pArray) {
374
  if (pArray == NULL) return;
S
Shengliang Guan 已提交
375 376
  for (int32_t i = 0; i < pArray->size; ++i) {
    STransAction *pAction = taosArrayGet(pArray, i);
S
Shengliang Guan 已提交
377
    free(pAction->pCont);
S
Shengliang Guan 已提交
378 379 380 381 382
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
383
void mndTransDrop(STrans *pTrans) {
384
  mndTransDropData(pTrans);
S
Shengliang Guan 已提交
385
  mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
S
Shengliang Guan 已提交
386 387 388
  tfree(pTrans);
}

S
Shengliang Guan 已提交
389
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
390 391 392 393 394
  if (pArray == NULL || pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
395
  void *ptr = taosArrayPush(pArray, &pRaw);
S
Shengliang Guan 已提交
396 397 398 399 400 401 402 403
  if (ptr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
404
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); }
S
Shengliang Guan 已提交
405

S
Shengliang Guan 已提交
406
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); }
S
Shengliang Guan 已提交
407

S
Shengliang Guan 已提交
408
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); }
S
Shengliang Guan 已提交
409

S
Shengliang Guan 已提交
410
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
411
  void *ptr = taosArrayPush(pArray, pAction);
S
Shengliang Guan 已提交
412 413 414 415 416 417 418 419
  if (ptr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
420
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
S
Shengliang Guan 已提交
421
  return mndTransAppendAction(pTrans->redoActions, pAction);
S
Shengliang Guan 已提交
422 423
}

S
Shengliang Guan 已提交
424
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
S
Shengliang Guan 已提交
425
  return mndTransAppendAction(pTrans->undoActions, pAction);
S
Shengliang Guan 已提交
426 427
}

S
Shengliang Guan 已提交
428
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
429
  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
S
Shengliang Guan 已提交
430
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
431
    mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
432 433
    return -1;
  }
S
Shengliang Guan 已提交
434
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);
S
Shengliang Guan 已提交
435

S
Shengliang Guan 已提交
436
  mDebug("trans:%d, sync to other nodes", pTrans->id);
S
Shengliang Guan 已提交
437 438 439 440
  int32_t code = mndSyncPropose(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
441 442 443
    return -1;
  }

S
Shengliang Guan 已提交
444
  mDebug("trans:%d, sync finished", pTrans->id);
S
Shengliang Guan 已提交
445

S
Shengliang Guan 已提交
446 447 448 449 450 451
  code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
  }

S
Shengliang Guan 已提交
452 453 454 455 456 457 458 459 460 461 462
  return 0;
}

int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, prepare transaction", pTrans->id);
  if (mndTransSync(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    return -1;
  }
  mDebug("trans:%d, prepare finished", pTrans->id);

S
Shengliang Guan 已提交
463 464
  STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id);
  if (pNewTrans == NULL) {
S
Shengliang Guan 已提交
465
    mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
466 467 468
    return -1;
  }

S
Shengliang Guan 已提交
469
  pNewTrans->rpcHandle = pTrans->rpcHandle;
S
Shengliang Guan 已提交
470
  pNewTrans->rpcAHandle = pTrans->rpcAHandle;
S
Shengliang Guan 已提交
471 472 473 474 475
  mndTransExecute(pMnode, pNewTrans);
  mndReleaseTrans(pMnode, pNewTrans);
  return 0;
}

S
Shengliang Guan 已提交
476 477
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
  if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0;
S
Shengliang Guan 已提交
478

S
Shengliang Guan 已提交
479 480 481
  mDebug("trans:%d, commit transaction", pTrans->id);
  if (mndTransSync(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
482
    return -1;
S
Shengliang Guan 已提交
483 484
  }
  mDebug("trans:%d, commit finished", pTrans->id);
S
Shengliang Guan 已提交
485 486 487
  return 0;
}

S
Shengliang Guan 已提交
488
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
489
  mDebug("trans:%d, rollback transaction", pTrans->id);
S
Shengliang Guan 已提交
490 491
  if (mndTransSync(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
492
    return -1;
S
Shengliang Guan 已提交
493
  }
S
Shengliang Guan 已提交
494 495
  mDebug("trans:%d, rollback finished", pTrans->id);
  return 0;
S
Shengliang Guan 已提交
496
}
S
Shengliang Guan 已提交
497

S
Shengliang Guan 已提交
498
static void mndTransSendRpcRsp(STrans *pTrans) {
S
Shengliang Guan 已提交
499
  if (pTrans->rpcHandle != NULL) {
S
Shengliang Guan 已提交
500 501
    mDebug("trans:%d, send rsp, ahandle:%p code:0x%x", pTrans->id, pTrans->rpcAHandle, pTrans->code & 0xFFFF);
    SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle};
S
Shengliang Guan 已提交
502
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
503 504 505
  }
}

S
Shengliang Guan 已提交
506
void mndTransProcessRsp(SMnodeMsg *pMsg) {
507
  SMnode *pMnode = pMsg->pMnode;
S
Shengliang Guan 已提交
508 509 510
  int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle);
  int32_t transId = (int32_t)(signature >> 32);
  int32_t action = (int32_t)((signature << 32) >> 32);
511 512 513 514 515 516 517 518

  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr());
    goto HANDLE_ACTION_RSP_OVER;
  }

  SArray *pArray = NULL;
S
Shengliang Guan 已提交
519
  if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
520
    pArray = pTrans->redoActions;
S
Shengliang Guan 已提交
521
  } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
522 523
    pArray = pTrans->undoActions;
  } else {
S
Shengliang Guan 已提交
524 525
    mError("trans:%d, invalid trans stage:%d while recv action rsp", pTrans->id, pTrans->stage);
    goto HANDLE_ACTION_RSP_OVER;
526 527 528
  }

  if (pArray == NULL) {
S
Shengliang Guan 已提交
529
    mError("trans:%d, invalid trans stage:%d", transId, pTrans->stage);
530 531 532 533
    goto HANDLE_ACTION_RSP_OVER;
  }

  int32_t actionNum = taosArrayGetSize(pTrans->redoActions);
S
Shengliang Guan 已提交
534
  if (action < 0 || action >= actionNum) {
535 536 537 538 539 540 541
    mError("trans:%d, invalid action:%d", transId, action);
    goto HANDLE_ACTION_RSP_OVER;
  }

  STransAction *pAction = taosArrayGet(pArray, action);
  if (pAction != NULL) {
    pAction->msgReceived = 1;
S
Shengliang Guan 已提交
542
    pAction->errCode = pMsg->rpcMsg.code;
543 544
  }

S
Shengliang Guan 已提交
545
  mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code);
546 547 548 549 550 551
  mndTransExecute(pMnode, pTrans);

HANDLE_ACTION_RSP_OVER:
  mndReleaseTrans(pMnode, pTrans);
}

S
Shengliang Guan 已提交
552
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
S
Shengliang Guan 已提交
553 554 555
  SSdb   *pSdb = pMnode->pSdb;
  int32_t arraySize = taosArrayGetSize(pArray);

S
Shengliang Guan 已提交
556 557
  if (arraySize == 0) return 0;

S
Shengliang Guan 已提交
558
  for (int32_t i = 0; i < arraySize; ++i) {
559
    SSdbRaw *pRaw = taosArrayGetP(pArray, i);
S
Shengliang Guan 已提交
560 561 562
    int32_t  code = sdbWriteNotFree(pSdb, pRaw);
    if (code != 0) {
      return code;
S
Shengliang Guan 已提交
563 564 565 566 567 568
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
569
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
570
  return mndTransExecuteLogs(pMnode, pTrans->redoLogs);
S
Shengliang Guan 已提交
571 572 573
}

static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
574
  return mndTransExecuteLogs(pMnode, pTrans->undoLogs);
S
Shengliang Guan 已提交
575 576 577
}

static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
578
  return mndTransExecuteLogs(pMnode, pTrans->commitLogs);
S
Shengliang Guan 已提交
579
}
S
Shengliang Guan 已提交
580

S
Shengliang Guan 已提交
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  int32_t numOfActions = taosArrayGetSize(pArray);

  for (int32_t action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pArray, action);
    if (pAction == NULL) continue;
    if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue;

    pAction->msgSent = 0;
    pAction->msgReceived = 0;
    pAction->errCode = 0;
    mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action);
  }
}

static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
597 598 599 600 601
  int32_t numOfActions = taosArrayGetSize(pArray);

  for (int32_t action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pArray, action);
    if (pAction == NULL) continue;
S
Shengliang Guan 已提交
602
    if (pAction->msgSent) continue;
S
Shengliang Guan 已提交
603

604 605 606 607 608
    int64_t signature = pTrans->id;
    signature = (signature << 32);
    signature += action;

    SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .ahandle = (void *)signature};
S
Shengliang Guan 已提交
609 610 611 612
    rpcMsg.pCont = rpcMallocCont(pAction->contLen);
    if (rpcMsg.pCont == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
S
Shengliang Guan 已提交
613
    }
S
Shengliang Guan 已提交
614
    memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
615

S
Shengliang Guan 已提交
616 617 618 619 620 621 622 623 624
    if (mndSendReqToDnode(pMnode, &pAction->epSet, &rpcMsg) == 0) {
      mDebug("trans:%d, action:%d is sent", pTrans->id, action);
      pAction->msgSent = 1;
      pAction->msgReceived = 0;
      pAction->errCode = 0;
    } else {
      mDebug("trans:%d, action:%d not sent since %s", pTrans->id, action, terrstr());
      return -1;
    }
S
Shengliang Guan 已提交
625 626
  }

S
Shengliang Guan 已提交
627 628 629 630 631 632 633 634 635 636 637
  return 0;
}

static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  int32_t numOfActions = taosArrayGetSize(pArray);
  if (numOfActions == 0) return 0;

  if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) {
    return -1;
  }

S
Shengliang Guan 已提交
638 639
  int32_t numOfReceived = 0;
  int32_t errCode = 0;
640 641 642 643
  for (int32_t action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pArray, action);
    if (pAction == NULL) continue;
    if (pAction->msgSent && pAction->msgReceived) {
S
Shengliang Guan 已提交
644
      numOfReceived++;
645
      if (pAction->errCode != 0) {
S
Shengliang Guan 已提交
646
        errCode = pAction->errCode;
647 648 649 650
      }
    }
  }

S
Shengliang Guan 已提交
651
  if (numOfReceived == numOfActions) {
S
Shengliang Guan 已提交
652 653 654 655 656 657 658 659 660
    if (errCode == 0) {
      mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
      return 0;
    } else {
      mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode);
      mndTransResetActions(pMnode, pTrans, pArray);
      terrno = errCode;
      return errCode;
    }
661
  } else {
S
Shengliang Guan 已提交
662
    mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceived, numOfActions, errCode);
663 664
    return TSDB_CODE_MND_ACTION_IN_PROGRESS;
  }
S
Shengliang Guan 已提交
665 666
}

S
Shengliang Guan 已提交
667
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
668
  return mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions);
S
Shengliang Guan 已提交
669
}
S
Shengliang Guan 已提交
670

S
Shengliang Guan 已提交
671
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
672
  return mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions);
S
Shengliang Guan 已提交
673
}
S
Shengliang Guan 已提交
674

S
Shengliang Guan 已提交
675 676 677 678 679 680 681 682 683
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
  bool continueExec = true;
  pTrans->stage = TRN_STAGE_REDO_LOG;
  mDebug("trans:%d, stage from prepare to redoLog", pTrans->id);
  return continueExec;
}

static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
S
Shengliang Guan 已提交
684
  int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
S
Shengliang Guan 已提交
685

S
Shengliang Guan 已提交
686
  if (code == 0) {
S
Shengliang Guan 已提交
687 688 689
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_REDO_ACTION;
    mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id);
S
Shengliang Guan 已提交
690
  } else {
S
Shengliang Guan 已提交
691 692 693
    pTrans->code = terrno;
    pTrans->stage = TRN_STAGE_UNDO_LOG;
    mError("trans:%d, stage from redoLog to undoLog", pTrans->id);
S
Shengliang Guan 已提交
694
  }
S
Shengliang Guan 已提交
695 696

  return continueExec;
S
Shengliang Guan 已提交
697 698
}

S
Shengliang Guan 已提交
699 700
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
S
Shengliang Guan 已提交
701
  int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
S
Shengliang Guan 已提交
702 703

  if (code == 0) {
S
Shengliang Guan 已提交
704
    pTrans->code = 0;
S
Shengliang Guan 已提交
705
    pTrans->stage = TRN_STAGE_COMMIT;
S
Shengliang Guan 已提交
706 707
    mDebug("trans:%d, stage from redoAction to commit", pTrans->id);
    continueExec = true;
S
Shengliang Guan 已提交
708
  } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
709 710
    mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
    continueExec = false;
S
Shengliang Guan 已提交
711
  } else {
S
Shengliang Guan 已提交
712
    pTrans->code = terrno;
S
Shengliang Guan 已提交
713
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
S
Shengliang Guan 已提交
714 715 716
      pTrans->stage = TRN_STAGE_UNDO_ACTION;
      mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr());
      continueExec = true;
S
Shengliang Guan 已提交
717
    } else {
S
Shengliang Guan 已提交
718 719 720
      pTrans->failedTimes++;
      mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
      continueExec = false;
S
Shengliang Guan 已提交
721 722 723
    }
  }

S
Shengliang Guan 已提交
724
  return continueExec;
S
Shengliang Guan 已提交
725 726
}

S
Shengliang Guan 已提交
727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransCommit(pMnode, pTrans);

  if (code == 0) {
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_COMMIT_LOG;
    mDebug("trans:%d, stage from commit to commitLog", pTrans->id);
    continueExec = true;
  } else {
    pTrans->code = terrno;
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
      pTrans->stage = TRN_STAGE_REDO_ACTION;
      mError("trans:%d, stage from commit to redoAction since %s, failedTimes:%d", pTrans->id, terrstr(),
             pTrans->failedTimes);
      continueExec = true;
    } else {
      pTrans->failedTimes++;
      mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
      continueExec = false;
    }
  }

  return continueExec;
S
Shengliang Guan 已提交
751 752
}

S
Shengliang Guan 已提交
753 754 755
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);
S
Shengliang Guan 已提交
756 757

  if (code == 0) {
S
Shengliang Guan 已提交
758 759 760 761
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_FINISHED;
    mDebug("trans:%d, stage from commitLog to finished", pTrans->id);
    continueExec = true;
S
Shengliang Guan 已提交
762
  } else {
S
Shengliang Guan 已提交
763 764 765 766 767 768 769 770 771 772 773 774 775 776 777
    pTrans->code = terrno;
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr());
    continueExec = false;
    ;
  }

  return continueExec;
}

static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans);

  if (code == 0) {
S
Shengliang Guan 已提交
778
    pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
    mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
    continueExec = true;
  } else {
    mDebug("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
    continueExec = false;
  }

  return continueExec;
}

static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);

  if (code == 0) {
    pTrans->stage = TRN_STAGE_REDO_LOG;
    mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
    continueExec = true;
  } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
    continueExec = false;
  } else {
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr());
    continueExec = false;
  }

  return continueExec;
}

static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransRollback(pMnode, pTrans);

  if (code == 0) {
    pTrans->stage = TRN_STAGE_FINISHED;
    mDebug("trans:%d, stage from rollback to finished", pTrans->id);
    continueExec = true;
    ;
  } else {
    pTrans->failedTimes++;
S
Shengliang Guan 已提交
820
    mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
821
    continueExec = false;
S
Shengliang Guan 已提交
822 823
  }

S
Shengliang Guan 已提交
824 825 826 827 828 829 830 831
  return continueExec;
}

static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
  bool continueExec = false;

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
832
    mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
833 834 835 836 837 838 839 840
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  int32_t code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
  }

S
Shengliang Guan 已提交
841
  mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
S
Shengliang Guan 已提交
842
  return continueExec;
S
Shengliang Guan 已提交
843
}
S
Shengliang Guan 已提交
844

S
Shengliang Guan 已提交
845
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
846
  bool continueExec = true;
S
Shengliang Guan 已提交
847

S
Shengliang Guan 已提交
848
  while (continueExec) {
S
Shengliang Guan 已提交
849 850
    switch (pTrans->stage) {
      case TRN_STAGE_PREPARE:
S
Shengliang Guan 已提交
851 852 853 854 855 856 857 858 859 860
        continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
        break;
      case TRN_STAGE_REDO_LOG:
        continueExec = mndTransPerformRedoLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_REDO_ACTION:
        continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
        break;
      case TRN_STAGE_UNDO_LOG:
        continueExec = mndTransPerformUndoLogStage(pMnode, pTrans);
S
Shengliang Guan 已提交
861
        break;
S
Shengliang Guan 已提交
862 863 864 865 866
      case TRN_STAGE_UNDO_ACTION:
        continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT_LOG:
        continueExec = mndTransPerformCommitLogStage(pMnode, pTrans);
S
Shengliang Guan 已提交
867 868
        break;
      case TRN_STAGE_COMMIT:
S
Shengliang Guan 已提交
869
        continueExec = mndTransPerformCommitStage(pMnode, pTrans);
S
Shengliang Guan 已提交
870 871
        break;
      case TRN_STAGE_ROLLBACK:
S
Shengliang Guan 已提交
872 873 874 875
        continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
        break;
      case TRN_STAGE_FINISHED:
        continueExec = mndTransPerfromFinishedStage(pMnode, pTrans);
S
Shengliang Guan 已提交
876
        break;
S
Shengliang Guan 已提交
877
      default:
S
Shengliang Guan 已提交
878 879
        continueExec = false;
        break;
S
Shengliang Guan 已提交
880 881 882
    }
  }

S
Shengliang Guan 已提交
883 884 885
  if (pTrans->stage == TRN_STAGE_FINISHED) {
    mndTransSendRpcRsp(pTrans);
  }
S
Shengliang Guan 已提交
886
}
S
Shengliang Guan 已提交
887 888

static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
S
Shengliang Guan 已提交
889 890 891 892 893
  mndTransPullup(pMsg->pMnode);
  return 0;
}

void mndTransPullup(SMnode *pMnode) {
S
Shengliang Guan 已提交
894 895 896 897 898 899 900 901 902 903
  STrans *pTrans = NULL;
  void   *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
    if (pIter == NULL) break;

    mndTransExecute(pMnode, pTrans);
    sdbRelease(pMnode->pSdb, pTrans);
  }
904
}