mndTrans.c 25.9 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndTrans.h"
S
Shengliang Guan 已提交
18
#include "mndSync.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
#define TSDB_TRANS_VER 1
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25
typedef struct {
  SEpSet  epSet;
S
Shengliang Guan 已提交
26 27 28
  int8_t  msgType;
  int32_t contLen;
  void   *pCont;
S
Shengliang Guan 已提交
29 30
} STransAction;

S
Shengliang Guan 已提交
31 32 33
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t  mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
S
Shengliang Guan 已提交
34
static int32_t  mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans);
S
Shengliang Guan 已提交
35 36
static int32_t  mndTransActionDelete(SSdb *pSdb, STrans *pTrans);

S
Shengliang Guan 已提交
37 38
static void    mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
static void    mndTransSendRpcRsp(STrans *pTrans, int32_t code);
S
Shengliang Guan 已提交
39
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
S
Shengliang Guan 已提交
40 41 42 43 44
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
static void    mndTransDropLogs(SArray *pArray);
static void    mndTransDropActions(SArray *pArray);
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray);
S
Shengliang Guan 已提交
45 46 47 48 49 50 51 52 53 54 55
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static void    mndTransExecute(SMnode *pMnode, STrans *pTrans);

S
Shengliang Guan 已提交
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
int32_t mndInitTrans(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TRANS,
                     .keyType = SDB_KEY_INT32,
                     .encodeFp = (SdbEncodeFp)mndTransActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTransActionDecode,
                     .insertFp = (SdbInsertFp)mndTransActionInsert,
                     .updateFp = (SdbUpdateFp)mndTransActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTransActionDelete};

  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTrans(SMnode *pMnode) {}

static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
S
Shengliang Guan 已提交
71
  int32_t rawDataLen = sizeof(STrans) + TSDB_TRN_RESERVE_SIZE;
S
Shengliang Guan 已提交
72 73 74 75 76 77
  int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
  int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
  int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
  int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
  int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);

78
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
79
    SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
S
Shengliang Guan 已提交
80 81 82
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

83
  for (int32_t i = 0; i < undoLogNum; ++i) {
S
Shengliang Guan 已提交
84
    SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
S
Shengliang Guan 已提交
85 86 87
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

88
  for (int32_t i = 0; i < commitLogNum; ++i) {
S
Shengliang Guan 已提交
89
    SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
S
Shengliang Guan 已提交
90 91 92
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

S
Shengliang Guan 已提交
93 94
  for (int32_t i = 0; i < redoActionNum; ++i) {
    STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
S
Shengliang Guan 已提交
95
    rawDataLen += (sizeof(STransAction) + pAction->contLen);
S
Shengliang Guan 已提交
96 97 98 99
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
    STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
S
Shengliang Guan 已提交
100
    rawDataLen += (sizeof(STransAction) + pAction->contLen);
S
Shengliang Guan 已提交
101 102
  }

S
Shengliang Guan 已提交
103
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen);
S
Shengliang Guan 已提交
104
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
105
    mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
106 107 108 109 110 111 112 113 114 115 116 117
    return NULL;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pTrans->id)
  SDB_SET_INT8(pRaw, dataPos, pTrans->policy)
  SDB_SET_INT32(pRaw, dataPos, redoLogNum)
  SDB_SET_INT32(pRaw, dataPos, undoLogNum)
  SDB_SET_INT32(pRaw, dataPos, commitLogNum)
  SDB_SET_INT32(pRaw, dataPos, redoActionNum)
  SDB_SET_INT32(pRaw, dataPos, undoActionNum)

118
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
119
    SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
S
Shengliang Guan 已提交
120 121 122 123 124
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

125
  for (int32_t i = 0; i < undoLogNum; ++i) {
S
Shengliang Guan 已提交
126
    SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
S
Shengliang Guan 已提交
127 128 129 130 131
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

132
  for (int32_t i = 0; i < commitLogNum; ++i) {
S
Shengliang Guan 已提交
133
    SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
S
Shengliang Guan 已提交
134 135 136 137 138
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

S
Shengliang Guan 已提交
139 140 141
  for (int32_t i = 0; i < redoActionNum; ++i) {
    STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
    SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
S
Shengliang Guan 已提交
142 143 144
    SDB_SET_INT8(pRaw, dataPos, pAction->msgType)
    SDB_SET_INT32(pRaw, dataPos, pAction->contLen)
    SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen);
S
Shengliang Guan 已提交
145 146 147 148 149
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
    STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
    SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
S
Shengliang Guan 已提交
150 151 152
    SDB_SET_INT8(pRaw, dataPos, pAction->msgType)
    SDB_SET_INT32(pRaw, dataPos, pAction->contLen)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen);
S
Shengliang Guan 已提交
153 154
  }

S
Shengliang Guan 已提交
155 156
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE)
  SDB_SET_DATALEN(pRaw, dataPos);
S
Shengliang Guan 已提交
157
  mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
S
Shengliang Guan 已提交
158 159 160
  return pRaw;
}

S
Shengliang Guan 已提交
161 162 163
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
  int32_t code = 0;

S
Shengliang Guan 已提交
164 165 166 167 168 169
  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
170
  if (sver != TSDB_TRANS_VER) {
S
Shengliang Guan 已提交
171 172 173 174 175
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("failed to get check soft ver from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
176 177
  SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
  STrans  *pTrans = sdbGetRowObj(pRow);
S
Shengliang Guan 已提交
178 179 180 181 182
  if (pTrans == NULL) {
    mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
183 184 185
  pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
S
Shengliang Guan 已提交
186 187
  pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction));
S
Shengliang Guan 已提交
188 189 190

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
S
Shengliang Guan 已提交
191 192 193
    mDebug("trans:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw);
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
  }

  int32_t redoLogNum = 0;
  int32_t undoLogNum = 0;
  int32_t commitLogNum = 0;
  int32_t redoActionNum = 0;
  int32_t undoActionNum = 0;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
  SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy)
  SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)

211
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
212 213 214 215
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
S
Shengliang Guan 已提交
216

S
Shengliang Guan 已提交
217
    void *ret = taosArrayPush(pTrans->redoLogs, &pData);
S
Shengliang Guan 已提交
218 219
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
220
      goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
221 222 223
    }
  }

S
Shengliang Guan 已提交
224 225 226 227 228
  for (int32_t i = 0; i < undoLogNum; ++i) {
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
S
Shengliang Guan 已提交
229

S
Shengliang Guan 已提交
230 231 232 233 234 235 236 237 238 239 240 241
    void *ret = taosArrayPush(pTrans->undoLogs, &pData);
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
    }
  }

  for (int32_t i = 0; i < commitLogNum; ++i) {
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
S
Shengliang Guan 已提交
242

S
Shengliang Guan 已提交
243 244 245 246
    void *ret = taosArrayPush(pTrans->commitLogs, &pData);
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
247 248 249 250 251 252
    }
  }

  for (int32_t i = 0; i < redoActionNum; ++i) {
    STransAction action = {0};
    SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
S
Shengliang Guan 已提交
253 254 255 256
    SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType)
    SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen)
    action.pCont = malloc(action.contLen);
    if (action.pCont == NULL) {
S
Shengliang Guan 已提交
257 258 259
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
    }
S
Shengliang Guan 已提交
260
    SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen);
S
Shengliang Guan 已提交
261 262 263 264 265 266 267 268 269 270 271

    void *ret = taosArrayPush(pTrans->redoActions, &action);
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
    }
  }

  for (int32_t i = 0; i < undoActionNum; ++i) {
    STransAction action = {0};
    SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
S
Shengliang Guan 已提交
272 273 274 275
    SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType)
    SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen)
    action.pCont = malloc(action.contLen);
    if (action.pCont == NULL) {
S
Shengliang Guan 已提交
276 277 278
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
    }
S
Shengliang Guan 已提交
279
    SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen);
S
Shengliang Guan 已提交
280 281 282 283 284

    void *ret = taosArrayPush(pTrans->undoActions, &action);
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
285 286 287
    }
  }

S
Shengliang Guan 已提交
288 289
  SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_TRN_RESERVE_SIZE)

S
Shengliang Guan 已提交
290
TRANS_DECODE_OVER:
S
Shengliang Guan 已提交
291
  if (code != 0) {
S
Shengliang Guan 已提交
292
    mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno));
S
Shengliang Guan 已提交
293
    mndTransDrop(pTrans);
S
Shengliang Guan 已提交
294
    terrno = code;
S
Shengliang Guan 已提交
295 296 297
    return NULL;
  }

S
Shengliang Guan 已提交
298
  mTrace("trans:%d, decode from raw:%p", pTrans->id, pRaw);
S
Shengliang Guan 已提交
299 300 301
  return pRow;
}

S
Shengliang Guan 已提交
302
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
S
Shengliang Guan 已提交
303
  pTrans->stage = TRN_STAGE_PREPARE;
S
Shengliang Guan 已提交
304
  mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
S
Shengliang Guan 已提交
305 306 307
  return 0;
}

S
Shengliang Guan 已提交
308
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
S
Shengliang Guan 已提交
309
  mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
S
Shengliang Guan 已提交
310

S
Shengliang Guan 已提交
311 312 313 314 315
  mndTransDropLogs(pTrans->redoLogs);
  mndTransDropLogs(pTrans->undoLogs);
  mndTransDropLogs(pTrans->commitLogs);
  mndTransDropActions(pTrans->redoActions);
  mndTransDropActions(pTrans->undoActions);
S
Shengliang Guan 已提交
316 317 318 319

  return 0;
}

S
Shengliang Guan 已提交
320
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
S
Shengliang Guan 已提交
321
  mTrace("trans:%d, perform update action, stage:%s", pOldTrans->id, mndTransStageStr(pNewTrans->stage));
S
Shengliang Guan 已提交
322
  pOldTrans->stage = pNewTrans->stage;
S
Shengliang Guan 已提交
323 324 325
  return 0;
}

S
Shengliang Guan 已提交
326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
  SSdb *pSdb = pMnode->pSdb;
  return sdbAcquire(pSdb, SDB_TRANS, &transId);
}

void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTrans);
}

char *mndTransStageStr(ETrnStage stage) {
  switch (stage) {
    case TRN_STAGE_PREPARE:
      return "prepare";
    case TRN_STAGE_EXECUTE:
      return "execute";
    case TRN_STAGE_COMMIT:
      return "commit";
    case TRN_STAGE_ROLLBACK:
      return "rollback";
    case TRN_STAGE_RETRY:
      return "retry";
S
Shengliang Guan 已提交
348 349
    case TRN_STAGE_OVER:
      return "stop";
S
Shengliang Guan 已提交
350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
    default:
      return "undefined";
  }
}

char *mndTransPolicyStr(ETrnPolicy policy) {
  switch (policy) {
    case TRN_POLICY_ROLLBACK:
      return "prepare";
    case TRN_POLICY_RETRY:
      return "retry";
    default:
      return "undefined";
  }
}

S
Shengliang Guan 已提交
366
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
S
Shengliang Guan 已提交
367 368 369 370 371 372 373
  STrans *pTrans = calloc(1, sizeof(STrans));
  if (pTrans == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
374
  pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
S
Shengliang Guan 已提交
375 376 377
  pTrans->stage = TRN_STAGE_PREPARE;
  pTrans->policy = policy;
  pTrans->rpcHandle = rpcHandle;
S
Shengliang Guan 已提交
378 379 380
  pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
S
Shengliang Guan 已提交
381 382
  pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction));
  pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction));
S
Shengliang Guan 已提交
383 384 385 386 387 388 389 390

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
391
  mDebug("trans:%d, data:%p is created", pTrans->id, pTrans);
S
Shengliang Guan 已提交
392 393 394
  return pTrans;
}

S
Shengliang Guan 已提交
395
static void mndTransDropLogs(SArray *pArray) {
396
  for (int32_t i = 0; i < pArray->size; ++i) {
S
Shengliang Guan 已提交
397
    SSdbRaw *pRaw = taosArrayGetP(pArray, i);
S
Shengliang Guan 已提交
398 399 400 401 402 403
    tfree(pRaw);
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
404
static void mndTransDropActions(SArray *pArray) {
S
Shengliang Guan 已提交
405 406
  for (int32_t i = 0; i < pArray->size; ++i) {
    STransAction *pAction = taosArrayGet(pArray, i);
S
Shengliang Guan 已提交
407
    free(pAction->pCont);
S
Shengliang Guan 已提交
408 409 410 411 412
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
413
void mndTransDrop(STrans *pTrans) {
S
Shengliang Guan 已提交
414 415 416 417 418
  mndTransDropLogs(pTrans->redoLogs);
  mndTransDropLogs(pTrans->undoLogs);
  mndTransDropLogs(pTrans->commitLogs);
  mndTransDropActions(pTrans->redoActions);
  mndTransDropActions(pTrans->undoActions);
S
Shengliang Guan 已提交
419

S
Shengliang Guan 已提交
420
  mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans);
S
Shengliang Guan 已提交
421 422 423
  tfree(pTrans);
}

S
Shengliang Guan 已提交
424
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
S
Shengliang Guan 已提交
425
  pTrans->rpcHandle = rpcHandle;
S
Shengliang Guan 已提交
426
  mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle);
S
Shengliang Guan 已提交
427 428
}

S
Shengliang Guan 已提交
429
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
430 431 432 433 434
  if (pArray == NULL || pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
435
  void *ptr = taosArrayPush(pArray, &pRaw);
S
Shengliang Guan 已提交
436 437 438 439 440 441 442 443
  if (ptr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
444
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
445
  int32_t code = mndTransAppendLog(pTrans->redoLogs, pRaw);
S
Shengliang Guan 已提交
446
  mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
447 448 449
  return code;
}

S
Shengliang Guan 已提交
450
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
451
  int32_t code = mndTransAppendLog(pTrans->undoLogs, pRaw);
S
Shengliang Guan 已提交
452
  mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
453 454 455
  return code;
}

S
Shengliang Guan 已提交
456
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
457
  int32_t code = mndTransAppendLog(pTrans->commitLogs, pRaw);
S
Shengliang Guan 已提交
458
  mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
459 460 461
  return code;
}

S
Shengliang Guan 已提交
462 463
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
  STransAction action = {.epSet = *pEpSet, .msgType = msgType, .contLen = contLen, .pCont = pCont};
S
Shengliang Guan 已提交
464 465 466 467 468 469 470 471 472 473

  void *ptr = taosArrayPush(pArray, &action);
  if (ptr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
474 475 476
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
  int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, msgType, contLen, pCont);
  mTrace("trans:%d, msg:%s len:%d append to redo actions", pTrans->id, taosMsg[msgType], contLen);
S
Shengliang Guan 已提交
477 478 479
  return code;
}

S
Shengliang Guan 已提交
480 481 482
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
  int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, msgType, contLen, pCont);
  mTrace("trans:%d, msg:%s len:%d append to undo actions", pTrans->id, taosMsg[msgType], contLen);
S
Shengliang Guan 已提交
483 484 485
  return code;
}

S
Shengliang Guan 已提交
486
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
487
  mDebug("trans:%d, prepare transaction", pTrans->id);
S
Shengliang Guan 已提交
488

S
Shengliang Guan 已提交
489
  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
S
Shengliang Guan 已提交
490
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
491
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
492 493
    return -1;
  }
S
Shengliang Guan 已提交
494
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);
S
Shengliang Guan 已提交
495

S
Shengliang Guan 已提交
496 497 498 499 500
  mTrace("trans:%d, start sync", pTrans->id);
  int32_t code = mndSyncPropose(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
501 502 503
    return -1;
  }

S
Shengliang Guan 已提交
504
  mTrace("trans:%d, sync finished", pTrans->id);
S
Shengliang Guan 已提交
505

S
Shengliang Guan 已提交
506 507 508 509 510 511 512 513 514 515 516 517 518
  code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id);
  if (pNewTrans == NULL) {
    mError("trans:%d, failed to ready from sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, prepare finished", pNewTrans->id);
S
Shengliang Guan 已提交
519
  pNewTrans->rpcHandle = pTrans->rpcHandle;
S
Shengliang Guan 已提交
520 521 522 523 524 525 526 527 528 529 530 531 532 533 534
  mndTransExecute(pMnode, pNewTrans);
  mndReleaseTrans(pMnode, pNewTrans);
  return 0;
}

int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, commit transaction", pTrans->id);

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

S
Shengliang Guan 已提交
535 536 537 538 539 540 541 542
  if (taosArrayGetSize(pTrans->commitLogs) != 0) {
    mTrace("trans:%d, start sync", pTrans->id);
    int32_t code = mndSyncPropose(pMnode, pRaw);
    if (code != 0) {
      mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
      sdbFreeRaw(pRaw);
      return -1;
    }
S
Shengliang Guan 已提交
543

S
Shengliang Guan 已提交
544 545 546 547 548 549
    mTrace("trans:%d, sync finished", pTrans->id);
    code = sdbWrite(pMnode->pSdb, pRaw);
    if (code != 0) {
      mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
      return -1;
    }
S
Shengliang Guan 已提交
550 551 552
  }

  mDebug("trans:%d, commit finished", pTrans->id);
S
Shengliang Guan 已提交
553 554 555
  return 0;
}

S
Shengliang Guan 已提交
556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, rollback transaction", pTrans->id);

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  mTrace("trans:%d, start sync", pTrans->id);
  int32_t code = mndSyncPropose(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pRaw);
    return -1;
  }

  mTrace("trans:%d, sync finished", pTrans->id);
  code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
S
Shengliang Guan 已提交
579 580
  }

S
Shengliang Guan 已提交
581 582
  mDebug("trans:%d, rollback finished", pTrans->id);
  return 0;
S
Shengliang Guan 已提交
583
}
S
Shengliang Guan 已提交
584

S
Shengliang Guan 已提交
585 586 587 588 589 590 591
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) {
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
  mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x", pTrans->id, pTrans->rpcHandle, code & 0xFFFF);

  if (pTrans->rpcHandle != NULL) {
    SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code};
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
592 593 594
  }
}

S
Shengliang Guan 已提交
595 596 597 598
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) {
  // todo
}

S
Shengliang Guan 已提交
599
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
S
Shengliang Guan 已提交
600 601 602 603
  SSdb   *pSdb = pMnode->pSdb;
  int32_t arraySize = taosArrayGetSize(pArray);

  for (int32_t i = 0; i < arraySize; ++i) {
604
    SSdbRaw *pRaw = taosArrayGetP(pArray, i);
S
Shengliang Guan 已提交
605 606 607
    int32_t  code = sdbWriteNotFree(pSdb, pRaw);
    if (code != 0) {
      return code;
S
Shengliang Guan 已提交
608 609 610 611 612 613
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
614
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
615 616
  int32_t code = 0;
  if (taosArrayGetSize(pTrans->redoLogs) != 0) {
S
Shengliang Guan 已提交
617
    code = mndTransExecuteLogs(pMnode, pTrans->redoLogs);
S
Shengliang Guan 已提交
618 619 620 621 622
    if (code != 0) {
      mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr())
    } else {
      mTrace("trans:%d, execute redo logs finished", pTrans->id)
    }
S
Shengliang Guan 已提交
623 624 625 626 627 628
  }

  return code;
}

static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
629 630
  int32_t code = 0;
  if (taosArrayGetSize(pTrans->undoLogs) != 0) {
S
Shengliang Guan 已提交
631
    code = mndTransExecuteLogs(pMnode, pTrans->undoLogs);
S
Shengliang Guan 已提交
632 633 634 635 636
    if (code != 0) {
      mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr())
    } else {
      mTrace("trans:%d, execute undo logs finished", pTrans->id)
    }
S
Shengliang Guan 已提交
637 638 639 640 641 642
  }

  return code;
}

static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
643 644
  int32_t code = 0;
  if (taosArrayGetSize(pTrans->commitLogs) != 0) {
S
Shengliang Guan 已提交
645
    code = mndTransExecuteLogs(pMnode, pTrans->commitLogs);
S
Shengliang Guan 已提交
646 647 648 649 650
    if (code != 0) {
      mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr())
    } else {
      mTrace("trans:%d, execute commit logs finished", pTrans->id)
    }
S
Shengliang Guan 已提交
651
  }
S
Shengliang Guan 已提交
652

S
Shengliang Guan 已提交
653 654
  return code;
}
S
Shengliang Guan 已提交
655

S
Shengliang Guan 已提交
656
static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) {
S
Shengliang Guan 已提交
657
#if 0
S
Shengliang Guan 已提交
658 659
  int32_t arraySize = taosArrayGetSize(pArray);
  for (int32_t i = 0; i < arraySize; ++i) {
S
Shengliang Guan 已提交
660 661 662 663 664 665 666
    STransAction *pAction = taosArrayGet(pArray, i);

    SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen};
    rpcMsg.pCont = rpcMallocCont(pAction->contLen);
    if (rpcMsg.pCont == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
S
Shengliang Guan 已提交
667
    }
S
Shengliang Guan 已提交
668 669
    memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
    mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg);
S
Shengliang Guan 已提交
670 671
  }

S
Shengliang Guan 已提交
672 673
  return TSDB_CODE_MND_ACTION_IN_PROGRESS;
#else
S
Shengliang Guan 已提交
674
  return 0;
S
Shengliang Guan 已提交
675
#endif
S
Shengliang Guan 已提交
676 677
}

S
Shengliang Guan 已提交
678
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
679
  if (taosArrayGetSize(pTrans->redoActions) <= 0) return 0;
S
Shengliang Guan 已提交
680

S
Shengliang Guan 已提交
681 682
  mTrace("trans:%d, start to execute redo actions", pTrans->id);
  return mndTransExecuteActions(pMnode, pTrans->redoActions);
S
Shengliang Guan 已提交
683
}
S
Shengliang Guan 已提交
684

S
Shengliang Guan 已提交
685
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
686
  if (taosArrayGetSize(pTrans->undoActions) <= 0) return 0;
S
Shengliang Guan 已提交
687

S
Shengliang Guan 已提交
688 689
  mTrace("trans:%d, start to execute undo actions", pTrans->id);
  return mndTransExecuteActions(pMnode, pTrans->undoActions);
S
Shengliang Guan 已提交
690
}
S
Shengliang Guan 已提交
691

S
Shengliang Guan 已提交
692 693
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
S
Shengliang Guan 已提交
694

S
Shengliang Guan 已提交
695
  if (code == 0) {
S
Shengliang Guan 已提交
696
    pTrans->stage = TRN_STAGE_EXECUTE;
S
Shengliang Guan 已提交
697
    mTrace("trans:%d, stage from prepare to execute", pTrans->id);
S
Shengliang Guan 已提交
698 699
  } else {
    pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
700
    mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
701
  }
S
Shengliang Guan 已提交
702 703

  return 0;
S
Shengliang Guan 已提交
704 705
}

S
Shengliang Guan 已提交
706 707
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
S
Shengliang Guan 已提交
708 709 710

  if (code == 0) {
    pTrans->stage = TRN_STAGE_COMMIT;
S
Shengliang Guan 已提交
711
    mTrace("trans:%d, stage from execute to commit", pTrans->id);
S
Shengliang Guan 已提交
712
  } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
713 714
    mTrace("trans:%d, stage keep on execute since %s", pTrans->id, terrstr(code));
    return code;
S
Shengliang Guan 已提交
715
  } else {
S
Shengliang Guan 已提交
716
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
S
Shengliang Guan 已提交
717
      pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
718 719 720 721
      mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr());
    } else {
      pTrans->stage = TRN_STAGE_RETRY;
      mError("trans:%d, stage from execute to retry since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
722 723 724
    }
  }

S
Shengliang Guan 已提交
725
  return 0;
S
Shengliang Guan 已提交
726 727
}

S
Shengliang Guan 已提交
728 729 730 731
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);

  if (code == 0) {
S
Shengliang Guan 已提交
732
    pTrans->stage = TRN_STAGE_OVER;
S
Shengliang Guan 已提交
733
    mTrace("trans:%d, commit stage finished", pTrans->id);
S
Shengliang Guan 已提交
734
  } else {
S
Shengliang Guan 已提交
735 736 737 738 739 740 741
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
      pTrans->stage = TRN_STAGE_ROLLBACK;
      mError("trans:%d, stage from commit to rollback since %s", pTrans->id, terrstr());
    } else {
      pTrans->stage = TRN_STAGE_RETRY;
      mError("trans:%d, stage from commit to retry since %s", pTrans->id, terrstr());
    }
S
Shengliang Guan 已提交
742
  }
S
Shengliang Guan 已提交
743 744

  return code;
S
Shengliang Guan 已提交
745 746
}

S
Shengliang Guan 已提交
747 748 749 750 751
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);

  if (code == 0) {
    mTrace("trans:%d, rollbacked", pTrans->id);
S
Shengliang Guan 已提交
752 753
  } else {
    pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
754
    mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
755 756
  }

S
Shengliang Guan 已提交
757 758
  return code;
}
S
Shengliang Guan 已提交
759

S
Shengliang Guan 已提交
760 761
static int32_t mndTransPerformRetryStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
S
Shengliang Guan 已提交
762

S
Shengliang Guan 已提交
763 764 765 766 767 768
  if (code == 0) {
    pTrans->stage = TRN_STAGE_COMMIT;
    mTrace("trans:%d, stage from retry to commit", pTrans->id);
  } else {
    pTrans->stage = TRN_STAGE_RETRY;
    mError("trans:%d, stage keep on retry since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
769 770
  }

S
Shengliang Guan 已提交
771 772
  return code;
}
S
Shengliang Guan 已提交
773

S
Shengliang Guan 已提交
774 775
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
  int32_t code = 0;
S
Shengliang Guan 已提交
776

S
Shengliang Guan 已提交
777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799
  while (code == 0) {
    switch (pTrans->stage) {
      case TRN_STAGE_PREPARE:
        code = mndTransPerformPrepareStage(pMnode, pTrans);
        break;
      case TRN_STAGE_EXECUTE:
        code = mndTransPerformExecuteStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT:
        code = mndTransCommit(pMnode, pTrans);
        if (code == 0) {
          code = mndTransPerformCommitStage(pMnode, pTrans);
        }
        break;
      case TRN_STAGE_ROLLBACK:
        code = mndTransPerformRollbackStage(pMnode, pTrans);
        if (code == 0) {
          code = mndTransRollback(pMnode, pTrans);
        }
        break;
      case TRN_STAGE_RETRY:
        code = mndTransPerformRetryStage(pMnode, pTrans);
        break;
S
Shengliang Guan 已提交
800 801 802
      default:
        mndTransSendRpcRsp(pTrans, 0);
        return;
S
Shengliang Guan 已提交
803 804 805
    }
  }

S
Shengliang Guan 已提交
806 807
  mndTransSendRpcRsp(pTrans, code);
}