diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index f302a21c7a4b78af8d0f436796c2dc661fa60bae..6e4e476b3ee43a30fb2e9aea24a94f034c7c367b 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -21,7 +21,7 @@ extern "C" { #endif #define SDB_GET_BINARY_VAL(pData, dataLen, val, valLen, code) \ - { \ + if (code == 0) { \ if ((dataLen) >= (valLen)) { \ memcpy((val), (char *)(pData), (valLen)); \ (dataLen) -= (valLen); \ @@ -32,7 +32,7 @@ extern "C" { } #define SDB_GET_INT32_VAL(pData, dataLen, val, code) \ - { \ + if (code == 0) { \ if (dataLen >= sizeof(int32_t)) { \ *(int32_t *)(pData) = (int32_t)(val); \ (dataLen) -= sizeof(int32_t); \ @@ -43,7 +43,7 @@ extern "C" { } #define SDB_GET_INT64_VAL(pData, dataLen, val, code) \ - { \ + if (code == 0) { \ if (dataLen >= sizeof(int64_t)) { \ *(int64_t *)(pData) = (int64_t)(val); \ (dataLen) -= sizeof(int64_t); \ @@ -87,7 +87,7 @@ typedef enum { SDB_VGROUP = 9, SDB_STABLE = 10, SDB_FUNC = 11, - SDB_OPER = 12, + SDB_TRANS = 12, SDB_MAX = 13 } ESdbType; diff --git a/include/dnode/mnode/transaction/trn.h b/include/dnode/mnode/transaction/trn.h index 62aa326682d44689e4025906769c4bc752147c5d..47ac068b54b9c04cacd0fddbd57d3676d189546b 100644 --- a/include/dnode/mnode/transaction/trn.h +++ b/include/dnode/mnode/transaction/trn.h @@ -29,7 +29,9 @@ int32_t trnInit(); void trnCleanup(); STrans *trnCreate(); +int32_t trnPrepare(STrans *); int32_t trnCommit(STrans *); +int32_t trnExecute(STrans *); void trnDrop(STrans *); int32_t trnAppendRedoLog(STrans *, SSdbRawData *); diff --git a/source/dnode/mnode/transaction/inc/trnInt.h b/source/dnode/mnode/transaction/inc/trnInt.h index c5d57d10200614a350b4eb50a6527d877566e7d0..74c64f673faae6374b63816ae7d37c4c905baeb6 100644 --- a/source/dnode/mnode/transaction/inc/trnInt.h +++ b/source/dnode/mnode/transaction/inc/trnInt.h @@ -41,6 +41,7 @@ typedef struct STrans { SArray *undoActions; } STrans; + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/transaction/src/trn.c b/source/dnode/mnode/transaction/src/trn.c index 850880d16dd5b706686aa4f5f23dd95f9c63e1df..c0c0deae7c7f3e7b574136461223f595442b760f 100644 --- a/source/dnode/mnode/transaction/src/trn.c +++ b/source/dnode/mnode/transaction/src/trn.c @@ -16,11 +16,9 @@ #define _DEFAULT_SOURCE #include "trnInt.h" +#define TRN_VER 1 #define TRN_DEFAULT_ARRAY_SIZE 8 -int32_t trnInit() { return 0; } -void trnCleanup(); - STrans *trnCreate() { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { @@ -36,12 +34,7 @@ STrans *trnCreate() { if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { - taosArrayDestroy(pTrans->redoLogs); - taosArrayDestroy(pTrans->undoLogs); - taosArrayDestroy(pTrans->commitLogs); - taosArrayDestroy(pTrans->redoActions); - taosArrayDestroy(pTrans->undoActions); - free(pTrans); + trnDrop(pTrans); terrno = TSDB_CODE_MND_OUT_OF_MEMORY; return NULL; } @@ -50,7 +43,22 @@ STrans *trnCreate() { } int32_t trnCommit(STrans *pTrans) { return 0; } -void trnDrop(STrans *pTrans) {} + +static void trnDropLogs(SArray *pArray) { + for (int32_t index = 0; index < pArray->size; ++index) { + SSdbRawData *pRaw = taosArrayGetP(pArray, index); + free(pRaw); + } + + taosArrayDestroy(pArray); +} + +void trnDrop(STrans *pTrans) { + trnDropLogs(pTrans->redoLogs); + trnDropLogs(pTrans->undoLogs); + trnDropLogs(pTrans->commitLogs); + free(pTrans); +} int32_t trnAppendRedoLog(STrans *pTrans, SSdbRawData *pRaw) { void *ptr = taosArrayPush(pTrans->redoLogs, &pRaw); @@ -91,3 +99,135 @@ int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { } return 0; } + +static SSdbRawData *trnActionEncode(STrans *pTrans) { + int32_t rawDataLen = 5 * sizeof(int32_t); + int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); + int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); + int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); + int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); + int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); + + for (int32_t index = 0; index < redoLogNum; ++index) { + SSdbRawData *pRawData = taosArrayGet(pTrans->redoLogs, index); + rawDataLen += (sizeof(SSdbRawData) + pRawData->dataLen); + } + + for (int32_t index = 0; index < undoLogNum; ++index) { + SSdbRawData *pRawData = taosArrayGet(pTrans->undoLogs, index); + rawDataLen += (sizeof(SSdbRawData) + pRawData->dataLen); + } + + for (int32_t index = 0; index < commitLogNum; ++index) { + SSdbRawData *pRawData = taosArrayGet(pTrans->commitLogs, index); + rawDataLen += (sizeof(SSdbRawData) + pRawData->dataLen); + } + + SSdbRawData *pRaw = calloc(1, rawDataLen + sizeof(SSdbRawData)); + if (pRaw == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return NULL; + } + + int32_t dataLen = 0; + char *pData = pRaw->data; + SDB_SET_INT32_VAL(pData, dataLen, redoLogNum) + SDB_SET_INT32_VAL(pData, dataLen, undoLogNum) + SDB_SET_INT32_VAL(pData, dataLen, commitLogNum) + SDB_SET_INT32_VAL(pData, dataLen, redoActionNum) + SDB_SET_INT32_VAL(pData, dataLen, undoActionNum) + + pRaw->dataLen = dataLen; + pRaw->type = SDB_TRANS; + pRaw->sver = TRN_VER; + return pRaw; +} + +static STrans *trnActionDecode(SSdbRawData *pRaw) { + if (pRaw->sver != TRN_VER) { + terrno = TSDB_CODE_SDB_INVAID_RAW_DATA_VER; + return NULL; + } + + STrans *pTrans = trnCreate(); + if (pTrans == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return NULL; + } + + int32_t redoLogNum = 0; + int32_t undoLogNum = 0; + int32_t commitLogNum = 0; + int32_t redoActionNum = 0; + int32_t undoActionNum = 0; + SSdbRawData *pTmp = malloc(sizeof(SSdbRawData)); + + int32_t code = 0; + int32_t dataLen = pRaw->dataLen; + char *pData = pRaw->data; + SDB_GET_INT32_VAL(pData, dataLen, redoLogNum, code) + SDB_GET_INT32_VAL(pData, dataLen, undoLogNum, code) + SDB_GET_INT32_VAL(pData, dataLen, commitLogNum, code) + SDB_GET_INT32_VAL(pData, dataLen, redoActionNum, code) + SDB_GET_INT32_VAL(pData, dataLen, undoActionNum, code) + + for (int32_t index = 0; index < redoLogNum; ++index) { + SDB_GET_BINARY_VAL(pData, dataLen, pTmp, sizeof(SSdbRawData), code); + if (code == 0 && pTmp->dataLen > 0) { + SSdbRawData *pRead = malloc(sizeof(SSdbRawData) + pTmp->dataLen); + if (pRead == NULL) { + code = TSDB_CODE_MND_OUT_OF_MEMORY; + break; + } + memcpy(pRead, pTmp, sizeof(SSdbRawData)); + SDB_GET_BINARY_VAL(pData, dataLen, pRead->data, pRead->dataLen, code); + void *ret = taosArrayPush(pTrans->redoLogs, &pRead); + if (ret == NULL) { + code = TSDB_CODE_MND_OUT_OF_MEMORY; + break; + } + } + } + + if (code != 0) { + trnDrop(pTrans); + terrno = code; + return NULL; + } + + return pTrans; +} + +static int32_t trnActionInsert(STrans *pTrans) { + SArray *pArray = pTrans->redoLogs; + int32_t arraySize = taosArrayGetSize(pArray); + + for (int32_t index = 0; index < arraySize; ++index) { + SSdbRawData *pRaw = taosArrayGetP(pArray, index); + int32_t code = sdbWrite(pRaw); + if (code != 0) { + return code; + } + } + + return 0; +} + +static int32_t trnActionDelete(STrans *pTrans) { + SArray *pArray = pTrans->redoLogs; + int32_t arraySize = taosArrayGetSize(pArray); + + for (int32_t index = 0; index < arraySize; ++index) { + SSdbRawData *pRaw = taosArrayGetP(pArray, index); + int32_t code = sdbWrite(pRaw); + if (code != 0) { + return code; + } + } + + return 0; +} + +static int32_t trnActionUpdate(STrans *pSrcUser, STrans *pDstUser) { + return 0; +} diff --git a/source/dnode/mnode/transaction/src/trnThread.c b/source/dnode/mnode/transaction/src/trnThread.c new file mode 100644 index 0000000000000000000000000000000000000000..6340f401b121cadb1eb479ac744203f8c0bfe63b --- /dev/null +++ b/source/dnode/mnode/transaction/src/trnThread.c @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "trnInt.h" +#include "tthread.h" + +static struct { + pthread_t *threadId; + bool threadRunning; +} tsTrn; + +static void *trnThreadFunc(void *param) { + while (1) { + pthread_testcancel(); + } + return NULL; +} + +int32_t trnInit() { + tsTrn.threadId = taosCreateThread(trnThreadFunc, NULL); + if (tsTrn.threadId == NULL) { + return TSDB_CODE_MND_OUT_OF_MEMORY; + } + + return 0; +} + +void trnCleanup() { + if (tsTrn.threadId) { + taosDestoryThread(tsTrn.threadId); + tsTrn.threadId = NULL; + } +}