mndTrans.c 30.7 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndTrans.h"
S
Shengliang Guan 已提交
18
#include "mndSync.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
#define MND_TRANS_VER_NUMBER 1
#define MND_TRANS_ARRAY_SIZE 8
#define MND_TRANS_RESERVE_SIZE 64
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25 26
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t  mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
S
Shengliang Guan 已提交
27
static int32_t  mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans);
S
Shengliang Guan 已提交
28 29
static int32_t  mndTransActionDelete(SSdb *pSdb, STrans *pTrans);

S
Shengliang Guan 已提交
30
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
S
Shengliang Guan 已提交
32 33
static void    mndTransDropLogs(SArray *pArray);
static void    mndTransDropActions(SArray *pArray);
34
static void    mndTransDropData(STrans *pTrans);
S
Shengliang Guan 已提交
35
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
36
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
S
Shengliang Guan 已提交
37 38 39 40
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
S
Shengliang Guan 已提交
41 42 43 44 45 46 47 48 49 50 51
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);

S
Shengliang Guan 已提交
52
static void    mndTransExecute(SMnode *pMnode, STrans *pTrans);
S
Shengliang Guan 已提交
53 54
static void    mndTransSendRpcRsp(STrans *pTrans);
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg);
55
static int32_t mndProcessTransRsp(SMnodeMsg *pMsg);
S
Shengliang Guan 已提交
56

S
Shengliang Guan 已提交
57 58 59 60 61 62 63 64 65
/*
 * Sets up transaction handling for an mnode: installs the two transaction
 * message handlers and hands the SDB_TRANS table descriptor to sdbSetTable.
 * Returns whatever sdbSetTable returns.
 */
int32_t mndInitTrans(SMnode *pMnode) {
  SSdbTable transTable = {0};

  transTable.sdbType = SDB_TRANS;
  transTable.keyType = SDB_KEY_INT32;
  transTable.encodeFp = (SdbEncodeFp)mndTransActionEncode;
  transTable.decodeFp = (SdbDecodeFp)mndTransActionDecode;
  transTable.insertFp = (SdbInsertFp)mndTransActionInsert;
  transTable.updateFp = (SdbUpdateFp)mndTransActionUpdate;
  transTable.deleteFp = (SdbDeleteFp)mndTransActionDelete;

  mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg);
  mndSetMsgHandle(pMnode, TDMT_MND_TRANS_RSP, mndProcessTransRsp);

  return sdbSetTable(pMnode->pSdb, transTable);
}

/* Counterpart of mndInitTrans; currently there is nothing to release. */
void mndCleanupTrans(SMnode *pMnode) {
  (void)pMnode;
}

/*
 * Serializes a transaction into a newly allocated SSdbRaw.
 *
 * Layout written, in order: id, policy, the five element counts, the redo /
 * undo / commit logs (each as length + bytes), the redo / undo actions (each as
 * epSet, msgType, contLen, content), and finally a reserved area.
 *
 * Returns the raw on success. On failure returns NULL with terrno left
 * non-zero; a partially filled raw is freed before returning.
 */
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
  /* Assume failure until the very end; the SDB_SET_* macros jump to the label on error. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  const int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
  const int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
  const int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
  const int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
  const int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);

  /* Walk the log and action arrays in their on-disk order. */
  SArray       *logSets[3] = {pTrans->redoLogs, pTrans->undoLogs, pTrans->commitLogs};
  const int32_t logNums[3] = {redoLogNum, undoLogNum, commitLogNum};
  SArray       *actionSets[2] = {pTrans->redoActions, pTrans->undoActions};
  const int32_t actionNums[2] = {redoActionNum, undoActionNum};

  /* First pass: compute the size of the data area. */
  int32_t rawDataLen = sizeof(STrans) + MND_TRANS_RESERVE_SIZE;

  for (int32_t s = 0; s < 3; ++s) {
    for (int32_t i = 0; i < logNums[s]; ++i) {
      SSdbRaw *pLog = taosArrayGetP(logSets[s], i);
      rawDataLen += (sdbGetRawTotalSize(pLog) + 4); /* +4 for the length prefix */
    }
  }

  for (int32_t s = 0; s < 2; ++s) {
    for (int32_t i = 0; i < actionNums[s]; ++i) {
      STransAction *pAction = taosArrayGet(actionSets[s], i);
      rawDataLen += (sizeof(STransAction) + pAction->contLen);
    }
  }

  SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, MND_TRANS_VER_NUMBER, rawDataLen);
  if (pRaw == NULL) {
    mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
    return NULL;
  }

  /* Second pass: write the header, then every log and action. */
  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, redoActionNum, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, undoActionNum, TRANS_ENCODE_OVER)

  for (int32_t s = 0; s < 3; ++s) {
    for (int32_t i = 0; i < logNums[s]; ++i) {
      SSdbRaw *pLog = taosArrayGetP(logSets[s], i);
      int32_t  logLen = sdbGetRawTotalSize(pLog);
      SDB_SET_INT32(pRaw, dataPos, logLen, TRANS_ENCODE_OVER)
      SDB_SET_BINARY(pRaw, dataPos, (void *)pLog, logLen, TRANS_ENCODE_OVER)
    }
  }

  for (int32_t s = 0; s < 2; ++s) {
    for (int32_t i = 0; i < actionNums[s]; ++i) {
      STransAction *pAction = taosArrayGet(actionSets[s], i);
      SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
      SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
      SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
      SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
    }
  }

  SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER)
  SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER)

  terrno = 0;

TRANS_ENCODE_OVER:
  if (terrno != 0) {
    mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("trans:%d, encode to raw:%p, row:%p len:%d", pTrans->id, pRaw, pTrans, dataPos);
  return pRaw;
}

S
Shengliang Guan 已提交
176
/*
 * Rebuilds a transaction row from its raw encoding (the inverse of
 * mndTransActionEncode).
 *
 * Returns the new row on success. On any failure returns NULL with terrno
 * non-zero, after freeing everything allocated so far.
 *
 * Fix: the error path used to read pTrans->id and call mndTransDropData(pTrans)
 * unconditionally, although pTrans is still NULL when the version check or the
 * row allocation fails. Both uses are now guarded.
 */
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
  /* Assume failure until the very end; the SDB_GET_* macros jump to the label on error. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRow     *pRow = NULL;
  STrans      *pTrans = NULL;
  char        *pData = NULL;
  int32_t      dataLen = 0;
  int8_t       sver = 0;
  int32_t      redoLogNum = 0;
  int32_t      undoLogNum = 0;
  int32_t      commitLogNum = 0;
  int32_t      redoActionNum = 0;
  int32_t      undoActionNum = 0;
  int32_t      dataPos = 0;
  STransAction action = {0};

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TRANS_DECODE_OVER;

  if (sver != MND_TRANS_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TRANS_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(STrans));
  if (pRow == NULL) goto TRANS_DECODE_OVER;

  pTrans = sdbGetRowObj(pRow);
  if (pTrans == NULL) goto TRANS_DECODE_OVER;

  pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));

  if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->commitLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER)

  /*
   * Each log is stored as length + bytes. pData owns the buffer only until it
   * has been pushed into the array; it is then reset so the error path does not
   * free it a second time.
   */
  for (int32_t i = 0; i < redoLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  for (int32_t i = 0; i < undoLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  for (int32_t i = 0; i < commitLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  /* Actions are stored as epSet, msgType, contLen, content; same ownership rule for action.pCont. */
  for (int32_t i = 0; i < redoActionNum; ++i) {
    SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
    SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
    action.pCont = malloc(action.contLen);
    if (action.pCont == NULL) goto TRANS_DECODE_OVER;
    SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto TRANS_DECODE_OVER;
    action.pCont = NULL;
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
    SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
    SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
    action.pCont = malloc(action.contLen);
    if (action.pCont == NULL) goto TRANS_DECODE_OVER;
    SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto TRANS_DECODE_OVER;
    action.pCont = NULL;
  }

  SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER)

  terrno = 0;

TRANS_DECODE_OVER:
  if (terrno != 0) {
    /* pTrans is NULL when the failure happened before the row object was obtained. */
    mError("trans:%d, failed to parse from raw:%p since %s", pTrans != NULL ? pTrans->id : -1, pRaw, terrstr());
    if (pTrans != NULL) {
      mndTransDropData(pTrans);
    }
    tfree(pRow);
    tfree(pData);
    tfree(action.pCont);
    return NULL;
  }

  mTrace("trans:%d, decode from raw:%p, row:%p", pTrans->id, pRaw, pTrans);
  return pRow;
}

S
Shengliang Guan 已提交
295
/* sdb insert callback: puts the inserted transaction into the prepare stage. Always returns 0. */
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
  (void)pSdb;

  pTrans->stage = TRN_STAGE_PREPARE;
  mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans);

  return 0;
}

301
/*
 * Frees the five log/action arrays owned by a transaction together with their
 * contents. The STrans object itself is not freed and its array pointers are
 * left as they were.
 */
static void mndTransDropData(STrans *pTrans) {
  SArray *logSets[3] = {pTrans->redoLogs, pTrans->undoLogs, pTrans->commitLogs};
  SArray *actionSets[2] = {pTrans->redoActions, pTrans->undoActions};

  for (int32_t s = 0; s < 3; ++s) {
    mndTransDropLogs(logSets[s]);
  }

  for (int32_t s = 0; s < 2; ++s) {
    mndTransDropActions(actionSets[s]);
  }
}
S
Shengliang Guan 已提交
308

309
/* sdb delete callback: releases the logs and actions held by the row. Always returns 0. */
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
  (void)pSdb;

  mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans);
  mndTransDropData(pTrans);

  return 0;
}

S
Shengliang Guan 已提交
315
/* sdb update callback: only the stage is carried over from the new row to the existing one. Always returns 0. */
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
  (void)pSdb;

  mTrace("trans:%d, perform update action, old_row:%p new_row:%p", pOldTrans->id, pOldTrans, pNewTrans);
  pOldTrans->stage = pNewTrans->stage;

  return 0;
}

S
Shengliang Guan 已提交
321
/*
 * Looks up a transaction by id via sdbAcquire.
 * Returns the transaction, or NULL with terrno set to
 * TSDB_CODE_MND_TRANS_NOT_EXIST when the lookup yields nothing.
 */
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
  STrans *pFound = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
  return NULL;
}

/* Hands a transaction obtained from mndAcquireTrans back to sdb via sdbRelease. */
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
  sdbRelease(pMnode->pSdb, pTrans);
}

S
Shengliang Guan 已提交
335
/*
 * Allocates a new transaction in the prepare stage.
 *
 * pMnode  mnode whose sdb supplies the transaction id (sdbGetMaxId)
 * policy  stored in the transaction's policy field
 * pMsg    request whose handle / ahandle are remembered on the transaction
 *
 * Returns the transaction, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY.
 *
 * Fix: when one of the five arrays could not be created, the function used to
 * return NULL without freeing pTrans or the arrays that had been created.
 */
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) {
  STrans *pTrans = calloc(1, sizeof(STrans));
  if (pTrans == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

  pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
  pTrans->stage = TRN_STAGE_PREPARE;
  pTrans->policy = policy;
  pTrans->rpcHandle = pMsg->handle;
  pTrans->rpcAHandle = pMsg->ahandle;
  pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
    /* Release whichever arrays were created (the drop helpers skip NULL), then the object. */
    mndTransDropData(pTrans);
    tfree(pTrans);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

  mDebug("trans:%d, is created, data:%p", pTrans->id, pTrans);
  return pTrans;
}

S
Shengliang Guan 已提交
365
/* Frees every raw log stored in the array with sdbFreeRaw, then the array itself. NULL is ignored. */
static void mndTransDropLogs(SArray *pArray) {
  if (pArray == NULL) {
    return;
  }

  const int32_t numOfLogs = (int32_t)pArray->size;
  for (int32_t idx = 0; idx < numOfLogs; ++idx) {
    sdbFreeRaw((SSdbRaw *)taosArrayGetP(pArray, idx));
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
375
/* Frees the message content of every action in the array, then the array itself. NULL is ignored. */
static void mndTransDropActions(SArray *pArray) {
  if (pArray == NULL) {
    return;
  }

  const int32_t numOfActions = (int32_t)pArray->size;
  for (int32_t idx = 0; idx < numOfActions; ++idx) {
    STransAction *pEntry = taosArrayGet(pArray, idx);
    free(pEntry->pCont);
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
385
/*
 * Destroys a transaction created by mndTransCreate: its logs, its actions and
 * the object itself.
 *
 * Fix: a NULL argument is now ignored instead of being dereferenced, so callers
 * can invoke this unconditionally from a shared cleanup path even when
 * mndTransCreate has failed.
 */
void mndTransDrop(STrans *pTrans) {
  if (pTrans == NULL) {
    return;
  }

  mndTransDropData(pTrans);
  mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
  tfree(pTrans);
}

S
Shengliang Guan 已提交
391
/*
 * Appends a raw log pointer to the given array.
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when
 * either argument is NULL or the push fails.
 */
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
  if (pArray != NULL && pRaw != NULL && taosArrayPush(pArray, &pRaw) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
406
/* Adds a raw log to the transaction's redo logs; returns the result of mndTransAppendLog. */
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
  SArray *pLogs = pTrans->redoLogs;
  return mndTransAppendLog(pLogs, pRaw);
}
S
Shengliang Guan 已提交
407

S
Shengliang Guan 已提交
408
/* Adds a raw log to the transaction's undo logs; returns the result of mndTransAppendLog. */
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
  SArray *pLogs = pTrans->undoLogs;
  return mndTransAppendLog(pLogs, pRaw);
}
S
Shengliang Guan 已提交
409

S
Shengliang Guan 已提交
410
/* Adds a raw log to the transaction's commit logs; returns the result of mndTransAppendLog. */
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
  SArray *pLogs = pTrans->commitLogs;
  return mndTransAppendLog(pLogs, pRaw);
}
S
Shengliang Guan 已提交
411

S
Shengliang Guan 已提交
412
/*
 * Pushes *pAction into the array.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the push fails.
 */
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
  if (taosArrayPush(pArray, pAction) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
422
/* Adds an action to the transaction's redo actions; returns the result of mndTransAppendAction. */
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
  SArray *pActions = pTrans->redoActions;
  return mndTransAppendAction(pActions, pAction);
}

S
Shengliang Guan 已提交
426
/* Adds an action to the transaction's undo actions; returns the result of mndTransAppendAction. */
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
  SArray *pActions = pTrans->undoActions;
  return mndTransAppendAction(pActions, pAction);
}

S
Shengliang Guan 已提交
430
/*
 * Encodes the transaction, proposes the encoded raw through mndSyncPropose and
 * then writes it to the local sdb.
 * Returns 0 on success and -1 as soon as any of the three steps fails.
 */
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
  SSdbRaw *pEncoded = mndTransActionEncode(pTrans);
  if (pEncoded == NULL) {
    mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
    return -1;
  }
  sdbSetRawStatus(pEncoded, SDB_STATUS_READY);

  mDebug("trans:%d, sync to other nodes", pTrans->id);
  if (mndSyncPropose(pMnode, pEncoded) != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pEncoded);
    return -1;
  }
  mDebug("trans:%d, sync finished", pTrans->id);

  /* NOTE(review): the raw is not freed after sdbWrite; presumably sdbWrite takes ownership - confirm. */
  if (sdbWrite(pMnode->pSdb, pEncoded) != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  return 0;
}

/*
 * Syncs a freshly built transaction, then re-reads it from sdb and starts
 * executing that stored copy. The rpc handles are copied onto the stored copy
 * before execution.
 * Returns 0 once execution has been started, -1 if the sync or the read-back fails.
 */
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, prepare transaction", pTrans->id);
  if (mndTransSync(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    return -1;
  }
  mDebug("trans:%d, prepare finished", pTrans->id);

  STrans *pStored = mndAcquireTrans(pMnode, pTrans->id);
  if (pStored == NULL) {
    mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  pStored->rpcHandle = pTrans->rpcHandle;
  pStored->rpcAHandle = pTrans->rpcAHandle;

  mndTransExecute(pMnode, pStored);
  mndReleaseTrans(pMnode, pStored);
  return 0;
}

S
Shengliang Guan 已提交
478 479
/*
 * Syncs the transaction for commit. When it has neither commit logs nor redo
 * actions the sync is skipped.
 * Returns 0 on success or when skipped, -1 if the sync fails.
 */
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
  bool hasCommitLogs = (taosArrayGetSize(pTrans->commitLogs) != 0);
  bool hasRedoActions = hasCommitLogs ? true : (taosArrayGetSize(pTrans->redoActions) != 0);
  if (!hasCommitLogs && !hasRedoActions) {
    return 0;
  }

  mDebug("trans:%d, commit transaction", pTrans->id);

  int32_t syncCode = mndTransSync(pMnode, pTrans);
  if (syncCode != 0) {
    mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, commit finished", pTrans->id);
  return 0;
}

S
Shengliang Guan 已提交
490
/* Syncs the transaction for rollback. Returns 0 on success, -1 if the sync fails. */
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, rollback transaction", pTrans->id);

  int32_t syncCode = mndTransSync(pMnode, pTrans);
  if (syncCode != 0) {
    mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, rollback finished", pTrans->id);
  return 0;
}
S
Shengliang Guan 已提交
499

S
Shengliang Guan 已提交
500
/* Sends the transaction's result code back over rpc; does nothing when no rpc handle is recorded. */
static void mndTransSendRpcRsp(STrans *pTrans) {
  if (pTrans->rpcHandle == NULL) {
    return;
  }

  mDebug("trans:%d, send rsp, ahandle:%p code:0x%x", pTrans->id, pTrans->rpcAHandle, pTrans->code & 0xFFFF);

  SRpcMsg rspMsg = {0};
  rspMsg.handle = pTrans->rpcHandle;
  rspMsg.code = pTrans->code;
  rspMsg.ahandle = pTrans->rpcAHandle;
  rpcSendResponse(&rspMsg);
}

S
Shengliang Guan 已提交
508
/*
 * Handles the response to an action message sent by mndTransSendActionMsg.
 *
 * The rpc ahandle carries a 64-bit signature: transaction id in the upper
 * 32 bits, action index in the lower 32 bits. The matching action is marked as
 * received with the response code, and the transaction is executed again.
 *
 * Fixes:
 *  - the action index was bounds-checked against redoActions even when the
 *    transaction was in the undo-action stage and undoActions was indexed;
 *    the check now uses the array that is actually indexed;
 *  - the low half of the signature is extracted with a mask instead of
 *    "(signature << 32) >> 32", which left-shifts a signed value out of range;
 *  - when the transaction cannot be found the function returns directly rather
 *    than releasing a NULL transaction.
 */
void mndTransProcessRsp(SMnodeMsg *pMsg) {
  SMnode *pMnode = pMsg->pMnode;
  int64_t signature = (int64_t)(pMsg->rpcMsg.ahandle);
  int32_t transId = (int32_t)(signature >> 32);
  int32_t action = (int32_t)(signature & 0xFFFFFFFFLL);

  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr());
    return;
  }

  /* Responses are only expected while redo or undo actions are outstanding. */
  SArray *pArray = NULL;
  if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
    pArray = pTrans->redoActions;
  } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
    pArray = pTrans->undoActions;
  } else {
    mError("trans:%d, invalid trans stage:%d while recv action rsp", pTrans->id, pTrans->stage);
    goto HANDLE_ACTION_RSP_OVER;
  }

  if (pArray == NULL) {
    mError("trans:%d, invalid trans stage:%d", transId, pTrans->stage);
    goto HANDLE_ACTION_RSP_OVER;
  }

  int32_t actionNum = taosArrayGetSize(pArray);
  if (action < 0 || action >= actionNum) {
    mError("trans:%d, invalid action:%d", transId, action);
    goto HANDLE_ACTION_RSP_OVER;
  }

  STransAction *pAction = taosArrayGet(pArray, action);
  if (pAction != NULL) {
    pAction->msgReceived = 1;
    pAction->errCode = pMsg->rpcMsg.code;
  }

  mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code);
  mndTransExecute(pMnode, pTrans);

HANDLE_ACTION_RSP_OVER:
  mndReleaseTrans(pMnode, pTrans);
}

S
Shengliang Guan 已提交
554
/*
 * Writes every raw log in the array to sdb with sdbWriteNotFree, in order.
 * Stops at the first failure and returns its code; returns 0 when all writes
 * succeed or the array is empty.
 */
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
  SSdb         *pSdb = pMnode->pSdb;
  const int32_t numOfLogs = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numOfLogs; ++idx) {
    SSdbRaw *pLog = taosArrayGetP(pArray, idx);
    int32_t  writeCode = sdbWriteNotFree(pSdb, pLog);
    if (writeCode != 0) {
      return writeCode;
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
571
/* Applies the transaction's redo logs; returns the result of mndTransExecuteLogs. */
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pLogs = pTrans->redoLogs;
  return mndTransExecuteLogs(pMnode, pLogs);
}

/* Applies the transaction's undo logs; returns the result of mndTransExecuteLogs. */
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pLogs = pTrans->undoLogs;
  return mndTransExecuteLogs(pMnode, pLogs);
}

/* Applies the transaction's commit logs; returns the result of mndTransExecuteLogs. */
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pLogs = pTrans->commitLogs;
  return mndTransExecuteLogs(pMnode, pLogs);
}
S
Shengliang Guan 已提交
582

S
Shengliang Guan 已提交
583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598
/*
 * Clears the sent / received / error state of every action that has not
 * completed successfully, so that it is sent again on the next execution.
 * Actions that were sent, answered and returned no error are left untouched.
 */
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  const int32_t total = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < total; ++idx) {
    STransAction *pEntry = taosArrayGet(pArray, idx);
    if (pEntry == NULL) continue;

    bool succeeded = pEntry->msgSent && pEntry->msgReceived && pEntry->errCode == 0;
    if (succeeded) continue;

    pEntry->msgSent = 0;
    pEntry->msgReceived = 0;
    pEntry->errCode = 0;
    mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, idx);
  }
}

/*
 * Sends every action in the array that has not been sent yet.
 *
 * Each message carries a 64-bit signature as its ahandle (transaction id in the
 * upper 32 bits, action index in the lower 32 bits), which mndTransProcessRsp
 * decodes to find the action again.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when a
 * message buffer cannot be allocated; actions sent before the failure stay
 * marked as sent.
 */
static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  const int32_t total = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < total; ++idx) {
    STransAction *pEntry = taosArrayGet(pArray, idx);
    if (pEntry == NULL || pEntry->msgSent) continue;

    int64_t signature = ((int64_t)pTrans->id << 32) + idx;

    SRpcMsg rpcMsg = {.msgType = pEntry->msgType, .contLen = pEntry->contLen, .ahandle = (void *)signature};
    rpcMsg.pCont = rpcMallocCont(pEntry->contLen);
    if (rpcMsg.pCont == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    /* The action keeps its own copy of the content; the rpc message gets a duplicate. */
    memcpy(rpcMsg.pCont, pEntry->pCont, pEntry->contLen);

    /* Mark the action as in flight before handing the message over. */
    pEntry->msgSent = 1;
    pEntry->msgReceived = 0;
    pEntry->errCode = 0;

    mDebug("trans:%d, action:%d is sent", pTrans->id, idx);
    mndSendMsgToDnode(pMnode, &pEntry->epSet, &rpcMsg);
  }

  return 0;
}

/*
 * Sends any unsent actions, then reports how far the set of actions has got.
 *
 * Returns:
 *   0                                 the array is empty, or every action was answered without error
 *   -1                                sending failed
 *   TSDB_CODE_MND_ACTION_IN_PROGRESS  some responses are still outstanding
 *   an action's error code            all were answered but at least one failed; the
 *                                     unsuccessful actions are reset and terrno is set
 *                                     to that code
 */
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  const int32_t numOfActions = taosArrayGetSize(pArray);
  if (numOfActions == 0) {
    return 0;
  }

  if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) {
    return -1;
  }

  /* Count the answered actions; the last non-zero error code seen is kept. */
  int32_t numOfReceived = 0;
  int32_t errCode = 0;
  for (int32_t idx = 0; idx < numOfActions; ++idx) {
    STransAction *pEntry = taosArrayGet(pArray, idx);
    if (pEntry == NULL) continue;
    if (!pEntry->msgSent || !pEntry->msgReceived) continue;

    ++numOfReceived;
    if (pEntry->errCode != 0) {
      errCode = pEntry->errCode;
    }
  }

  if (numOfReceived != numOfActions) {
    mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceived, numOfActions, errCode);
    return TSDB_CODE_MND_ACTION_IN_PROGRESS;
  }

  if (errCode == 0) {
    mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
    return 0;
  }

  mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode);
  mndTransResetActions(pMnode, pTrans, pArray);
  terrno = errCode;
  return errCode;
}

S
Shengliang Guan 已提交
666
/* Runs the transaction's redo actions; returns the result of mndTransExecuteActions. */
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
  SArray *pActions = pTrans->redoActions;
  return mndTransExecuteActions(pMnode, pTrans, pActions);
}
S
Shengliang Guan 已提交
669

S
Shengliang Guan 已提交
670
/* Runs the transaction's undo actions; returns the result of mndTransExecuteActions. */
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
  SArray *pActions = pTrans->undoActions;
  return mndTransExecuteActions(pMnode, pTrans, pActions);
}
S
Shengliang Guan 已提交
673

S
Shengliang Guan 已提交
674 675 676 677 678 679 680 681 682
/* Prepare stage: moves the transaction on to the redo-log stage. Always returns true. */
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
  (void)pMnode;

  pTrans->stage = TRN_STAGE_REDO_LOG;
  mDebug("trans:%d, stage from prepare to redoLog", pTrans->id);

  return true;
}

/*
 * RedoLog stage: run the redo logs and pick the next stage.
 *
 *   - redo logs succeeded -> code cleared, stage becomes redoAction
 *   - redo logs failed    -> code taken from terrno, stage becomes undoLog
 *
 * Returns true in both cases, so the stage loop always continues.
 */
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
  int32_t result = mndTransExecuteRedoLogs(pMnode, pTrans);

  if (result != 0) {
    // Failure: remember the error and start undoing.
    pTrans->code = terrno;
    pTrans->stage = TRN_STAGE_UNDO_LOG;
    mError("trans:%d, stage from redoLog to undoLog", pTrans->id);
    return true;
  }

  pTrans->code = 0;
  pTrans->stage = TRN_STAGE_REDO_ACTION;
  mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id);
  return true;
}

S
Shengliang Guan 已提交
698 699
/*
 * RedoAction stage: execute the redo actions and pick the next stage.
 *
 *   - all actions succeeded          -> stage becomes commit, returns true
 *   - actions still in progress      -> stage unchanged, returns false
 *   - failure, rollback policy       -> stage becomes undoAction, returns true
 *   - failure, any other policy      -> failedTimes bumped, stage unchanged,
 *                                       returns false
 *
 * The return value tells the stage loop whether to keep executing.
 */
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
  int32_t result = mndTransExecuteRedoActions(pMnode, pTrans);

  if (result == 0) {
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_COMMIT;
    mDebug("trans:%d, stage from redoAction to commit", pTrans->id);
    return true;
  }

  if (result == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    // Not an error: some actions have not been answered yet.
    mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(result));
    return false;
  }

  // Real failure: record it, then either roll back or stay in this stage.
  pTrans->code = terrno;

  if (pTrans->policy != TRN_POLICY_ROLLBACK) {
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
    return false;
  }

  pTrans->stage = TRN_STAGE_UNDO_ACTION;
  mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr());
  return true;
}

S
Shengliang Guan 已提交
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749
/*
 * Commit stage: call mndTransCommit() and pick the next stage.
 *
 *   - commit succeeded               -> code cleared, stage becomes commitLog,
 *                                       returns true
 *   - failure, rollback policy       -> code taken from terrno, stage becomes
 *                                       undoAction, returns true
 *   - failure, any other policy      -> code taken from terrno, failedTimes
 *                                       bumped, stage unchanged, returns false
 *
 * The return value tells the stage loop in mndTransExecute() whether to keep
 * executing.
 */
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransCommit(pMnode, pTrans);

  if (code == 0) {
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_COMMIT_LOG;
    mDebug("trans:%d, stage from commit to commitLog", pTrans->id);
    continueExec = true;
  } else {
    pTrans->code = terrno;
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
      // A rollback-policy transaction must start undoing once commit fails.
      // Going back to redoAction (as before) would immediately re-enter commit
      // after the redo actions succeed again, spinning in the stage loop for
      // as long as the commit keeps failing.
      pTrans->stage = TRN_STAGE_UNDO_ACTION;
      mError("trans:%d, stage from commit to undoAction since %s, failedTimes:%d", pTrans->id, terrstr(),
             pTrans->failedTimes);
      continueExec = true;
    } else {
      pTrans->failedTimes++;
      mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
      continueExec = false;
    }
  }

  return continueExec;
}

S
Shengliang Guan 已提交
752 753 754
/*
 * CommitLog stage: run the commit logs and pick the next stage.
 *
 *   - success -> code cleared, stage becomes finished, returns true
 *   - failure -> code taken from terrno, failedTimes bumped, stage unchanged,
 *                returns false
 */
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
  int32_t result = mndTransExecuteCommitLogs(pMnode, pTrans);

  if (result != 0) {
    pTrans->code = terrno;
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->code = 0;
  pTrans->stage = TRN_STAGE_FINISHED;
  mDebug("trans:%d, stage from commitLog to finished", pTrans->id);
  return true;
}

/*
 * UndoLog stage: run the undo logs and pick the next stage.
 *
 *   - success -> stage becomes rollback, returns true
 *   - failure -> stage unchanged, returns false
 *
 * NOTE(review): unlike the sibling stages, the failure path logs at debug
 * level and does not touch failedTimes; confirm whether that is intended.
 */
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
  int32_t result = mndTransExecuteUndoLogs(pMnode, pTrans);

  if (result != 0) {
    mDebug("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->stage = TRN_STAGE_ROLLBACK;
  mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
  return true;
}

/*
 * UndoAction stage: execute the undo actions and pick the next stage.
 *
 *   - all actions succeeded     -> stage becomes undoLog, returns true
 *   - actions still in progress -> stage unchanged, returns false
 *   - any other failure         -> failedTimes bumped, stage unchanged,
 *                                  returns false
 *
 * The return value tells the stage loop in mndTransExecute() whether to keep
 * executing.
 */
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);

  if (code == 0) {
    // Continue the rollback with the undo logs. The previous code assigned
    // TRN_STAGE_REDO_LOG here, which contradicted the log message below and
    // restarted the redo path in the middle of a rollback.
    pTrans->stage = TRN_STAGE_UNDO_LOG;
    mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
    continueExec = true;
  } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    // Not an error: some undo actions have not been answered yet.
    mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
    continueExec = false;
  } else {
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr());
    continueExec = false;
  }

  return continueExec;
}

/*
 * Rollback stage: call mndTransRollback() and pick the next stage.
 *
 *   - success -> stage becomes finished, returns true
 *   - failure -> failedTimes bumped, stage unchanged, returns false
 */
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
  int32_t result = mndTransRollback(pMnode, pTrans);

  if (result != 0) {
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->stage = TRN_STAGE_FINISHED;
  mDebug("trans:%d, stage from rollback to finished", pTrans->id);
  return true;
}

/*
 * Finished stage: encode the transaction, mark the raw record as
 * SDB_STATUS_DROPPED and write it to the sdb.
 *
 * Always returns false, so the stage loop in mndTransExecute() stops after
 * this stage. Encode and write failures are logged, not propagated.
 */
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
  bool continueExec = false;

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr());
    // Nothing to write: do not hand a NULL raw record to the sdb layer, and do
    // not report the transaction as finished in the log.
    return continueExec;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  int32_t code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
  }

  mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
  return continueExec;
}
S
Shengliang Guan 已提交
843

S
Shengliang Guan 已提交
844
/*
 * Drive a transaction through its stages.
 *
 * Repeatedly dispatches on pTrans->stage to the matching stage handler until
 * a handler returns false (or the stage is not recognised). If the
 * transaction is in the finished stage when the loop ends, the rpc response
 * is sent via mndTransSendRpcRsp().
 */
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
  bool keepGoing;

  do {
    switch (pTrans->stage) {
      // Forward path.
      case TRN_STAGE_PREPARE:
        keepGoing = mndTransPerformPrepareStage(pMnode, pTrans);
        break;
      case TRN_STAGE_REDO_LOG:
        keepGoing = mndTransPerformRedoLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_REDO_ACTION:
        keepGoing = mndTransPerformRedoActionStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT:
        keepGoing = mndTransPerformCommitStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT_LOG:
        keepGoing = mndTransPerformCommitLogStage(pMnode, pTrans);
        break;

      // Rollback path.
      case TRN_STAGE_UNDO_ACTION:
        keepGoing = mndTransPerformUndoActionStage(pMnode, pTrans);
        break;
      case TRN_STAGE_UNDO_LOG:
        keepGoing = mndTransPerformUndoLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_ROLLBACK:
        keepGoing = mndTransPerformRollbackStage(pMnode, pTrans);
        break;

      // Terminal stage.
      case TRN_STAGE_FINISHED:
        keepGoing = mndTransPerfromFinishedStage(pMnode, pTrans);
        break;

      default:
        keepGoing = false;
        break;
    }
  } while (keepGoing);

  if (pTrans->stage == TRN_STAGE_FINISHED) {
    mndTransSendRpcRsp(pTrans);
  }
}
S
Shengliang Guan 已提交
886 887 888 889 890 891 892 893 894 895 896 897 898

/*
 * Walk every SDB_TRANS row in the mnode's sdb and run mndTransExecute() on it.
 *
 * Each row obtained from sdbFetch() is released with sdbRelease() after it
 * has been executed. Always returns 0.
 */
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) {
  SMnode *pMnode = pMsg->pMnode;
  STrans *pTrans = NULL;
  void   *pIter = NULL;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
    if (pIter == NULL) break;  // no more rows

    mndTransExecute(pMnode, pTrans);
    sdbRelease(pMnode->pSdb, pTrans);
  }

  // The function is declared int32_t; previously it fell off the end without
  // returning a value, which is undefined behaviour if the caller reads it.
  return 0;
}

/*
 * Handler for transaction response messages.
 *
 * Currently a no-op: pMsg is ignored and 0 is returned.
 */
static int32_t mndProcessTransRsp(SMnodeMsg *pMsg) {
  return 0;
}