mndTrans.c 32.6 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndTrans.h"
S
Shengliang Guan 已提交
18
#include "mndSync.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
#define MND_TRANS_VER_NUMBER 1
#define MND_TRANS_ARRAY_SIZE 8
#define MND_TRANS_RESERVE_SIZE 64
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25 26
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t  mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
S
Shengliang Guan 已提交
27
static int32_t  mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
S
Shengliang Guan 已提交
28 29
static int32_t  mndTransActionDelete(SSdb *pSdb, STrans *pTrans);

S
Shengliang Guan 已提交
30
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
S
Shengliang Guan 已提交
31
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
S
Shengliang Guan 已提交
32 33
static void    mndTransDropLogs(SArray *pArray);
static void    mndTransDropActions(SArray *pArray);
34
static void    mndTransDropData(STrans *pTrans);
S
Shengliang Guan 已提交
35
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
36
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
S
Shengliang Guan 已提交
37 38 39 40
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
S
Shengliang Guan 已提交
41 42 43 44 45 46 47 48 49 50 51
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool    mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);

S
Shengliang Guan 已提交
52
static void    mndTransExecute(SMnode *pMnode, STrans *pTrans);
S
Shengliang Guan 已提交
53
static void    mndTransSendRpcRsp(STrans *pTrans);
S
Shengliang Guan 已提交
54
static int32_t mndProcessTransReq(SMnodeMsg *pMsg);
S
Shengliang Guan 已提交
55

S
Shengliang Guan 已提交
56 57 58 59 60 61 62 63 64
/*
 * Installs the TDMT_MND_TRANS message handler and registers the SDB_TRANS
 * table (int32 key, with the encode/decode/insert/update/delete hooks defined
 * in this file) with the mnode's sdb.
 *
 * Returns the result of sdbSetTable().
 */
int32_t mndInitTrans(SMnode *pMnode) {
  SSdbTable transTable = {0};

  transTable.sdbType = SDB_TRANS;
  transTable.keyType = SDB_KEY_INT32;
  transTable.encodeFp = (SdbEncodeFp)mndTransActionEncode;
  transTable.decodeFp = (SdbDecodeFp)mndTransActionDecode;
  transTable.insertFp = (SdbInsertFp)mndTransActionInsert;
  transTable.updateFp = (SdbUpdateFp)mndTransActionUpdate;
  transTable.deleteFp = (SdbDeleteFp)mndTransActionDelete;

  mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq);

  return sdbSetTable(pMnode->pSdb, transTable);
}

/* Counterpart of mndInitTrans(); currently a no-op. */
void mndCleanupTrans(SMnode *pMnode) {
  (void)pMnode;
}

/*
 * Serializes a transaction into a newly allocated SDB_TRANS raw record.
 *
 * Fields are written in this order: id, policy, stage, the five element
 * counts (redo logs, undo logs, commit logs, redo actions, undo actions),
 * every log as <int32 length><raw bytes>, every action as
 * <epSet><msgType><acceptableCode><contLen><content>, and finally the
 * reserved area of MND_TRANS_RESERVE_SIZE bytes.
 *
 * Returns the raw record, or NULL on failure (terrno holds the reason).
 */
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  /* Section order is fixed: redo, undo, commit logs; then redo, undo actions. */
  SArray *logSets[3] = {pTrans->redoLogs, pTrans->undoLogs, pTrans->commitLogs};
  SArray *actionSets[2] = {pTrans->redoActions, pTrans->undoActions};
  int32_t logCounts[3] = {0};
  int32_t actionCounts[2] = {0};

  /* First pass: work out how many bytes the record needs. */
  int32_t rawDataLen = sizeof(STrans) + MND_TRANS_RESERVE_SIZE;

  for (int32_t s = 0; s < 3; ++s) {
    logCounts[s] = taosArrayGetSize(logSets[s]);
    for (int32_t i = 0; i < logCounts[s]; ++i) {
      SSdbRaw *pLog = taosArrayGetP(logSets[s], i);
      /* 4 extra bytes for the int32 length prefix of each log */
      rawDataLen += (sdbGetRawTotalSize(pLog) + 4);
    }
  }

  for (int32_t s = 0; s < 2; ++s) {
    actionCounts[s] = taosArrayGetSize(actionSets[s]);
    for (int32_t i = 0; i < actionCounts[s]; ++i) {
      STransAction *pAction = taosArrayGet(actionSets[s], i);
      rawDataLen += (sizeof(STransAction) + pAction->contLen);
    }
  }

  SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, MND_TRANS_VER_NUMBER, rawDataLen);
  if (pRaw == NULL) {
    mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
    return NULL;
  }

  /* Second pass: write the header, then every section. */
  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
  SDB_SET_INT8(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, logCounts[0], TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, logCounts[1], TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, logCounts[2], TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, actionCounts[0], TRANS_ENCODE_OVER)
  SDB_SET_INT32(pRaw, dataPos, actionCounts[1], TRANS_ENCODE_OVER)

  for (int32_t s = 0; s < 3; ++s) {
    for (int32_t i = 0; i < logCounts[s]; ++i) {
      SSdbRaw *pLog = taosArrayGetP(logSets[s], i);
      int32_t  logLen = sdbGetRawTotalSize(pLog);
      SDB_SET_INT32(pRaw, dataPos, logLen, TRANS_ENCODE_OVER)
      SDB_SET_BINARY(pRaw, dataPos, (void *)pLog, logLen, TRANS_ENCODE_OVER)
    }
  }

  for (int32_t s = 0; s < 2; ++s) {
    for (int32_t i = 0; i < actionCounts[s]; ++i) {
      STransAction *pAction = taosArrayGet(actionSets[s], i);
      SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
      SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
      SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER)
      SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
      SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
    }
  }

  SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER)
  SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER)

  /* Every field was written: clear the pessimistic error set on entry. */
  terrno = 0;

TRANS_ENCODE_OVER:
  if (terrno != 0) {
    mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
    sdbFreeRaw(pRaw);
    return NULL;
  }

  mTrace("trans:%d, encode to raw:%p, row:%p len:%d", pTrans->id, pRaw, pTrans, dataPos);
  return pRaw;
}

S
Shengliang Guan 已提交
177
/*
 * Rebuilds a transaction row from a raw SDB_TRANS record; the inverse of
 * mndTransActionEncode().
 *
 * Reads id, policy, stage and the five element counts, then each log
 * (<int32 length><bytes>, stored in a freshly malloc'ed buffer) and each
 * action (<epSet><msgType><acceptableCode><contLen><content>), followed by
 * the reserved area.
 *
 * Returns the new row, or NULL on failure (terrno holds the reason). On
 * failure every buffer allocated so far is released.
 */
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRow     *pRow = NULL;
  STrans      *pTrans = NULL;
  char        *pData = NULL;
  int32_t      dataLen = 0;
  int8_t       sver = 0;
  int32_t      redoLogNum = 0;
  int32_t      undoLogNum = 0;
  int32_t      commitLogNum = 0;
  int32_t      redoActionNum = 0;
  int32_t      undoActionNum = 0;
  int32_t      dataPos = 0;
  STransAction action = {0};

  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto TRANS_DECODE_OVER;

  if (sver != MND_TRANS_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto TRANS_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(STrans));
  if (pRow == NULL) goto TRANS_DECODE_OVER;

  pTrans = sdbGetRowObj(pRow);
  if (pTrans == NULL) goto TRANS_DECODE_OVER;

  pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));

  if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->commitLogs == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
  if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER)
  SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->stage, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &undoLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &commitLogNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER)
  SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER)

  /*
   * In each loop below the temporary buffer (pData / action.pCont) is reset to
   * NULL once the array owns it, so the error path never frees it twice.
   */
  for (int32_t i = 0; i < redoLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  for (int32_t i = 0; i < undoLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  for (int32_t i = 0; i < commitLogNum; ++i) {
    SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
    pData = malloc(dataLen);
    if (pData == NULL) goto TRANS_DECODE_OVER;
    mTrace("raw:%p, is created", pData);
    SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto TRANS_DECODE_OVER;
    pData = NULL;
  }

  for (int32_t i = 0; i < redoActionNum; ++i) {
    SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
    SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
    action.pCont = malloc(action.contLen);
    if (action.pCont == NULL) goto TRANS_DECODE_OVER;
    SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto TRANS_DECODE_OVER;
    action.pCont = NULL;
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
    SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
    SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER)
    SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
    action.pCont = malloc(action.contLen);
    if (action.pCont == NULL) goto TRANS_DECODE_OVER;
    SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, TRANS_DECODE_OVER);
    if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto TRANS_DECODE_OVER;
    action.pCont = NULL;
  }

  SDB_GET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE, TRANS_DECODE_OVER)

  terrno = 0;

TRANS_DECODE_OVER:
  if (terrno != 0) {
    /*
     * pTrans is still NULL when the failure happened before the row object
     * existed (bad version, row allocation failure), so it must not be
     * dereferenced here; -1 is logged as the id in that case.
     */
    mError("trans:%d, failed to parse from raw:%p since %s", pTrans != NULL ? pTrans->id : -1, pRaw, terrstr());
    if (pTrans != NULL) {
      mndTransDropData(pTrans);
    }
    tfree(pRow);
    tfree(pData);
    tfree(action.pCont);
    return NULL;
  }

  mTrace("trans:%d, decode from raw:%p, row:%p", pTrans->id, pRaw, pTrans);
  return pRow;
}

S
Shengliang Guan 已提交
299
/*
 * sdb insert hook for SDB_TRANS rows: puts the inserted transaction into the
 * PREPARE stage and traces the event. Always returns 0.
 */
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
  pTrans->stage = TRN_STAGE_PREPARE;

  mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans);

  return 0;
}

305
/*
 * Releases everything a transaction owns: the three log arrays, the two
 * action arrays and the pending rpc response buffer. The STrans object itself
 * is not freed.
 *
 * A NULL pTrans is accepted and ignored. Each pointer is cleared after its
 * array is destroyed so that a second call on the same object is harmless.
 */
static void mndTransDropData(STrans *pTrans) {
  if (pTrans == NULL) return;

  mndTransDropLogs(pTrans->redoLogs);
  pTrans->redoLogs = NULL;
  mndTransDropLogs(pTrans->undoLogs);
  pTrans->undoLogs = NULL;
  mndTransDropLogs(pTrans->commitLogs);
  pTrans->commitLogs = NULL;
  mndTransDropActions(pTrans->redoActions);
  pTrans->redoActions = NULL;
  mndTransDropActions(pTrans->undoActions);
  pTrans->undoActions = NULL;

  if (pTrans->rpcRsp != NULL) {
    /*
     * NOTE(review): mndTransSendRpcRsp() releases this same buffer with
     * free(), not rpcFreeCont(). Confirm which allocator the callers of
     * mndTransSetRpcRsp() use and make the two paths agree.
     */
    rpcFreeCont(pTrans->rpcRsp);
    pTrans->rpcRsp = NULL;
    pTrans->rpcRspLen = 0;
  }
}
S
Shengliang Guan 已提交
317

318
/*
 * sdb delete hook for SDB_TRANS rows: traces the event and releases the data
 * owned by the row. Always returns 0.
 */
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
  mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans);

  mndTransDropData(pTrans);

  return 0;
}

S
Shengliang Guan 已提交
324
/*
 * sdb update hook for SDB_TRANS rows: copies the stage of the incoming row
 * (pNew) onto the stored row (pOld). An incoming COMMIT stage is first
 * rewritten to COMMIT_LOG. Always returns 0.
 */
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
  if (pNew->stage == TRN_STAGE_COMMIT) {
    pNew->stage = TRN_STAGE_COMMIT_LOG;
  }

  mTrace("trans:%d, perform update action, old row:%p stage:%d, new row:%p stage:%d", pOld->id, pOld, pOld->stage, pNew,
         pNew->stage);

  pOld->stage = pNew->stage;

  return 0;
}

S
Shengliang Guan 已提交
333
/*
 * Looks up the transaction with the given id in the sdb and acquires it.
 *
 * Returns the transaction, or NULL with terrno set to
 * TSDB_CODE_MND_TRANS_NOT_EXIST when the lookup fails. A non-NULL result is
 * handed back with mndReleaseTrans().
 */
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
  STrans *pFound = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
  return NULL;
}

/* Hands a transaction obtained from mndAcquireTrans() back to the sdb. */
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
  sdbRelease(pMnode->pSdb, pTrans);
}

S
Shengliang Guan 已提交
347
/*
 * Allocates a new in-memory transaction in the PREPARE stage.
 *
 * pMnode  mnode whose sdb supplies the transaction id (sdbGetMaxId)
 * policy  failure policy stored in the transaction
 * pReq    request being served; its handle and ahandle are remembered so the
 *         response can be sent later
 *
 * Returns the transaction, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY.
 * The caller releases the result with mndTransDrop().
 */
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
  STrans *pTrans = calloc(1, sizeof(STrans));
  if (pTrans == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

  pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
  pTrans->stage = TRN_STAGE_PREPARE;
  pTrans->policy = policy;
  pTrans->rpcHandle = pReq->handle;
  pTrans->rpcAHandle = pReq->ahandle;
  pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
    /* Release the arrays that were created and the object itself. */
    mndTransDropData(pTrans);
    free(pTrans);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

  mDebug("trans:%d, is created, data:%p", pTrans->id, pTrans);
  return pTrans;
}

S
Shengliang Guan 已提交
377
/*
 * Frees every raw log stored (by pointer) in the array, then destroys the
 * array itself. A NULL array is ignored.
 */
static void mndTransDropLogs(SArray *pArray) {
  if (pArray == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pArray->size; ++idx) {
    SSdbRaw *pLog = taosArrayGetP(pArray, idx);
    sdbFreeRaw(pLog);
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
387
/*
 * Frees the message content of every action stored (by value) in the array,
 * then destroys the array itself. A NULL array is ignored.
 */
static void mndTransDropActions(SArray *pArray) {
  if (pArray == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < pArray->size; ++idx) {
    STransAction *pItem = taosArrayGet(pArray, idx);
    free(pItem->pCont);
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
397
/*
 * Destroys a transaction: releases the data it owns, then the object itself.
 * A NULL pointer is ignored.
 */
void mndTransDrop(STrans *pTrans) {
  if (pTrans == NULL) {
    return;
  }

  mndTransDropData(pTrans);
  mDebug("trans:%d, is dropped, data:%p", pTrans->id, pTrans);
  tfree(pTrans);
}

S
Shengliang Guan 已提交
405
/*
 * Appends the pointer pRaw to a log array.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when
 * either argument is NULL or the push fails.
 */
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
  if (pArray != NULL && pRaw != NULL) {
    if (taosArrayPush(pArray, &pRaw) != NULL) {
      return 0;
    }
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
420
/* Adds pRaw to the transaction's redo logs; see mndTransAppendLog() for the result. */
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
  return mndTransAppendLog(pTrans->redoLogs, pRaw);
}
S
Shengliang Guan 已提交
421

S
Shengliang Guan 已提交
422
/* Adds pRaw to the transaction's undo logs; see mndTransAppendLog() for the result. */
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
  return mndTransAppendLog(pTrans->undoLogs, pRaw);
}
S
Shengliang Guan 已提交
423

S
Shengliang Guan 已提交
424
/* Adds pRaw to the transaction's commit logs; see mndTransAppendLog() for the result. */
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
  return mndTransAppendLog(pTrans->commitLogs, pRaw);
}
S
Shengliang Guan 已提交
425

S
Shengliang Guan 已提交
426
/*
 * Appends a copy of *pAction to an action array.
 *
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * push fails.
 */
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
  if (taosArrayPush(pArray, pAction) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}

S
Shengliang Guan 已提交
436
/* Adds an action to the transaction's redo actions; 0 on success, -1 on failure. */
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
  SArray *pRedoActions = pTrans->redoActions;
  return mndTransAppendAction(pRedoActions, pAction);
}

S
Shengliang Guan 已提交
440
/* Adds an action to the transaction's undo actions; 0 on success, -1 on failure. */
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
  SArray *pUndoActions = pTrans->undoActions;
  return mndTransAppendAction(pUndoActions, pAction);
}

S
Shengliang Guan 已提交
444 445 446 447 448
/*
 * Records the response body (pointer and length) to send once the transaction
 * replies to its client. The pointer is stored as-is; any previously stored
 * pointer is overwritten without being freed.
 */
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen) {
  pTrans->rpcRspLen = contLen;
  pTrans->rpcRsp = pCont;
}

S
Shengliang Guan 已提交
449
/*
 * Persists the current state of a transaction: encodes it, marks the raw
 * record SDB_STATUS_READY, proposes it through mndSyncPropose() and then
 * writes it to the local sdb.
 *
 * Returns 0 on success, -1 on failure. The raw record is freed here when the
 * propose step fails; after that it is passed to sdbWrite() and not touched
 * again.
 */
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
  SSdbRaw *pEncoded = mndTransActionEncode(pTrans);
  if (pEncoded == NULL) {
    mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
    return -1;
  }
  sdbSetRawStatus(pEncoded, SDB_STATUS_READY);

  mDebug("trans:%d, sync to other nodes", pTrans->id);
  if (mndSyncPropose(pMnode, pEncoded) != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pEncoded);
    return -1;
  }
  mDebug("trans:%d, sync finished", pTrans->id);

  if (sdbWrite(pMnode->pSdb, pEncoded) != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  return 0;
}

/*
 * Persists a freshly built transaction and starts executing it.
 *
 * After the sync succeeds, the stored copy is acquired from the sdb, the rpc
 * handles and the pending response are moved onto it (pTrans gives up the
 * response buffer), and execution runs on that stored copy.
 *
 * Returns 0 once execution has been started, -1 if the sync or the lookup
 * fails.
 */
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, prepare transaction", pTrans->id);
  if (mndTransSync(pMnode, pTrans) != 0) {
    mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
    return -1;
  }
  mDebug("trans:%d, prepare finished", pTrans->id);

  STrans *pStored = mndAcquireTrans(pMnode, pTrans->id);
  if (pStored == NULL) {
    mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  /* Move the reply context over to the stored copy. */
  pStored->rpcHandle = pTrans->rpcHandle;
  pStored->rpcAHandle = pTrans->rpcAHandle;
  pStored->rpcRsp = pTrans->rpcRsp;
  pStored->rpcRspLen = pTrans->rpcRspLen;

  /* The response buffer now belongs to the stored copy only. */
  pTrans->rpcRsp = NULL;
  pTrans->rpcRspLen = 0;

  mndTransExecute(pMnode, pStored);
  mndReleaseTrans(pMnode, pStored);

  return 0;
}

S
Shengliang Guan 已提交
502 503
/*
 * Persists the transaction for its commit step. Nothing is synced when the
 * transaction has neither commit logs nor redo actions.
 *
 * Returns 0 on success (or when there was nothing to do), -1 on sync failure.
 */
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
  if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) {
    return 0;
  }

  mDebug("trans:%d, commit transaction", pTrans->id);

  int32_t syncCode = mndTransSync(pMnode, pTrans);
  if (syncCode != 0) {
    mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, commit finished", pTrans->id);
  return 0;
}

S
Shengliang Guan 已提交
514
/*
 * Persists the transaction for its rollback step.
 *
 * Returns 0 on success, -1 on sync failure.
 */
static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, rollback transaction", pTrans->id);

  int32_t syncCode = mndTransSync(pMnode, pTrans);
  if (syncCode != 0) {
    mError("trans:%d, failed to rollback since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, rollback finished", pTrans->id);
  return 0;
}
S
Shengliang Guan 已提交
523

S
Shengliang Guan 已提交
524
/*
 * Sends the response for the request that started the transaction, at most
 * once.
 *
 * A response is due when the transaction is FINISHED; or, with the ROLLBACK
 * policy, as soon as it is in the UNDO_LOG, UNDO_ACTION or ROLLBACK stage;
 * or, with the RETRY policy, when it is in REDO_ACTION and has already
 * failed at least once. Nothing is sent if rpcHandle is NULL.
 *
 * The stored response body is copied into an rpc buffer and released.
 * Afterwards rpcHandle, rpcRsp and rpcRspLen are cleared so later calls do
 * nothing.
 */
static void mndTransSendRpcRsp(STrans *pTrans) {
  bool sendRsp = false;

  if (pTrans->stage == TRN_STAGE_FINISHED) {
    sendRsp = true;
  }

  if (pTrans->policy == TRN_POLICY_ROLLBACK) {
    if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION ||
        pTrans->stage == TRN_STAGE_ROLLBACK) {
      sendRsp = true;
    }
  }

  if (pTrans->policy == TRN_POLICY_RETRY) {
    if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) {
      sendRsp = true;
    }
  }

  if (sendRsp && pTrans->rpcHandle != NULL) {
    int32_t contLen = pTrans->rpcRspLen;
    void   *rpcCont = rpcMallocCont(contLen);

    if (rpcCont != NULL && pTrans->rpcRsp != NULL) {
      memcpy(rpcCont, pTrans->rpcRsp, contLen);
    } else {
      /* No buffer or no body: never advertise a length that is not backed by data. */
      contLen = 0;
    }

    /*
     * NOTE(review): mndTransDropData() releases this same buffer with
     * rpcFreeCont(); confirm which allocator owns rpcRsp.
     */
    free(pTrans->rpcRsp);

    mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
           pTrans->rpcAHandle);
    SRpcMsg rspMsg = {.handle = pTrans->rpcHandle,
                      .code = pTrans->code,
                      .ahandle = pTrans->rpcAHandle,
                      .pCont = rpcCont,
                      .contLen = contLen};
    rpcSendResponse(&rspMsg);
    pTrans->rpcHandle = NULL;
    pTrans->rpcRsp = NULL;
    pTrans->rpcRspLen = 0;
  }
}

S
Shengliang Guan 已提交
565 566 567
/*
 * Handles the response to an action message sent by mndTransSendActionMsg().
 *
 * The ahandle of the response carries the signature built by the sender: the
 * transaction id in the upper 32 bits and the action index in the lower 32
 * bits. The matching action (in the redo or undo list, depending on the
 * current stage) is marked as received with the response code, and the
 * transaction is executed again.
 *
 * Responses for unknown transactions, unexpected stages or out-of-range
 * action indexes are logged and dropped.
 */
void mndTransProcessRsp(SMnodeMsg *pRsp) {
  SMnode *pMnode = pRsp->pMnode;
  int64_t signature = (int64_t)(pRsp->rpcMsg.ahandle);
  int32_t transId = (int32_t)(signature >> 32);
  /* Mask instead of shifting left: a signed left shift that overflows is undefined. */
  int32_t action = (int32_t)((uint64_t)signature & 0xFFFFFFFFu);

  STrans *pTrans = mndAcquireTrans(pMnode, transId);
  if (pTrans == NULL) {
    mError("trans:%d, failed to get transId from vnode rsp since %s", transId, terrstr());
    /* Nothing was acquired, so there is nothing to release. */
    return;
  }

  SArray *pArray = NULL;
  if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
    pArray = pTrans->redoActions;
  } else if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
    pArray = pTrans->undoActions;
  } else {
    mError("trans:%d, invalid trans stage:%d while recv action rsp", pTrans->id, pTrans->stage);
    goto HANDLE_ACTION_RSP_OVER;
  }

  if (pArray == NULL) {
    mError("trans:%d, invalid trans stage:%d", transId, pTrans->stage);
    goto HANDLE_ACTION_RSP_OVER;
  }

  /* Bound the index by the list selected above, not always by the redo list. */
  int32_t actionNum = taosArrayGetSize(pArray);
  if (action < 0 || action >= actionNum) {
    mError("trans:%d, invalid action:%d", transId, action);
    goto HANDLE_ACTION_RSP_OVER;
  }

  STransAction *pAction = taosArrayGet(pArray, action);
  if (pAction == NULL) {
    mError("trans:%d, action:%d not found", transId, action);
    goto HANDLE_ACTION_RSP_OVER;
  }

  pAction->msgReceived = 1;
  pAction->errCode = pRsp->rpcMsg.code;

  mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pRsp->rpcMsg.code,
         pAction->acceptableCode);
  mndTransExecute(pMnode, pTrans);

HANDLE_ACTION_RSP_OVER:
  mndReleaseTrans(pMnode, pTrans);
}

S
Shengliang Guan 已提交
612
/*
 * Writes every raw log in the array to the sdb with sdbWriteNotFree(), in
 * array order, stopping at the first failure.
 *
 * Returns 0 when all writes succeed (or the array is empty), otherwise the
 * code of the first failed write.
 */
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t numOfLogs = taosArrayGetSize(pArray);

  for (int32_t idx = 0; idx < numOfLogs; ++idx) {
    SSdbRaw *pLog = taosArrayGetP(pArray, idx);
    int32_t  writeCode = sdbWriteNotFree(pSdb, pLog);
    if (writeCode != 0) {
      return writeCode;
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
629
/* Applies the transaction's redo logs to the sdb; see mndTransExecuteLogs(). */
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pRedoLogs = pTrans->redoLogs;
  return mndTransExecuteLogs(pMnode, pRedoLogs);
}

/* Applies the transaction's undo logs to the sdb; see mndTransExecuteLogs(). */
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pUndoLogs = pTrans->undoLogs;
  return mndTransExecuteLogs(pMnode, pUndoLogs);
}

/* Applies the transaction's commit logs to the sdb; see mndTransExecuteLogs(). */
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
  SArray *pCommitLogs = pTrans->commitLogs;
  return mndTransExecuteLogs(pMnode, pCommitLogs);
}
S
Shengliang Guan 已提交
640

S
Shengliang Guan 已提交
641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656
/*
 * Re-arms the actions of a list that still have to be executed again.
 *
 * An action is left untouched when it was sent, its response was received and
 * its result counts as success, i.e. the code is 0 or equals the action's
 * acceptableCode. This is the same success rule mndTransExecuteActions()
 * applies. Every other action has its sent/received flags and error code
 * cleared so that the next mndTransSendActionMsg() sends it again.
 */
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  int32_t numOfActions = taosArrayGetSize(pArray);

  for (int32_t action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pArray, action);
    if (pAction == NULL) continue;

    bool succeeded = (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode);
    if (pAction->msgSent && pAction->msgReceived && succeeded) continue;

    pAction->msgSent = 0;
    pAction->msgReceived = 0;
    pAction->errCode = 0;
    mDebug("trans:%d, action:%d is reset and will be re-executed", pTrans->id, action);
  }
}

/*
 * Sends the message of every action in the list that has not been sent yet.
 *
 * Each message carries a signature in its ahandle: the transaction id in the
 * upper 32 bits plus the action index. mndTransProcessRsp() decodes it to
 * find the action again. The action content is copied into a buffer obtained
 * from rpcMallocCont() before sending.
 *
 * Returns 0 when every pending action was sent. Returns -1 at the first
 * allocation failure (terrno = TSDB_CODE_OUT_OF_MEMORY) or send failure;
 * actions after that one are not attempted.
 */
static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  int32_t numOfActions = taosArrayGetSize(pArray);

  for (int32_t action = 0; action < numOfActions; ++action) {
    STransAction *pAction = taosArrayGet(pArray, action);
    if (pAction == NULL || pAction->msgSent) {
      continue;
    }

    int64_t signature = pTrans->id;
    signature = (signature << 32);
    signature += action;

    SRpcMsg rpcMsg = {0};
    rpcMsg.msgType = pAction->msgType;
    rpcMsg.contLen = pAction->contLen;
    rpcMsg.ahandle = (void *)signature;
    rpcMsg.pCont = rpcMallocCont(pAction->contLen);
    if (rpcMsg.pCont == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);

    /* NOTE(review): who frees rpcMsg.pCont when the send fails is not visible here. */
    if (mndSendReqToDnode(pMnode, &pAction->epSet, &rpcMsg) != 0) {
      mDebug("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
      return -1;
    }

    mDebug("trans:%d, action:%d is sent", pTrans->id, action);
    pAction->msgSent = 1;
    pAction->msgReceived = 0;
    pAction->errCode = 0;
  }

  return 0;
}

// Drives one round of execution for the actions in pArray.
//
// Pending actions are sent first; then the actions that have been both sent
// and received are counted, remembering the error code of the last action
// whose errCode is neither 0 nor its acceptableCode.
//
// Returns:
//   0                                 - the array is empty, or every action was
//                                       received without an unacceptable error
//   -1                                - sending failed
//   TSDB_CODE_MND_ACTION_IN_PROGRESS  - some actions have not been received yet
//   the recorded error code           - all actions were received but at least
//                                       one failed; the failed actions are reset
//                                       and terrno is set to that code
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
  int32_t total = taosArrayGetSize(pArray);
  if (total == 0) return 0;

  if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) return -1;

  int32_t received = 0;
  int32_t lastErr = 0;
  for (int32_t idx = 0; idx < total; ++idx) {
    STransAction *pItem = taosArrayGet(pArray, idx);
    if (pItem == NULL) continue;
    if (!pItem->msgSent || !pItem->msgReceived) continue;

    received++;
    if (pItem->errCode != 0 && pItem->errCode != pItem->acceptableCode) {
      lastErr = pItem->errCode;
    }
  }

  if (received != total) {
    mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, received, total, lastErr);
    return TSDB_CODE_MND_ACTION_IN_PROGRESS;
  }

  if (lastErr != 0) {
    mError("trans:%d, all %d actions executed, code:0x%x", pTrans->id, total, lastErr);
    mndTransResetActions(pMnode, pTrans, pArray);
    terrno = lastErr;
    return lastErr;
  }

  mDebug("trans:%d, all %d actions execute successfully", pTrans->id, total);
  return 0;
}

S
Shengliang Guan 已提交
727
// Runs mndTransExecuteActions on the transaction's redo action array and
// returns its result unchanged.
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
  SArray *pRedo = pTrans->redoActions;
  return mndTransExecuteActions(pMnode, pTrans, pRedo);
}
S
Shengliang Guan 已提交
730

S
Shengliang Guan 已提交
731
// Runs mndTransExecuteActions on the transaction's undo action array and
// returns its result unchanged.
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
  SArray *pUndo = pTrans->undoActions;
  return mndTransExecuteActions(pMnode, pTrans, pUndo);
}
S
Shengliang Guan 已提交
734

S
Shengliang Guan 已提交
735 736 737 738 739 740 741 742 743
// Prepare stage: unconditionally advances the transaction to the redoLog
// stage. Always returns true so the caller keeps executing.
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
  pTrans->stage = TRN_STAGE_REDO_LOG;
  mDebug("trans:%d, stage from prepare to redoLog", pTrans->id);
  return true;
}

// RedoLog stage: executes the redo logs. On success the transaction moves on
// to the redoAction stage with code 0; on failure terrno is stored in the
// transaction and it moves to the undoLog stage. Always returns true so the
// caller keeps executing.
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
  if (mndTransExecuteRedoLogs(pMnode, pTrans) != 0) {
    pTrans->code = terrno;
    pTrans->stage = TRN_STAGE_UNDO_LOG;
    mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr());
    return true;
  }

  pTrans->code = 0;
  pTrans->stage = TRN_STAGE_REDO_ACTION;
  mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id);
  return true;
}

S
Shengliang Guan 已提交
759 760
// RedoAction stage: executes the redo actions and decides the next stage.
//
//   success      -> commit stage, keep executing
//   in progress  -> stay on redoAction, stop executing
//   failure      -> terrno is stored in the transaction; with the rollback
//                   policy move to undoAction and keep executing, otherwise
//                   count the failure, stay on redoAction and stop executing
//
// Returns true when the caller should continue with the next stage.
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
  int32_t ret = mndTransExecuteRedoActions(pMnode, pTrans);

  if (ret == 0) {
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_COMMIT;
    mDebug("trans:%d, stage from redoAction to commit", pTrans->id);
    return true;
  }

  if (ret == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(ret));
    return false;
  }

  pTrans->code = terrno;
  if (pTrans->policy != TRN_POLICY_ROLLBACK) {
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
    return false;
  }

  pTrans->stage = TRN_STAGE_UNDO_ACTION;
  mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr());
  return true;
}

S
Shengliang Guan 已提交
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810
// Commit stage: calls mndTransCommit and decides the next stage.
//
//   success -> commitLog stage, keep executing
//   failure -> terrno is stored in the transaction; with the rollback policy
//              move to undoAction and keep executing, otherwise count the
//              failure, stay on commit and stop executing
//
// Returns true when the caller should continue with the next stage.
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
  bool    continueExec = true;
  int32_t code = mndTransCommit(pMnode, pTrans);

  if (code == 0) {
    pTrans->code = 0;
    pTrans->stage = TRN_STAGE_COMMIT_LOG;
    mDebug("trans:%d, stage from commit to commitLog", pTrans->id);
    continueExec = true;
  } else {
    pTrans->code = terrno;
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
      // Roll back rather than returning to redoAction: the redo actions have
      // already completed, so that stage would succeed immediately and bounce
      // straight back here, spinning the execute loop while commit keeps
      // failing. This mirrors what the redoAction stage does for this policy.
      pTrans->stage = TRN_STAGE_UNDO_ACTION;
      mError("trans:%d, stage from commit to undoAction since %s, failedTimes:%d", pTrans->id, terrstr(),
             pTrans->failedTimes);
      continueExec = true;
    } else {
      pTrans->failedTimes++;
      mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
      continueExec = false;
    }
  }

  return continueExec;
}

S
Shengliang Guan 已提交
813 814 815
// CommitLog stage: executes the commit logs. On success the transaction moves
// to the finished stage with code 0 and execution continues; on failure terrno
// is stored, the failure counter is incremented and execution stops on this
// stage.
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
  if (mndTransExecuteCommitLogs(pMnode, pTrans) != 0) {
    pTrans->code = terrno;
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->code = 0;
  pTrans->stage = TRN_STAGE_FINISHED;
  mDebug("trans:%d, stage from commitLog to finished", pTrans->id);
  return true;
}

// UndoLog stage: executes the undo logs. On success the transaction moves to
// the rollback stage and execution continues; on failure the stage is kept
// and execution stops. Neither the transaction code nor the failure counter
// is modified here.
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
  if (mndTransExecuteUndoLogs(pMnode, pTrans) != 0) {
    mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->stage = TRN_STAGE_ROLLBACK;
  mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
  return true;
}

// UndoAction stage: executes the undo actions and decides the next stage.
//
//   success      -> undoLog stage, keep executing
//   in progress  -> stay on undoAction, stop executing
//   failure      -> count the failure, stay on undoAction, stop executing
//
// Returns true when the caller should continue with the next stage.
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
  int32_t ret = mndTransExecuteUndoActions(pMnode, pTrans);

  if (ret == 0) {
    pTrans->stage = TRN_STAGE_UNDO_LOG;
    mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
    return true;
  }

  if (ret == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(ret));
    return false;
  }

  pTrans->failedTimes++;
  mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr());
  return false;
}

// Rollback stage: calls mndTransRollback. On success the transaction moves to
// the finished stage and execution continues; on failure the failure counter
// is incremented, the stage is kept and execution stops.
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
  if (mndTransRollback(pMnode, pTrans) != 0) {
    pTrans->failedTimes++;
    mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
    return false;
  }

  pTrans->stage = TRN_STAGE_FINISHED;
  mDebug("trans:%d, stage from rollback to finished", pTrans->id);
  return true;
}

// Finished stage: encodes the transaction, marks the encoded record as
// SDB_STATUS_DROPPED and writes it to the sdb.
//
// Always returns false, i.e. execution of the transaction stops here. Encode
// and write failures are logged only; the stage is not changed.
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
  bool continueExec = false;

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr());
    // Nothing to write: bail out instead of handing a NULL record to
    // sdbSetRawStatus/sdbWrite.
    return continueExec;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  int32_t code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
  }

  mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
  return continueExec;
}
S
Shengliang Guan 已提交
902

S
Shengliang Guan 已提交
903
// Runs the transaction state machine: the handler of the current stage is
// invoked repeatedly until one of them returns false (or the stage is not a
// known one). Afterwards mndTransSendRpcRsp is called for the transaction.
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
  bool keepGoing;

  do {
    switch (pTrans->stage) {
      case TRN_STAGE_PREPARE:
        keepGoing = mndTransPerformPrepareStage(pMnode, pTrans);
        break;
      case TRN_STAGE_REDO_LOG:
        keepGoing = mndTransPerformRedoLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_REDO_ACTION:
        keepGoing = mndTransPerformRedoActionStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT:
        keepGoing = mndTransPerformCommitStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT_LOG:
        keepGoing = mndTransPerformCommitLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_UNDO_ACTION:
        keepGoing = mndTransPerformUndoActionStage(pMnode, pTrans);
        break;
      case TRN_STAGE_UNDO_LOG:
        keepGoing = mndTransPerformUndoLogStage(pMnode, pTrans);
        break;
      case TRN_STAGE_ROLLBACK:
        keepGoing = mndTransPerformRollbackStage(pMnode, pTrans);
        break;
      case TRN_STAGE_FINISHED:
        keepGoing = mndTransPerfromFinishedStage(pMnode, pTrans);
        break;
      default:
        // Unknown stage: stop rather than loop.
        keepGoing = false;
        break;
    }
  } while (keepGoing);

  mndTransSendRpcRsp(pTrans);
}
S
Shengliang Guan 已提交
943

S
Shengliang Guan 已提交
944 945
// Request handler: pulls up the transactions of the mnode attached to the
// request. Always returns 0.
static int32_t mndProcessTransReq(SMnodeMsg *pReq) {
  SMnode *pMnode = pReq->pMnode;
  mndTransPullup(pMnode);
  return 0;
}

// Iterates over every SDB_TRANS entry returned by sdbFetch, executes it and
// releases it, then calls sdbWriteFile on the mnode's sdb.
void mndTransPullup(SMnode *pMnode) {
  STrans *pTrans = NULL;

  for (void *pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, NULL, (void **)&pTrans); pIter != NULL;
       pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans)) {
    mndTransExecute(pMnode, pTrans);
    sdbRelease(pMnode->pSdb, pTrans);
  }

  sdbWriteFile(pMnode->pSdb);
}