trnMain.c 5.3 KB
Newer Older
S
Shengliang Guan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "trnInt.h"
S
Shengliang Guan 已提交
18
#include "trpc.h"
S
Shengliang Guan 已提交
19

S
Shengliang Guan 已提交
20
STrans *trnCreate(ETrnPolicy policy) {
S
Shengliang Guan 已提交
21 22 23 24 25 26
  STrans *pTrans = calloc(1, sizeof(STrans));
  if (pTrans == NULL) {
    terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
    return NULL;
  }

S
Shengliang Guan 已提交
27 28 29
  pTrans->id = trnGenerateTransId();
  pTrans->stage = TRN_STAGE_PREPARE;
  pTrans->policy = policy;
S
Shengliang Guan 已提交
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
  pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
  pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
  pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
  pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
  pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));

  if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
      pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
    terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
    return NULL;
  }

  return pTrans;
}

S
Shengliang Guan 已提交
45
static void trnDropArray(SArray *pArray) {
S
Shengliang Guan 已提交
46
  for (int32_t index = 0; index < pArray->size; ++index) {
S
Shengliang Guan 已提交
47 48
    SSdbRaw *pRaw = taosArrayGetP(pArray, index);
    tfree(pRaw);
S
Shengliang Guan 已提交
49 50 51 52 53 54
  }

  taosArrayDestroy(pArray);
}

void trnDrop(STrans *pTrans) {
S
Shengliang Guan 已提交
55 56 57 58 59 60
  trnDropArray(pTrans->redoLogs);
  trnDropArray(pTrans->undoLogs);
  trnDropArray(pTrans->commitLogs);
  trnDropArray(pTrans->redoActions);
  trnDropArray(pTrans->undoActions);
  tfree(pTrans);
S
Shengliang Guan 已提交
61
}
S
Shengliang Guan 已提交
62

S
Shengliang Guan 已提交
63 64 65
void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) {
  if (pTrans != NULL) {
    pTrans->rpcHandle = rpcHandle;
S
Shengliang Guan 已提交
66 67 68
  }
}

S
Shengliang Guan 已提交
69 70 71 72
static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) {
  if (pArray == NULL || pRaw == NULL) {
    terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
    return -1;
S
Shengliang Guan 已提交
73 74
  }

S
Shengliang Guan 已提交
75
  void *ptr = taosArrayPush(pArray, &pRaw);
S
Shengliang Guan 已提交
76
  if (ptr == NULL) {
S
Shengliang Guan 已提交
77 78
    terrno = TSDB_CODE_MND_OUT_OF_MEMORY;
    return -1;
S
Shengliang Guan 已提交
79
  }
S
Shengliang Guan 已提交
80

S
Shengliang Guan 已提交
81 82 83
  return 0;
}

S
Shengliang Guan 已提交
84 85 86 87 88 89
int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->redoLogs, pRaw); }

int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->undoLogs, pRaw); }

int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->commitLogs, pRaw); }

S
Shengliang Guan 已提交
90
int32_t trnAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
S
Shengliang Guan 已提交
91
  return trnAppendArray(pTrans->redoActions, pMsg);
S
Shengliang Guan 已提交
92 93 94
}

int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
S
Shengliang Guan 已提交
95
  return trnAppendArray(pTrans->undoActions, pMsg);
S
Shengliang Guan 已提交
96
}
S
Shengliang Guan 已提交
97

S
Shengliang Guan 已提交
98 99
int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
  if (syncfp == NULL) return -1;
S
Shengliang Guan 已提交
100

S
Shengliang Guan 已提交
101 102 103 104
  SSdbRaw *pRaw = trnActionEncode(pTrans);
  if (pRaw == NULL) {
    mError("tranId:%d, failed to decode trans since %s", pTrans->id, terrstr());
    return -1;
S
Shengliang Guan 已提交
105 106
  }

S
Shengliang Guan 已提交
107 108 109
  if (sdbWrite(pRaw) != 0) {
    mError("tranId:%d, failed to write trans since %s", pTrans->id, terrstr());
    return -1;
S
Shengliang Guan 已提交
110 111
  }

S
Shengliang Guan 已提交
112 113 114
  if ((*syncfp)(pRaw, pTrans->rpcHandle) != 0) {
    mError("tranId:%d, failed to sync trans since %s", pTrans->id, terrstr());
    return -1;
S
Shengliang Guan 已提交
115 116
  }

S
Shengliang Guan 已提交
117
  return 0;
S
Shengliang Guan 已提交
118 119
}

S
Shengliang Guan 已提交
120 121 122 123
static void trnSendRpcRsp(void *rpcHandle, int32_t code) {
  if (rpcHandle != NULL) {
    SRpcMsg rspMsg = {.handle = rpcHandle, .code = terrno};
    rpcSendResponse(&rspMsg);
S
Shengliang Guan 已提交
124
  }
S
Shengliang Guan 已提交
125
}
S
Shengliang Guan 已提交
126

S
Shengliang Guan 已提交
127 128 129 130
int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
  if (code != 0) {
    trnSendRpcRsp(pData, terrno);
    return 0;
S
Shengliang Guan 已提交
131 132
  }

S
Shengliang Guan 已提交
133 134 135
  if (sdbWrite(pRaw) != 0) {
    code = terrno;
    trnSendRpcRsp(pData, code);
S
Shengliang Guan 已提交
136
    terrno = code;
S
Shengliang Guan 已提交
137
    return -1;
S
Shengliang Guan 已提交
138 139
  }

S
Shengliang Guan 已提交
140
  return 0;
S
Shengliang Guan 已提交
141 142
}

S
Shengliang Guan 已提交
143 144 145 146 147 148
int32_t trnExecuteRedoLogs(STrans *pTrans) {return 0;}
int32_t trnExecuteUndoLogs(STrans *pTrans) {return 0;}
int32_t trnExecuteCommitLogs(STrans *pTrans) {return 0;}
int32_t trnExecuteRedoActions(STrans *pTrans) {return 0;}
int32_t trnExecuteUndoActions(STrans *pTrans) {return 0;}
static int32_t trnPerfomRollbackStage(STrans *pTrans) { return 0; }
S
Shengliang Guan 已提交
149

S
Shengliang Guan 已提交
150 151 152 153 154 155 156
int32_t trnExecute(int32_t tranId) {
  int32_t code = 0;

  STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId);
  if (pTrans == NULL) {
    code = terrno;
    return code;
S
Shengliang Guan 已提交
157 158
  }

S
Shengliang Guan 已提交
159 160 161 162 163 164 165 166
  if (pTrans->stage == TRN_STAGE_PREPARE) {
    code = trnExecuteRedoLogs(pTrans);
    if (code == 0) {
      pTrans->stage = TRN_STAGE_EXECUTE;
    } else {
      pTrans->stage = TRN_STAGE_ROLLBACK;
    }
  }
S
Shengliang Guan 已提交
167

S
Shengliang Guan 已提交
168 169 170 171 172 173 174 175 176 177 178 179 180 181
  if (pTrans->stage == TRN_STAGE_EXECUTE) {
    code = trnExecuteRedoActions(pTrans);
    if (code == 0) {
      pTrans->stage = TRN_STAGE_COMMIT;
    } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
      // do nothing
    } else {
      if (pTrans->policy == TRN_POLICY_RETRY) {
        pTrans->stage = TRN_STAGE_RETRY;
      } else {
        pTrans->stage = TRN_STAGE_ROLLBACK;
      }
    }
  }
S
Shengliang Guan 已提交
182

S
Shengliang Guan 已提交
183 184 185 186
  if (pTrans->stage == TRN_STAGE_COMMIT) {
    code = trnExecuteCommitLogs(pTrans);
    if (code == 0) {
      trnDrop(pTrans);
S
Shengliang Guan 已提交
187 188 189
    }
  }

S
Shengliang Guan 已提交
190 191 192 193 194
  if (pTrans->stage == TRN_STAGE_ROLLBACK) {
  }

  if (pTrans->stage == TRN_STAGE_RETRY) {
  }
S
Shengliang Guan 已提交
195 196

  return 0;
S
Shengliang Guan 已提交
197
}