mndTrans.c 21.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndTrans.h"
S
Shengliang Guan 已提交
18
#include "mndSync.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
#define TSDB_TRANS_VER 1
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25 26
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t  mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
S
Shengliang Guan 已提交
27
static int32_t  mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans);
S
Shengliang Guan 已提交
28 29
static int32_t  mndTransActionDelete(SSdb *pSdb, STrans *pTrans);

S
Shengliang Guan 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
static void    mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
static void    mndTransSendRpcRsp(STrans *pTrans, int32_t code);
static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw);
static void    mndTransDropArray(SArray *pArray);
static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static void    mndTransExecute(SMnode *pMnode, STrans *pTrans);

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
int32_t mndInitTrans(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TRANS,
                     .keyType = SDB_KEY_INT32,
                     .encodeFp = (SdbEncodeFp)mndTransActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTransActionDecode,
                     .insertFp = (SdbInsertFp)mndTransActionInsert,
                     .updateFp = (SdbUpdateFp)mndTransActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTransActionDelete};

  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTrans(SMnode *pMnode) {}

static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
  int32_t rawDataLen = 16 * sizeof(int32_t);
S
Shengliang Guan 已提交
62 63 64 65 66 67
  int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
  int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
  int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
  int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
  int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);

68
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
69
    SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
S
Shengliang Guan 已提交
70 71 72
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

73
  for (int32_t i = 0; i < undoLogNum; ++i) {
S
Shengliang Guan 已提交
74
    SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
S
Shengliang Guan 已提交
75 76 77
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

78
  for (int32_t i = 0; i < commitLogNum; ++i) {
S
Shengliang Guan 已提交
79
    SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
S
Shengliang Guan 已提交
80 81 82
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

S
Shengliang Guan 已提交
83
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen);
S
Shengliang Guan 已提交
84
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
85
    mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98
    return NULL;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pTrans->id)
  SDB_SET_INT8(pRaw, dataPos, pTrans->stage)
  SDB_SET_INT8(pRaw, dataPos, pTrans->policy)
  SDB_SET_INT32(pRaw, dataPos, redoLogNum)
  SDB_SET_INT32(pRaw, dataPos, undoLogNum)
  SDB_SET_INT32(pRaw, dataPos, commitLogNum)
  SDB_SET_INT32(pRaw, dataPos, redoActionNum)
  SDB_SET_INT32(pRaw, dataPos, undoActionNum)

99
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
100
    SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
S
Shengliang Guan 已提交
101 102 103 104 105
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

106
  for (int32_t i = 0; i < undoLogNum; ++i) {
S
Shengliang Guan 已提交
107
    SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
S
Shengliang Guan 已提交
108 109 110 111 112
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

113
  for (int32_t i = 0; i < commitLogNum; ++i) {
S
Shengliang Guan 已提交
114
    SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
S
Shengliang Guan 已提交
115 116 117 118 119
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

S
Shengliang Guan 已提交
120 121
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE)
  SDB_SET_DATALEN(pRaw, dataPos);
S
Shengliang Guan 已提交
122
  mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
S
Shengliang Guan 已提交
123 124 125
  return pRaw;
}

S
Shengliang Guan 已提交
126 127 128
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
  int32_t code = 0;

S
Shengliang Guan 已提交
129 130 131 132 133 134
  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
135
  if (sver != TSDB_TRANS_VER) {
S
Shengliang Guan 已提交
136 137 138 139 140
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("failed to get check soft ver from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
141 142
  SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
  STrans  *pTrans = sdbGetRowObj(pRow);
S
Shengliang Guan 已提交
143 144 145 146 147
  if (pTrans == NULL) {
    mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
148 149 150 151 152
  pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
S
Shengliang Guan 已提交
153 154 155

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
S
Shengliang Guan 已提交
156 157 158
    mDebug("trans:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw);
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
  }

  int32_t redoLogNum = 0;
  int32_t undoLogNum = 0;
  int32_t commitLogNum = 0;
  int32_t redoActionNum = 0;
  int32_t undoActionNum = 0;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
  SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->stage)
  SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy)
  SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)

177
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
178 179 180 181 182
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)

    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
S
Shengliang Guan 已提交
183
    void *ret = taosArrayPush(pTrans->redoLogs, &pData);
S
Shengliang Guan 已提交
184 185
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
186
      goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
187 188 189 190
      break;
    }
  }

S
Shengliang Guan 已提交
191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
  for (int32_t i = 0; i < undoLogNum; ++i) {
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)

    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
    void *ret = taosArrayPush(pTrans->undoLogs, &pData);
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
      break;
    }
  }

  for (int32_t i = 0; i < commitLogNum; ++i) {
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)

    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
    void *ret = taosArrayPush(pTrans->commitLogs, &pData);
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
      break;
    }
  }

S
Shengliang Guan 已提交
219 220
  SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_TRN_RESERVE_SIZE)

S
Shengliang Guan 已提交
221
TRANS_DECODE_OVER:
S
Shengliang Guan 已提交
222
  if (code != 0) {
S
Shengliang Guan 已提交
223
    mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno));
S
Shengliang Guan 已提交
224
    mndTransDrop(pTrans);
S
Shengliang Guan 已提交
225
    terrno = code;
S
Shengliang Guan 已提交
226 227 228
    return NULL;
  }

S
Shengliang Guan 已提交
229
  mTrace("trans:%d, decode from raw:%p", pTrans->id, pRaw);
S
Shengliang Guan 已提交
230 231 232
  return pRow;
}

S
Shengliang Guan 已提交
233
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
S
Shengliang Guan 已提交
234
  mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
S
Shengliang Guan 已提交
235 236 237
  return 0;
}

S
Shengliang Guan 已提交
238
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
S
Shengliang Guan 已提交
239
  mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
S
Shengliang Guan 已提交
240

S
Shengliang Guan 已提交
241 242 243 244 245
  mndTransDropArray(pTrans->redoLogs);
  mndTransDropArray(pTrans->undoLogs);
  mndTransDropArray(pTrans->commitLogs);
  mndTransDropArray(pTrans->redoActions);
  mndTransDropArray(pTrans->undoActions);
S
Shengliang Guan 已提交
246 247 248 249

  return 0;
}

S
Shengliang Guan 已提交
250
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
S
Shengliang Guan 已提交
251
  mTrace("trans:%d, perform update action, stage:%s", pOldTrans->id, mndTransStageStr(pNewTrans->stage));
S
Shengliang Guan 已提交
252
  pOldTrans->stage = pNewTrans->stage;
S
Shengliang Guan 已提交
253 254 255
  return 0;
}

S
Shengliang Guan 已提交
256 257 258 259 260 261 262 263 264 265
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
  SSdb *pSdb = pMnode->pSdb;
  return sdbAcquire(pSdb, SDB_TRANS, &transId);
}

void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTrans);
}

S
Shengliang Guan 已提交
266 267 268 269
static int32_t trnGenerateTransId() {
  static int32_t tmp = 0;
  return ++tmp;
}
S
Shengliang Guan 已提交
270

S
Shengliang Guan 已提交
271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298
char *mndTransStageStr(ETrnStage stage) {
  switch (stage) {
    case TRN_STAGE_PREPARE:
      return "prepare";
    case TRN_STAGE_EXECUTE:
      return "execute";
    case TRN_STAGE_COMMIT:
      return "commit";
    case TRN_STAGE_ROLLBACK:
      return "rollback";
    case TRN_STAGE_RETRY:
      return "retry";
    default:
      return "undefined";
  }
}

char *mndTransPolicyStr(ETrnPolicy policy) {
  switch (policy) {
    case TRN_POLICY_ROLLBACK:
      return "prepare";
    case TRN_POLICY_RETRY:
      return "retry";
    default:
      return "undefined";
  }
}

S
Shengliang Guan 已提交
299
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
S
Shengliang Guan 已提交
300 301 302 303 304 305 306 307 308 309 310
  STrans *pTrans = calloc(1, sizeof(STrans));
  if (pTrans == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

  pTrans->id = trnGenerateTransId();
  pTrans->stage = TRN_STAGE_PREPARE;
  pTrans->policy = policy;
  pTrans->rpcHandle = rpcHandle;
S
Shengliang Guan 已提交
311 312 313 314 315
  pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
S
Shengliang Guan 已提交
316 317 318 319 320 321 322 323

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
324
  mDebug("trans:%d, data:%p is created", pTrans->id, pTrans);
S
Shengliang Guan 已提交
325 326 327
  return pTrans;
}

S
Shengliang Guan 已提交
328
static void mndTransDropArray(SArray *pArray) {
329
  for (int32_t i = 0; i < pArray->size; ++i) {
S
Shengliang Guan 已提交
330
    SSdbRaw *pRaw = taosArrayGetP(pArray, i);
S
Shengliang Guan 已提交
331 332 333 334 335 336
    tfree(pRaw);
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
337
void mndTransDrop(STrans *pTrans) {
S
Shengliang Guan 已提交
338 339 340 341 342
  mndTransDropArray(pTrans->redoLogs);
  mndTransDropArray(pTrans->undoLogs);
  mndTransDropArray(pTrans->commitLogs);
  mndTransDropArray(pTrans->redoActions);
  mndTransDropArray(pTrans->undoActions);
S
Shengliang Guan 已提交
343

S
Shengliang Guan 已提交
344
  mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans);
S
Shengliang Guan 已提交
345 346 347
  tfree(pTrans);
}

S
Shengliang Guan 已提交
348
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
S
Shengliang Guan 已提交
349
  pTrans->rpcHandle = rpcHandle;
S
Shengliang Guan 已提交
350
  mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle);
S
Shengliang Guan 已提交
351 352
}

S
Shengliang Guan 已提交
353
static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
354 355 356 357 358
  if (pArray == NULL || pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
359
  void *ptr = taosArrayPush(pArray, &pRaw);
S
Shengliang Guan 已提交
360 361 362 363 364 365 366 367
  if (ptr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
368 369
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
  int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw);
S
Shengliang Guan 已提交
370
  mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
371 372 373
  return code;
}

S
Shengliang Guan 已提交
374 375
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
  int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw);
S
Shengliang Guan 已提交
376
  mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
377 378 379
  return code;
}

S
Shengliang Guan 已提交
380 381
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
  int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw);
S
Shengliang Guan 已提交
382
  mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
383 384 385
  return code;
}

S
Shengliang Guan 已提交
386 387
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
  int32_t code = mndTransAppendArray(pTrans->redoActions, pMsg);
S
Shengliang Guan 已提交
388
  mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg);
S
Shengliang Guan 已提交
389 390 391
  return code;
}

S
Shengliang Guan 已提交
392 393
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
  int32_t code = mndTransAppendArray(pTrans->undoActions, pMsg);
S
Shengliang Guan 已提交
394
  mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg);
S
Shengliang Guan 已提交
395 396 397
  return code;
}

S
Shengliang Guan 已提交
398
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
399
  mDebug("trans:%d, prepare transaction", pTrans->id);
S
Shengliang Guan 已提交
400

S
Shengliang Guan 已提交
401
  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
S
Shengliang Guan 已提交
402
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
403
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
404 405
    return -1;
  }
S
Shengliang Guan 已提交
406
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);
S
Shengliang Guan 已提交
407

S
Shengliang Guan 已提交
408 409 410 411 412
  mTrace("trans:%d, start sync", pTrans->id);
  int32_t code = mndSyncPropose(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
413 414 415
    return -1;
  }

S
Shengliang Guan 已提交
416
  mTrace("trans:%d, sync finished", pTrans->id);
S
Shengliang Guan 已提交
417

S
Shengliang Guan 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
  code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id);
  if (pNewTrans == NULL) {
    mError("trans:%d, failed to ready from sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, prepare finished", pNewTrans->id);
  mndTransExecute(pMnode, pNewTrans);
  mndReleaseTrans(pMnode, pNewTrans);
  return 0;
}

int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, commit transaction", pTrans->id);

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  mTrace("trans:%d, start sync", pTrans->id);
  int32_t code = mndSyncPropose(pMnode, pRaw);
  if (code != 0) {
S
Shengliang Guan 已提交
449 450
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
451 452 453
    return -1;
  }

S
Shengliang Guan 已提交
454 455 456 457 458 459 460 461
  mTrace("trans:%d, sync finished", pTrans->id);
  code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, commit finished", pTrans->id);
S
Shengliang Guan 已提交
462 463 464
  return 0;
}

S
Shengliang Guan 已提交
465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487
int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, rollback transaction", pTrans->id);

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  mTrace("trans:%d, start sync", pTrans->id);
  int32_t code = mndSyncPropose(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pRaw);
    return -1;
  }

  mTrace("trans:%d, sync finished", pTrans->id);
  code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
S
Shengliang Guan 已提交
488 489
  }

S
Shengliang Guan 已提交
490 491
  mDebug("trans:%d, rollback finished", pTrans->id);
  return 0;
S
Shengliang Guan 已提交
492
}
S
Shengliang Guan 已提交
493

S
Shengliang Guan 已提交
494 495 496 497 498 499 500
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) {
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
  mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x", pTrans->id, pTrans->rpcHandle, code & 0xFFFF);

  if (pTrans->rpcHandle != NULL) {
    SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code};
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
501 502 503
  }
}

S
Shengliang Guan 已提交
504 505 506 507 508 509 510 511 512
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) {
  // todo
}

static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t arraySize = taosArrayGetSize(pArray);

  for (int32_t i = 0; i < arraySize; ++i) {
513
    SSdbRaw *pRaw = taosArrayGetP(pArray, i);
S
Shengliang Guan 已提交
514 515 516
    int32_t  code = sdbWriteNotFree(pSdb, pRaw);
    if (code != 0) {
      return code;
S
Shengliang Guan 已提交
517 518 519 520 521 522
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteArray(pMnode, pTrans->redoLogs);
  if (code != 0) {
    mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr())
  } else {
    mTrace("trans:%d, execute redo logs finished", pTrans->id)
  }

  return code;
}

static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteArray(pMnode, pTrans->undoLogs);
  if (code != 0) {
    mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr())
  } else {
    mTrace("trans:%d, execute undo logs finished", pTrans->id)
  }

  return code;
}

static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteArray(pMnode, pTrans->commitLogs);
  if (code != 0) {
    mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr())
  } else {
    mTrace("trans:%d, execute commit logs finished", pTrans->id)
  }
S
Shengliang Guan 已提交
552

S
Shengliang Guan 已提交
553 554
  return code;
}
S
Shengliang Guan 已提交
555

S
Shengliang Guan 已提交
556 557 558 559
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
  mTrace("trans:%d, execute redo actions finished", pTrans->id);
  return 0;
}
S
Shengliang Guan 已提交
560

S
Shengliang Guan 已提交
561 562 563 564
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
  mTrace("trans:%d, execute undo actions finished", pTrans->id);
  return 0;
}
S
Shengliang Guan 已提交
565

S
Shengliang Guan 已提交
566 567
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
S
Shengliang Guan 已提交
568

S
Shengliang Guan 已提交
569
  if (code == 0) {
S
Shengliang Guan 已提交
570
    pTrans->stage = TRN_STAGE_EXECUTE;
S
Shengliang Guan 已提交
571
    mTrace("trans:%d, stage from prepare to execute", pTrans->id);
S
Shengliang Guan 已提交
572 573
  } else {
    pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
574
    mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
575
  }
S
Shengliang Guan 已提交
576 577

  return 0;
S
Shengliang Guan 已提交
578 579
}

S
Shengliang Guan 已提交
580 581
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
S
Shengliang Guan 已提交
582 583 584

  if (code == 0) {
    pTrans->stage = TRN_STAGE_COMMIT;
S
Shengliang Guan 已提交
585
    mTrace("trans:%d, stage from execute to commit", pTrans->id);
S
Shengliang Guan 已提交
586
  } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
587 588
    mTrace("trans:%d, stage keep on execute since %s", pTrans->id, terrstr(code));
    return code;
S
Shengliang Guan 已提交
589
  } else {
S
Shengliang Guan 已提交
590
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
S
Shengliang Guan 已提交
591
      pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
592 593 594 595
      mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr());
    } else {
      pTrans->stage = TRN_STAGE_RETRY;
      mError("trans:%d, stage from execute to retry since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
596 597 598
    }
  }

S
Shengliang Guan 已提交
599
  return 0;
S
Shengliang Guan 已提交
600 601
}

S
Shengliang Guan 已提交
602 603 604 605 606 607
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);

  if (code == 0) {
    pTrans->stage = TRN_STAGE_COMMIT;
    mTrace("trans:%d, commit stage finished", pTrans->id);
S
Shengliang Guan 已提交
608
  } else {
S
Shengliang Guan 已提交
609 610 611 612 613 614 615
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
      pTrans->stage = TRN_STAGE_ROLLBACK;
      mError("trans:%d, stage from commit to rollback since %s", pTrans->id, terrstr());
    } else {
      pTrans->stage = TRN_STAGE_RETRY;
      mError("trans:%d, stage from commit to retry since %s", pTrans->id, terrstr());
    }
S
Shengliang Guan 已提交
616
  }
S
Shengliang Guan 已提交
617 618

  return code;
S
Shengliang Guan 已提交
619 620
}

S
Shengliang Guan 已提交
621 622 623 624 625
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);

  if (code == 0) {
    mTrace("trans:%d, rollbacked", pTrans->id);
S
Shengliang Guan 已提交
626 627
  } else {
    pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
628
    mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
629 630
  }

S
Shengliang Guan 已提交
631 632
  return code;
}
S
Shengliang Guan 已提交
633

S
Shengliang Guan 已提交
634 635
static int32_t mndTransPerformRetryStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
S
Shengliang Guan 已提交
636

S
Shengliang Guan 已提交
637 638 639 640 641 642
  if (code == 0) {
    pTrans->stage = TRN_STAGE_COMMIT;
    mTrace("trans:%d, stage from retry to commit", pTrans->id);
  } else {
    pTrans->stage = TRN_STAGE_RETRY;
    mError("trans:%d, stage keep on retry since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
643 644
  }

S
Shengliang Guan 已提交
645 646
  return code;
}
S
Shengliang Guan 已提交
647

S
Shengliang Guan 已提交
648 649
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
  int32_t code = 0;
S
Shengliang Guan 已提交
650

S
Shengliang Guan 已提交
651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673
  while (code == 0) {
    switch (pTrans->stage) {
      case TRN_STAGE_PREPARE:
        code = mndTransPerformPrepareStage(pMnode, pTrans);
        break;
      case TRN_STAGE_EXECUTE:
        code = mndTransPerformExecuteStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT:
        code = mndTransCommit(pMnode, pTrans);
        if (code == 0) {
          code = mndTransPerformCommitStage(pMnode, pTrans);
        }
        break;
      case TRN_STAGE_ROLLBACK:
        code = mndTransPerformRollbackStage(pMnode, pTrans);
        if (code == 0) {
          code = mndTransRollback(pMnode, pTrans);
        }
        break;
      case TRN_STAGE_RETRY:
        code = mndTransPerformRetryStage(pMnode, pTrans);
        break;
S
Shengliang Guan 已提交
674 675 676
    }
  }

S
Shengliang Guan 已提交
677 678
  mndTransSendRpcRsp(pTrans, code);
}