mndTrans.c 21.8 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
S
Shengliang Guan 已提交
17
#include "mndTrans.h"
S
Shengliang Guan 已提交
18
#include "mndSync.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20 21 22
#define TSDB_TRANS_VER 1
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
S
Shengliang Guan 已提交
23

S
Shengliang Guan 已提交
24 25 26
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static int32_t  mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
S
Shengliang Guan 已提交
27
static int32_t  mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans);
S
Shengliang Guan 已提交
28 29
static int32_t  mndTransActionDelete(SSdb *pSdb, STrans *pTrans);

S
Shengliang Guan 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
static void    mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
static void    mndTransSendRpcRsp(STrans *pTrans, int32_t code);
static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw);
static void    mndTransDropArray(SArray *pArray);
static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static void    mndTransExecute(SMnode *pMnode, STrans *pTrans);

S
Shengliang Guan 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
int32_t mndInitTrans(SMnode *pMnode) {
  SSdbTable table = {.sdbType = SDB_TRANS,
                     .keyType = SDB_KEY_INT32,
                     .encodeFp = (SdbEncodeFp)mndTransActionEncode,
                     .decodeFp = (SdbDecodeFp)mndTransActionDecode,
                     .insertFp = (SdbInsertFp)mndTransActionInsert,
                     .updateFp = (SdbUpdateFp)mndTransActionUpdate,
                     .deleteFp = (SdbDeleteFp)mndTransActionDelete};

  return sdbSetTable(pMnode->pSdb, table);
}

void mndCleanupTrans(SMnode *pMnode) {}

static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
S
Shengliang Guan 已提交
61
  int32_t rawDataLen = 16 * sizeof(int32_t) + TSDB_TRN_RESERVE_SIZE;
S
Shengliang Guan 已提交
62 63 64 65 66 67
  int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
  int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
  int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
  int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
  int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);

68
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
69
    SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
S
Shengliang Guan 已提交
70 71 72
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

73
  for (int32_t i = 0; i < undoLogNum; ++i) {
S
Shengliang Guan 已提交
74
    SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
S
Shengliang Guan 已提交
75 76 77
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

78
  for (int32_t i = 0; i < commitLogNum; ++i) {
S
Shengliang Guan 已提交
79
    SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
S
Shengliang Guan 已提交
80 81 82
    rawDataLen += sdbGetRawTotalSize(pTmp);
  }

S
Shengliang Guan 已提交
83
  SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen);
S
Shengliang Guan 已提交
84
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
85
    mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
86 87 88 89 90 91 92 93 94 95 96 97
    return NULL;
  }

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, pTrans->id)
  SDB_SET_INT8(pRaw, dataPos, pTrans->policy)
  SDB_SET_INT32(pRaw, dataPos, redoLogNum)
  SDB_SET_INT32(pRaw, dataPos, undoLogNum)
  SDB_SET_INT32(pRaw, dataPos, commitLogNum)
  SDB_SET_INT32(pRaw, dataPos, redoActionNum)
  SDB_SET_INT32(pRaw, dataPos, undoActionNum)

98
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
99
    SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
S
Shengliang Guan 已提交
100 101 102 103 104
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

105
  for (int32_t i = 0; i < undoLogNum; ++i) {
S
Shengliang Guan 已提交
106
    SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
S
Shengliang Guan 已提交
107 108 109 110 111
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

112
  for (int32_t i = 0; i < commitLogNum; ++i) {
S
Shengliang Guan 已提交
113
    SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
S
Shengliang Guan 已提交
114 115 116 117 118
    int32_t  len = sdbGetRawTotalSize(pTmp);
    SDB_SET_INT32(pRaw, dataPos, len)
    SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
  }

S
Shengliang Guan 已提交
119 120
  SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE)
  SDB_SET_DATALEN(pRaw, dataPos);
S
Shengliang Guan 已提交
121
  mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos);
S
Shengliang Guan 已提交
122 123 124
  return pRaw;
}

S
Shengliang Guan 已提交
125 126 127
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
  int32_t code = 0;

S
Shengliang Guan 已提交
128 129 130 131 132 133
  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
    mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
134
  if (sver != TSDB_TRANS_VER) {
S
Shengliang Guan 已提交
135 136 137 138 139
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    mError("failed to get check soft ver from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
140 141
  SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
  STrans  *pTrans = sdbGetRowObj(pRow);
S
Shengliang Guan 已提交
142 143 144 145 146
  if (pTrans == NULL) {
    mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
147 148 149 150 151
  pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
S
Shengliang Guan 已提交
152 153 154

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
S
Shengliang Guan 已提交
155 156 157
    mDebug("trans:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw);
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
  }

  int32_t redoLogNum = 0;
  int32_t undoLogNum = 0;
  int32_t commitLogNum = 0;
  int32_t redoActionNum = 0;
  int32_t undoActionNum = 0;

  int32_t dataPos = 0;
  SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
  SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy)
  SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
  SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)

175
  for (int32_t i = 0; i < redoLogNum; ++i) {
S
Shengliang Guan 已提交
176 177 178 179 180
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)

    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
S
Shengliang Guan 已提交
181
    void *ret = taosArrayPush(pTrans->redoLogs, &pData);
S
Shengliang Guan 已提交
182 183
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
S
Shengliang Guan 已提交
184
      goto TRANS_DECODE_OVER;
S
Shengliang Guan 已提交
185 186 187 188
      break;
    }
  }

S
Shengliang Guan 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
  for (int32_t i = 0; i < undoLogNum; ++i) {
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)

    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
    void *ret = taosArrayPush(pTrans->undoLogs, &pData);
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
      break;
    }
  }

  for (int32_t i = 0; i < commitLogNum; ++i) {
    int32_t dataLen = 0;
    SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)

    char *pData = malloc(dataLen);
    SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
    void *ret = taosArrayPush(pTrans->commitLogs, &pData);
    if (ret == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto TRANS_DECODE_OVER;
      break;
    }
  }

S
Shengliang Guan 已提交
217 218
  SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_TRN_RESERVE_SIZE)

S
Shengliang Guan 已提交
219
TRANS_DECODE_OVER:
S
Shengliang Guan 已提交
220
  if (code != 0) {
S
Shengliang Guan 已提交
221
    mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno));
S
Shengliang Guan 已提交
222
    mndTransDrop(pTrans);
S
Shengliang Guan 已提交
223
    terrno = code;
S
Shengliang Guan 已提交
224 225 226
    return NULL;
  }

S
Shengliang Guan 已提交
227
  mTrace("trans:%d, decode from raw:%p", pTrans->id, pRaw);
S
Shengliang Guan 已提交
228 229 230
  return pRow;
}

S
Shengliang Guan 已提交
231
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
S
Shengliang Guan 已提交
232
  pTrans->stage = TRN_STAGE_PREPARE;
S
Shengliang Guan 已提交
233
  mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
S
Shengliang Guan 已提交
234 235 236
  return 0;
}

S
Shengliang Guan 已提交
237
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
S
Shengliang Guan 已提交
238
  mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
S
Shengliang Guan 已提交
239

S
Shengliang Guan 已提交
240 241 242 243 244
  mndTransDropArray(pTrans->redoLogs);
  mndTransDropArray(pTrans->undoLogs);
  mndTransDropArray(pTrans->commitLogs);
  mndTransDropArray(pTrans->redoActions);
  mndTransDropArray(pTrans->undoActions);
S
Shengliang Guan 已提交
245 246 247 248

  return 0;
}

S
Shengliang Guan 已提交
249
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) {
S
Shengliang Guan 已提交
250
  mTrace("trans:%d, perform update action, stage:%s", pOldTrans->id, mndTransStageStr(pNewTrans->stage));
S
Shengliang Guan 已提交
251
  pOldTrans->stage = pNewTrans->stage;
S
Shengliang Guan 已提交
252 253 254
  return 0;
}

S
Shengliang Guan 已提交
255 256 257 258 259 260 261 262 263 264
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
  SSdb *pSdb = pMnode->pSdb;
  return sdbAcquire(pSdb, SDB_TRANS, &transId);
}

void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
  SSdb *pSdb = pMnode->pSdb;
  sdbRelease(pSdb, pTrans);
}

S
Shengliang Guan 已提交
265
static int32_t mndGenerateTransId() {
S
Shengliang Guan 已提交
266 267 268
  static int32_t tmp = 0;
  return ++tmp;
}
S
Shengliang Guan 已提交
269

S
Shengliang Guan 已提交
270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
char *mndTransStageStr(ETrnStage stage) {
  switch (stage) {
    case TRN_STAGE_PREPARE:
      return "prepare";
    case TRN_STAGE_EXECUTE:
      return "execute";
    case TRN_STAGE_COMMIT:
      return "commit";
    case TRN_STAGE_ROLLBACK:
      return "rollback";
    case TRN_STAGE_RETRY:
      return "retry";
    default:
      return "undefined";
  }
}

char *mndTransPolicyStr(ETrnPolicy policy) {
  switch (policy) {
    case TRN_POLICY_ROLLBACK:
      return "prepare";
    case TRN_POLICY_RETRY:
      return "retry";
    default:
      return "undefined";
  }
}

S
Shengliang Guan 已提交
298
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
S
Shengliang Guan 已提交
299 300 301 302 303 304 305
  STrans *pTrans = calloc(1, sizeof(STrans));
  if (pTrans == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
306
  pTrans->id = mndGenerateTransId();
S
Shengliang Guan 已提交
307 308 309
  pTrans->stage = TRN_STAGE_PREPARE;
  pTrans->policy = policy;
  pTrans->rpcHandle = rpcHandle;
S
Shengliang Guan 已提交
310 311 312 313 314
  pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
  pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *));
S
Shengliang Guan 已提交
315 316 317 318 319 320 321 322

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create transaction since %s", terrstr());
    return NULL;
  }

S
Shengliang Guan 已提交
323
  mDebug("trans:%d, data:%p is created", pTrans->id, pTrans);
S
Shengliang Guan 已提交
324 325 326
  return pTrans;
}

S
Shengliang Guan 已提交
327
static void mndTransDropArray(SArray *pArray) {
328
  for (int32_t i = 0; i < pArray->size; ++i) {
S
Shengliang Guan 已提交
329
    SSdbRaw *pRaw = taosArrayGetP(pArray, i);
S
Shengliang Guan 已提交
330 331 332 333 334 335
    tfree(pRaw);
  }

  taosArrayDestroy(pArray);
}

S
Shengliang Guan 已提交
336
void mndTransDrop(STrans *pTrans) {
S
Shengliang Guan 已提交
337 338 339 340 341
  mndTransDropArray(pTrans->redoLogs);
  mndTransDropArray(pTrans->undoLogs);
  mndTransDropArray(pTrans->commitLogs);
  mndTransDropArray(pTrans->redoActions);
  mndTransDropArray(pTrans->undoActions);
S
Shengliang Guan 已提交
342

S
Shengliang Guan 已提交
343
  mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans);
S
Shengliang Guan 已提交
344 345 346
  tfree(pTrans);
}

S
Shengliang Guan 已提交
347
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
S
Shengliang Guan 已提交
348
  pTrans->rpcHandle = rpcHandle;
S
Shengliang Guan 已提交
349
  mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle);
S
Shengliang Guan 已提交
350 351
}

S
Shengliang Guan 已提交
352
static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
S
Shengliang Guan 已提交
353 354 355 356 357
  if (pArray == NULL || pRaw == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

S
Shengliang Guan 已提交
358
  void *ptr = taosArrayPush(pArray, &pRaw);
S
Shengliang Guan 已提交
359 360 361 362 363 364 365 366
  if (ptr == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}

S
Shengliang Guan 已提交
367 368
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
  int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw);
S
Shengliang Guan 已提交
369
  mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
370 371 372
  return code;
}

S
Shengliang Guan 已提交
373 374
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
  int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw);
S
Shengliang Guan 已提交
375
  mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
376 377 378
  return code;
}

S
Shengliang Guan 已提交
379 380
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
  int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw);
S
Shengliang Guan 已提交
381
  mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code);
S
Shengliang Guan 已提交
382 383 384
  return code;
}

S
Shengliang Guan 已提交
385 386
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
  int32_t code = mndTransAppendArray(pTrans->redoActions, pMsg);
S
Shengliang Guan 已提交
387
  mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg);
S
Shengliang Guan 已提交
388 389 390
  return code;
}

S
Shengliang Guan 已提交
391 392
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
  int32_t code = mndTransAppendArray(pTrans->undoActions, pMsg);
S
Shengliang Guan 已提交
393
  mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg);
S
Shengliang Guan 已提交
394 395 396
  return code;
}

S
Shengliang Guan 已提交
397
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
398
  mDebug("trans:%d, prepare transaction", pTrans->id);
S
Shengliang Guan 已提交
399

S
Shengliang Guan 已提交
400
  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
S
Shengliang Guan 已提交
401
  if (pRaw == NULL) {
S
Shengliang Guan 已提交
402
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
403 404
    return -1;
  }
S
Shengliang Guan 已提交
405
  sdbSetRawStatus(pRaw, SDB_STATUS_READY);
S
Shengliang Guan 已提交
406

S
Shengliang Guan 已提交
407 408 409 410 411
  mTrace("trans:%d, start sync", pTrans->id);
  int32_t code = mndSyncPropose(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pRaw);
S
Shengliang Guan 已提交
412 413 414
    return -1;
  }

S
Shengliang Guan 已提交
415
  mTrace("trans:%d, sync finished", pTrans->id);
S
Shengliang Guan 已提交
416

S
Shengliang Guan 已提交
417 418 419 420 421 422 423 424 425 426 427 428 429
  code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id);
  if (pNewTrans == NULL) {
    mError("trans:%d, failed to ready from sdb since %s", pTrans->id, terrstr());
    return -1;
  }

  mDebug("trans:%d, prepare finished", pNewTrans->id);
S
Shengliang Guan 已提交
430
  pNewTrans->rpcHandle = pTrans->rpcHandle;
S
Shengliang Guan 已提交
431 432 433 434 435 436 437 438 439 440 441 442 443 444 445
  mndTransExecute(pMnode, pNewTrans);
  mndReleaseTrans(pMnode, pNewTrans);
  return 0;
}

int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, commit transaction", pTrans->id);

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

S
Shengliang Guan 已提交
446 447 448 449 450 451 452 453
  if (taosArrayGetSize(pTrans->commitLogs) != 0) {
    mTrace("trans:%d, start sync", pTrans->id);
    int32_t code = mndSyncPropose(pMnode, pRaw);
    if (code != 0) {
      mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
      sdbFreeRaw(pRaw);
      return -1;
    }
S
Shengliang Guan 已提交
454

S
Shengliang Guan 已提交
455 456 457 458 459 460
    mTrace("trans:%d, sync finished", pTrans->id);
    code = sdbWrite(pMnode->pSdb, pRaw);
    if (code != 0) {
      mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
      return -1;
    }
S
Shengliang Guan 已提交
461 462 463
  }

  mDebug("trans:%d, commit finished", pTrans->id);
S
Shengliang Guan 已提交
464 465 466
  return 0;
}

S
Shengliang Guan 已提交
467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489
int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
  mDebug("trans:%d, rollback transaction", pTrans->id);

  SSdbRaw *pRaw = mndTransActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr());
    return -1;
  }
  sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);

  mTrace("trans:%d, start sync", pTrans->id);
  int32_t code = mndSyncPropose(pMnode, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
    sdbFreeRaw(pRaw);
    return -1;
  }

  mTrace("trans:%d, sync finished", pTrans->id);
  code = sdbWrite(pMnode->pSdb, pRaw);
  if (code != 0) {
    mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
    return -1;
S
Shengliang Guan 已提交
490 491
  }

S
Shengliang Guan 已提交
492 493
  mDebug("trans:%d, rollback finished", pTrans->id);
  return 0;
S
Shengliang Guan 已提交
494
}
S
Shengliang Guan 已提交
495

S
Shengliang Guan 已提交
496 497 498 499 500 501 502
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) {
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return;
  mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x", pTrans->id, pTrans->rpcHandle, code & 0xFFFF);

  if (pTrans->rpcHandle != NULL) {
    SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code};
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
503 504 505
  }
}

S
Shengliang Guan 已提交
506 507 508 509 510 511 512 513 514
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) {
  // todo
}

static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
  SSdb   *pSdb = pMnode->pSdb;
  int32_t arraySize = taosArrayGetSize(pArray);

  for (int32_t i = 0; i < arraySize; ++i) {
515
    SSdbRaw *pRaw = taosArrayGetP(pArray, i);
S
Shengliang Guan 已提交
516 517 518
    int32_t  code = sdbWriteNotFree(pSdb, pRaw);
    if (code != 0) {
      return code;
S
Shengliang Guan 已提交
519 520 521 522 523 524
    }
  }

  return 0;
}

S
Shengliang Guan 已提交
525
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
526 527 528 529 530 531 532 533
  int32_t code = 0;
  if (taosArrayGetSize(pTrans->redoLogs) != 0) {
    code = mndTransExecuteArray(pMnode, pTrans->redoLogs);
    if (code != 0) {
      mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr())
    } else {
      mTrace("trans:%d, execute redo logs finished", pTrans->id)
    }
S
Shengliang Guan 已提交
534 535 536 537 538 539
  }

  return code;
}

static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
540 541 542 543 544 545 546 547
  int32_t code = 0;
  if (taosArrayGetSize(pTrans->undoLogs) != 0) {
    code = mndTransExecuteArray(pMnode, pTrans->undoLogs);
    if (code != 0) {
      mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr())
    } else {
      mTrace("trans:%d, execute undo logs finished", pTrans->id)
    }
S
Shengliang Guan 已提交
548 549 550 551 552 553
  }

  return code;
}

static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
554 555 556 557 558 559 560 561
  int32_t code = 0;
  if (taosArrayGetSize(pTrans->commitLogs) != 0) {
    code = mndTransExecuteArray(pMnode, pTrans->commitLogs);
    if (code != 0) {
      mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr())
    } else {
      mTrace("trans:%d, execute commit logs finished", pTrans->id)
    }
S
Shengliang Guan 已提交
562
  }
S
Shengliang Guan 已提交
563

S
Shengliang Guan 已提交
564 565
  return code;
}
S
Shengliang Guan 已提交
566

S
Shengliang Guan 已提交
567
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
568 569 570
  if (taosArrayGetSize(pTrans->redoActions) != 0) {
    mTrace("trans:%d, execute redo actions finished", pTrans->id);
  }
S
Shengliang Guan 已提交
571 572
  return 0;
}
S
Shengliang Guan 已提交
573

S
Shengliang Guan 已提交
574
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
S
Shengliang Guan 已提交
575 576 577
  if (taosArrayGetSize(pTrans->undoActions) != 0) {
    mTrace("trans:%d, execute undo actions finished", pTrans->id);
  }
S
Shengliang Guan 已提交
578 579
  return 0;
}
S
Shengliang Guan 已提交
580

S
Shengliang Guan 已提交
581 582
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
S
Shengliang Guan 已提交
583

S
Shengliang Guan 已提交
584
  if (code == 0) {
S
Shengliang Guan 已提交
585
    pTrans->stage = TRN_STAGE_EXECUTE;
S
Shengliang Guan 已提交
586
    mTrace("trans:%d, stage from prepare to execute", pTrans->id);
S
Shengliang Guan 已提交
587 588
  } else {
    pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
589
    mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
590
  }
S
Shengliang Guan 已提交
591 592

  return 0;
S
Shengliang Guan 已提交
593 594
}

S
Shengliang Guan 已提交
595 596
static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
S
Shengliang Guan 已提交
597 598 599

  if (code == 0) {
    pTrans->stage = TRN_STAGE_COMMIT;
S
Shengliang Guan 已提交
600
    mTrace("trans:%d, stage from execute to commit", pTrans->id);
S
Shengliang Guan 已提交
601
  } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
S
Shengliang Guan 已提交
602 603
    mTrace("trans:%d, stage keep on execute since %s", pTrans->id, terrstr(code));
    return code;
S
Shengliang Guan 已提交
604
  } else {
S
Shengliang Guan 已提交
605
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
S
Shengliang Guan 已提交
606
      pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
607 608 609 610
      mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr());
    } else {
      pTrans->stage = TRN_STAGE_RETRY;
      mError("trans:%d, stage from execute to retry since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
611 612 613
    }
  }

S
Shengliang Guan 已提交
614
  return 0;
S
Shengliang Guan 已提交
615 616
}

S
Shengliang Guan 已提交
617 618 619 620
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);

  if (code == 0) {
S
Shengliang Guan 已提交
621
    pTrans->stage = TRN_STAGE_OVER;
S
Shengliang Guan 已提交
622
    mTrace("trans:%d, commit stage finished", pTrans->id);
S
Shengliang Guan 已提交
623
  } else {
S
Shengliang Guan 已提交
624 625 626 627 628 629 630
    if (pTrans->policy == TRN_POLICY_ROLLBACK) {
      pTrans->stage = TRN_STAGE_ROLLBACK;
      mError("trans:%d, stage from commit to rollback since %s", pTrans->id, terrstr());
    } else {
      pTrans->stage = TRN_STAGE_RETRY;
      mError("trans:%d, stage from commit to retry since %s", pTrans->id, terrstr());
    }
S
Shengliang Guan 已提交
631
  }
S
Shengliang Guan 已提交
632 633

  return code;
S
Shengliang Guan 已提交
634 635
}

S
Shengliang Guan 已提交
636 637 638 639 640
static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);

  if (code == 0) {
    mTrace("trans:%d, rollbacked", pTrans->id);
S
Shengliang Guan 已提交
641 642
  } else {
    pTrans->stage = TRN_STAGE_ROLLBACK;
S
Shengliang Guan 已提交
643
    mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
644 645
  }

S
Shengliang Guan 已提交
646 647
  return code;
}
S
Shengliang Guan 已提交
648

S
Shengliang Guan 已提交
649 650
static int32_t mndTransPerformRetryStage(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
S
Shengliang Guan 已提交
651

S
Shengliang Guan 已提交
652 653 654 655 656 657
  if (code == 0) {
    pTrans->stage = TRN_STAGE_COMMIT;
    mTrace("trans:%d, stage from retry to commit", pTrans->id);
  } else {
    pTrans->stage = TRN_STAGE_RETRY;
    mError("trans:%d, stage keep on retry since %s", pTrans->id, terrstr());
S
Shengliang Guan 已提交
658 659
  }

S
Shengliang Guan 已提交
660 661
  return code;
}
S
Shengliang Guan 已提交
662

S
Shengliang Guan 已提交
663 664
static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
  int32_t code = 0;
S
Shengliang Guan 已提交
665

S
Shengliang Guan 已提交
666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688
  while (code == 0) {
    switch (pTrans->stage) {
      case TRN_STAGE_PREPARE:
        code = mndTransPerformPrepareStage(pMnode, pTrans);
        break;
      case TRN_STAGE_EXECUTE:
        code = mndTransPerformExecuteStage(pMnode, pTrans);
        break;
      case TRN_STAGE_COMMIT:
        code = mndTransCommit(pMnode, pTrans);
        if (code == 0) {
          code = mndTransPerformCommitStage(pMnode, pTrans);
        }
        break;
      case TRN_STAGE_ROLLBACK:
        code = mndTransPerformRollbackStage(pMnode, pTrans);
        if (code == 0) {
          code = mndTransRollback(pMnode, pTrans);
        }
        break;
      case TRN_STAGE_RETRY:
        code = mndTransPerformRetryStage(pMnode, pTrans);
        break;
S
Shengliang Guan 已提交
689 690 691
      default:
        mndTransSendRpcRsp(pTrans, 0);
        return;
S
Shengliang Guan 已提交
692 693 694
    }
  }

S
Shengliang Guan 已提交
695 696
  mndTransSendRpcRsp(pTrans, code);
}