diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 6e4e476b3ee43a30fb2e9aea24a94f034c7c367b..344dc6922d6c8181bc2edf2ff1e0cdd3a6dbbbb1 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -76,18 +76,18 @@ extern "C" { typedef enum { SDB_START = 0, - SDB_VERSION = 1, - SDB_CLUSTER = 2, - SDB_DNODE = 3, - SDB_MNODE = 4, - SDB_ACCT = 5, - SDB_AUTH = 6, - SDB_USER = 7, - SDB_DB = 8, - SDB_VGROUP = 9, - SDB_STABLE = 10, - SDB_FUNC = 11, - SDB_TRANS = 12, + SDB_TRANS = 1, + SDB_VERSION = 2, + SDB_CLUSTER = 3, + SDB_DNODE = 4, + SDB_MNODE = 5, + SDB_ACCT = 6, + SDB_AUTH = 7, + SDB_USER = 8, + SDB_DB = 9, + SDB_VGROUP = 10, + SDB_STABLE = 11, + SDB_FUNC = 12, SDB_MAX = 13 } ESdbType; @@ -104,14 +104,14 @@ typedef struct { int32_t cksum; int32_t dataLen; char data[]; -} SSdbRawData; +} SSdbRaw; typedef int32_t (*SdbInsertFp)(void *pObj); typedef int32_t (*SdbUpdateFp)(void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(void *pObj); typedef int32_t (*SdbDeployFp)(); -typedef void *(*SdbDecodeFp)(SSdbRawData *pRaw); -typedef SSdbRawData *(*SdbEncodeFp)(void *pObj); +typedef void *(*SdbDecodeFp)(SSdbRaw *pRaw); +typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef struct { ESdbType sdbType; @@ -129,7 +129,7 @@ void sdbCleanup(); void sdbSetHandler(SSdbDesc desc); int32_t sdbRead(); -int32_t sdbWrite(SSdbRawData *pRawData); +int32_t sdbWrite(SSdbRaw *pRaw); int32_t sdbCommit(); int32_t sdbDeploy(); diff --git a/include/dnode/mnode/transaction/trn.h b/include/dnode/mnode/transaction/trn.h index 47ac068b54b9c04cacd0fddbd57d3676d189546b..8ba043de1269938e0f651d7d0743a3f5bf4e576a 100644 --- a/include/dnode/mnode/transaction/trn.h +++ b/include/dnode/mnode/transaction/trn.h @@ -24,21 +24,23 @@ extern "C" { #endif typedef struct STrans STrans; +typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy; int32_t trnInit(); void trnCleanup(); -STrans *trnCreate(); -int32_t trnPrepare(STrans *); -int32_t trnCommit(STrans *); -int32_t trnExecute(STrans *); -void trnDrop(STrans *); - -int32_t trnAppendRedoLog(STrans *, SSdbRawData *); -int32_t trnAppendUndoLog(STrans *, SSdbRawData *); -int32_t trnAppendCommitLog(STrans *, SSdbRawData *); -int32_t trnAppendRedoAction(STrans *, SEpSet *, void *pMsg); -int32_t trnAppendUndoAction(STrans *, SEpSet *, void *pMsg); +STrans *trnCreate(ETrnPolicy); +void trnDrop(STrans *pTrans); +void trnSetRpcHandle(STrans *pTrans, void *rpcHandle); +int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw); +int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw); +int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw); +int32_t trnAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg); +int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg); + +int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)); +int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code); +int32_t trnExecute(int32_t tranId); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ba3dad63aff745459947ab32ca1ef96cf15f62f7..8c4a726bf61c62f4a9ae624020521d28596e407e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -27,6 +27,7 @@ extern "C" { #define TAOS_FAILED(err) ((err) < 0) const char* tstrerror(int32_t err); +const char* terrstr(); int32_t* taosGetErrno(); #define terrno (*taosGetErrno()) diff --git a/source/dnode/mnode/impl/inc/mnodeSync.h b/source/dnode/mnode/impl/inc/mnodeSync.h index 714d6ed1a8f856d5fb7290b3791254735082d9eb..380ad36e23926af69934ba83333149325c11ce8a 100644 --- a/source/dnode/mnode/impl/inc/mnodeSync.h +++ b/source/dnode/mnode/impl/inc/mnodeSync.h @@ -24,6 +24,7 @@ extern "C" { int32_t mnodeInitSync(); void mnodeCleanUpSync(); +int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData); bool mnodeIsMaster(); diff --git a/source/dnode/mnode/impl/src/mnodeAcct.c b/source/dnode/mnode/impl/src/mnodeAcct.c index 0811a64e926ed8f856f7d6b45c67e437f88c2bb7..e4538fa3912d6aba14058d3827f2967bb6d69c78 100644 --- a/source/dnode/mnode/impl/src/mnodeAcct.c +++ b/source/dnode/mnode/impl/src/mnodeAcct.c @@ -18,8 +18,8 @@ #define ACCT_VER 1 -static SSdbRawData *mnodeAcctActionEncode(SAcctObj *pAcct) { - SSdbRawData *pRaw = calloc(1, sizeof(SAcctObj) + sizeof(SSdbRawData)); +static SSdbRaw *mnodeAcctActionEncode(SAcctObj *pAcct) { + SSdbRaw *pRaw = calloc(1, sizeof(SAcctObj) + sizeof(SSdbRaw)); if (pRaw == NULL) { terrno = TSDB_CODE_MND_OUT_OF_MEMORY; return NULL; @@ -45,7 +45,7 @@ static SSdbRawData *mnodeAcctActionEncode(SAcctObj *pAcct) { return pRaw; } -static SAcctObj *mnodeAcctActionDecode(SSdbRawData *pRaw) { +static SAcctObj *mnodeAcctActionDecode(SSdbRaw *pRaw) { if (pRaw->sver != ACCT_VER) { terrno = TSDB_CODE_SDB_INVAID_RAW_DATA_VER; return NULL; @@ -96,23 +96,21 @@ static int32_t mnodeCreateDefaultAcct() { SAcctObj acctObj = {0}; tstrncpy(acctObj.acct, TSDB_DEFAULT_USER, TSDB_USER_LEN); acctObj.createdTime = taosGetTimestampMs(); - acctObj.updateTime = taosGetTimestampMs(); + acctObj.updateTime = acctObj.createdTime; acctObj.acctId = 1; - acctObj.cfg = (SAcctCfg){.maxUsers = 128, - .maxDbs = 128, + acctObj.cfg = (SAcctCfg){.maxUsers = 1024, + .maxDbs = 1024, .maxTimeSeries = INT32_MAX, - .maxStreams = 1000, + .maxStreams = 8092, .maxStorage = INT64_MAX, .accessState = TSDB_VN_ALL_ACCCESS}; - SSdbRawData *pRaw = mnodeAcctActionEncode(&acctObj); + SSdbRaw *pRaw = mnodeAcctActionEncode(&acctObj); if (pRaw != NULL) { - code = sdbWrite(pRaw); - } else { - code = terrno; + return -1; } - return code; + return sdbWrite(pRaw); } int32_t mnodeInitAcct() { diff --git a/source/dnode/mnode/impl/src/mnodeSync.c b/source/dnode/mnode/impl/src/mnodeSync.c index c161bb971ac7c4a0a74e39acd915bd4a6eb209db..7541ab6b595a2206c0d4c18441355454f7c26bf6 100644 --- a/source/dnode/mnode/impl/src/mnodeSync.c +++ b/source/dnode/mnode/impl/src/mnodeSync.c @@ -20,4 +20,10 @@ int32_t mnodeInitSync() { return 0; } void mnodeCleanUpSync() {} +int32_t mnodeSyncPropose(SSdbRaw *pRaw, void *pData) { + trnApply(pRaw, pData, 0); + free(pRaw); + return 0; +} + bool mnodeIsMaster() { return true; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mnodeUser.c b/source/dnode/mnode/impl/src/mnodeUser.c index cc321f0ef0be35165b97f10a7dbc6291e94f8c2c..3b5d40d807090ca51554f891729a712c05e51430 100644 --- a/source/dnode/mnode/impl/src/mnodeUser.c +++ b/source/dnode/mnode/impl/src/mnodeUser.c @@ -14,15 +14,15 @@ */ #define _DEFAULT_SOURCE +#include "mnodeSync.h" #include "os.h" -#include "tkey.h" #include "tglobal.h" -#include "mnodeInt.h" +#include "tkey.h" #define USER_VER 1 -static SSdbRawData *mnodeUserActionEncode(SUserObj *pUser) { - SSdbRawData *pRaw = calloc(1, sizeof(SUserObj) + sizeof(SSdbRawData)); +static SSdbRaw *mnodeUserActionEncode(SUserObj *pUser) { + SSdbRaw *pRaw = calloc(1, sizeof(SUserObj) + sizeof(SSdbRaw)); if (pRaw == NULL) { terrno = TSDB_CODE_MND_OUT_OF_MEMORY; return NULL; @@ -43,7 +43,7 @@ static SSdbRawData *mnodeUserActionEncode(SUserObj *pUser) { return pRaw; } -static SUserObj *mnodeUserActionDecode(SSdbRawData *pRaw) { +static SUserObj *mnodeUserActionDecode(SSdbRaw *pRaw) { if (pRaw->sver != USER_VER) { terrno = TSDB_CODE_SDB_INVAID_RAW_DATA_VER; return NULL; @@ -77,12 +77,14 @@ static SUserObj *mnodeUserActionDecode(SSdbRawData *pRaw) { static int32_t mnodeUserActionInsert(SUserObj *pUser) { pUser->prohibitDbHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (pUser->prohibitDbHash == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return -1; } pUser->pAcct = sdbAcquire(SDB_ACCT, pUser->acct); if (pUser->pAcct == NULL) { - return TSDB_CODE_MND_ACCT_NOT_EXIST; + terrno = TSDB_CODE_MND_ACCT_NOT_EXIST; + return -1; } return 0; @@ -108,8 +110,6 @@ static int32_t mnodeUserActionUpdate(SUserObj *pSrcUser, SUserObj *pDstUser) { } static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { - int32_t code = 0; - SUserObj userObj = {0}; tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.acct, acct, TSDB_USER_LEN); @@ -121,35 +121,31 @@ static int32_t mnodeCreateDefaultUser(char *acct, char *user, char *pass) { userObj.rootAuth = 1; } - SSdbRawData *pRaw = mnodeUserActionEncode(&userObj); - if (pRaw != NULL) { - code = sdbWrite(pRaw); - } else { - code = terrno; + SSdbRaw *pRaw = mnodeUserActionEncode(&userObj); + if (pRaw == NULL) { + return -1; } - return code; + return sdbWrite(pRaw); } static int32_t mnodeCreateDefaultUsers() { - int32_t code = mnodeCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS); - if (code != 0) return code; + if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { + return -1; + } - code = mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "monitor", tsInternalPass); - if (code != 0) return code; + if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "monitor", tsInternalPass) != 0) { + return -1; + } - code = mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, tsInternalPass); - if (code != 0) return code; + if (mnodeCreateDefaultUser(TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, tsInternalPass) != 0) { + return -1; + } - return code; + return 0; } static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnMsg *pMsg) { - int32_t code = 0; - STrans *pTrans = NULL; - SSdbRawData *pUndoRaw = NULL; - SSdbRawData *pRedoRaw = NULL; - SUserObj userObj = {0}; tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.acct, acct, TSDB_USER_LEN); @@ -158,79 +154,80 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnMsg *pMsg) userObj.updateTime = userObj.createdTime; userObj.rootAuth = 0; - pRedoRaw = mnodeUserActionEncode(&userObj); - if (pRedoRaw == NULL) { - code = terrno; - goto CREATE_USER_OVER; + STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK); + if (pTrans == NULL) return -1; + + SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj); + if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) { + trnDrop(pTrans); + return -1; } - pRedoRaw->status = SDB_STATUS_READY; + pRedoRaw->status = SDB_STATUS_CREATING; pRedoRaw->action = SDB_ACTION_INSERT; - pUndoRaw = mnodeUserActionEncode(&userObj); - if (pUndoRaw == NULL) { - code = terrno; - goto CREATE_USER_OVER; + SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj); + if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) { + trnDrop(pTrans); + return -1; } pUndoRaw->status = SDB_STATUS_DROPPING; pUndoRaw->action = SDB_ACTION_DELETE; - pTrans = trnCreate(); - if (pTrans == NULL) { - code = TSDB_CODE_MND_OUT_OF_MEMORY; - goto CREATE_USER_OVER; + SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj); + if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) { + trnDrop(pTrans); + return -1; } - trnAppendRedoLog(pTrans, pRedoRaw); - trnAppendUndoLog(pTrans, pUndoRaw); + pCommitRaw->status = SDB_STATUS_READY; + pCommitRaw->action = SDB_ACTION_UPDATE; - code = trnCommit(pTrans); + trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle); -CREATE_USER_OVER: - if (code != 0) { + if (trnPrepare(pTrans, mnodeSyncPropose) != 0) { trnDrop(pTrans); - free(pRedoRaw); - free(pUndoRaw); + return -1; } - return code; + trnDrop(pTrans); + return 0; } static int32_t mnodeProcessCreateUserMsg(SMnMsg *pMsg) { SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont; - int32_t code = TSDB_CODE_SUCCESS; if (pCreate->user[0] == 0) { - code = TSDB_CODE_MND_INVALID_USER_FORMAT; - mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code)); - return code; + terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; + mError("user:%s, failed to create since %s", pCreate->user, terrstr()); + return -1; } if (pCreate->pass[0] == 0) { - code = TSDB_CODE_MND_INVALID_PASS_FORMAT; - mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code)); - return code; + terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT; + mError("user:%s, failed to create since %s", pCreate->user, terrstr()); + return -1; } SUserObj *pUser = sdbAcquire(SDB_USER, pCreate->user); if (pUser != NULL) { sdbRelease(pUser); - code = TSDB_CODE_MND_USER_ALREADY_EXIST; - mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code)); - return code; + terrno = TSDB_CODE_MND_USER_ALREADY_EXIST; + mError("user:%s, failed to create since %s", pCreate->user, terrstr()); + return -1; } SUserObj *pOperUser = sdbAcquire(SDB_USER, pMsg->user); if (pOperUser == NULL) { - code = TSDB_CODE_MND_NO_USER_FROM_CONN; - mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code)); - return code; + terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + mError("user:%s, failed to create since %s", pCreate->user, terrstr()); + return -1; } - code = mnodeCreateUser(pOperUser->acct, pCreate->user, pCreate->pass, pMsg); + int32_t code = mnodeCreateUser(pOperUser->acct, pCreate->user, pCreate->pass, pMsg); sdbRelease(pOperUser); if (code != 0) { - mError("user:%s, failed to create since %s", pCreate->user, tstrerror(code)); - return code; + mError("user:%s, failed to create since %s", pCreate->user, terrstr()); + return -1; } return TSDB_CODE_MND_ACTION_IN_PROGRESS; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index a47ed8ec20598ca3e37f526677077ec92c0b32d3..d3b825e190453572b9fbd518c9718bffd14d0f26 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -61,7 +61,7 @@ static SHashObj *sdbGetHash(int32_t sdb) { return hash; } -int32_t sdbWrite(SSdbRawData *pRaw) { +int32_t sdbWrite(SSdbRaw *pRaw) { SHashObj *hash = sdbGetHash(pRaw->type); switch (pRaw->action) { case SDB_ACTION_INSERT: @@ -85,7 +85,7 @@ static int32_t sdbReadVersion(FileFd fd) { return 0; } static int32_t sdbReadDataFile() { int32_t code = 0; - SSdbRawData *pRaw = malloc(SDB_MAX_SIZE); + SSdbRaw *pRaw = malloc(SDB_MAX_SIZE); if (pRaw == NULL) { return TSDB_CODE_MND_OUT_OF_MEMORY; } @@ -101,7 +101,7 @@ static int32_t sdbReadDataFile() { int64_t offset = 0; while (1) { - int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRawData)); + int32_t ret = (int32_t)taosReadFile(fd, pRaw, sizeof(SSdbRaw)); if (ret == 0) break; if (ret < 0) { @@ -110,7 +110,7 @@ static int32_t sdbReadDataFile() { break; } - if (ret < sizeof(SSdbRawData)) { + if (ret < sizeof(SSdbRaw)) { code = TSDB_CODE_SDB_INTERNAL_ERROR; mError("failed to read file:%s since %s", file, tstrerror(code)); break; @@ -143,7 +143,7 @@ static int32_t sdbWriteDataFile() { return code; } - for (int32_t i = SDB_START; i < SDB_MAX; ++i) { + for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) { SHashObj *hash = tsSdb.hashObjs[i]; if (!hash) continue; @@ -153,9 +153,9 @@ static int32_t sdbWriteDataFile() { SSdbRow *pRow = taosHashIterate(hash, NULL); while (pRow != NULL) { if (pRow->status == SDB_STATUS_READY) continue; - SSdbRawData *pRaw = (*encodeFp)(pRow->data); + SSdbRaw *pRaw = (*encodeFp)(pRow->data); if (pRaw != NULL) { - taosWriteFile(fd, pRaw, sizeof(SSdbRawData) + pRaw->dataLen); + taosWriteFile(fd, pRaw, sizeof(SSdbRaw) + pRaw->dataLen); } else { taosHashCancelIterate(hash, pRow); code = TSDB_CODE_SDB_INTERNAL_ERROR; diff --git a/source/dnode/mnode/transaction/CMakeLists.txt b/source/dnode/mnode/transaction/CMakeLists.txt index 6b01f5bd18d809f4f222818005a9978ec1bcada8..d35a8c9b3ff045a4e36af3e261576c3d29ba3b57 100644 --- a/source/dnode/mnode/transaction/CMakeLists.txt +++ b/source/dnode/mnode/transaction/CMakeLists.txt @@ -11,4 +11,5 @@ target_link_libraries( PRIVATE common PRIVATE util PRIVATE sdb + PRIVATE transport ) diff --git a/source/dnode/mnode/transaction/inc/trnInt.h b/source/dnode/mnode/transaction/inc/trnInt.h index 74c64f673faae6374b63816ae7d37c4c905baeb6..03860734ee00fa91c3963a41de6beafcdcb6b169 100644 --- a/source/dnode/mnode/transaction/inc/trnInt.h +++ b/source/dnode/mnode/transaction/inc/trnInt.h @@ -33,14 +33,35 @@ extern "C" { #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} +#define TRN_VER 1 +#define TRN_DEFAULT_ARRAY_SIZE 8 + +typedef enum { + TRN_STAGE_PREPARE = 1, + TRN_STAGE_EXECUTE = 2, + TRN_STAGE_COMMIT = 3, + TRN_STAGE_ROLLBACK = 4, + TRN_STAGE_RETRY = 5 +} ETrnStage; + typedef struct STrans { - SArray *redoLogs; - SArray *undoLogs; - SArray *commitLogs; - SArray *redoActions; - SArray *undoActions; + int32_t id; + ETrnStage stage; + ETrnPolicy policy; + void *rpcHandle; + SArray *redoLogs; + SArray *undoLogs; + SArray *commitLogs; + SArray *redoActions; + SArray *undoActions; } STrans; +SSdbRaw *trnActionEncode(STrans *pTrans); +STrans *trnActionDecode(SSdbRaw *pRaw); +int32_t trnActionInsert(STrans *pTrans); +int32_t trnActionDelete(STrans *pTrans); +int32_t trnActionUpdate(STrans *pSrcTrans, STrans *pDstTrans); +int32_t trnGenerateTransId(); #ifdef __cplusplus } diff --git a/source/dnode/mnode/transaction/src/trnInt.c b/source/dnode/mnode/transaction/src/trnInt.c new file mode 100644 index 0000000000000000000000000000000000000000..4a69007de5e7d9996a489a7af050cfb528e39654 --- /dev/null +++ b/source/dnode/mnode/transaction/src/trnInt.c @@ -0,0 +1,164 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "trnInt.h" + +SSdbRaw *trnActionEncode(STrans *pTrans) { + int32_t rawDataLen = 5 * sizeof(int32_t); + int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); + int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); + int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); + int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); + int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); + + for (int32_t index = 0; index < redoLogNum; ++index) { + SSdbRaw *pRawData = taosArrayGet(pTrans->redoLogs, index); + rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + } + + for (int32_t index = 0; index < undoLogNum; ++index) { + SSdbRaw *pRawData = taosArrayGet(pTrans->undoLogs, index); + rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + } + + for (int32_t index = 0; index < commitLogNum; ++index) { + SSdbRaw *pRawData = taosArrayGet(pTrans->commitLogs, index); + rawDataLen += (sizeof(SSdbRaw) + pRawData->dataLen); + } + + SSdbRaw *pRaw = calloc(1, rawDataLen + sizeof(SSdbRaw)); + if (pRaw == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return NULL; + } + + int32_t dataLen = 0; + char *pData = pRaw->data; + SDB_SET_INT32_VAL(pData, dataLen, redoLogNum) + SDB_SET_INT32_VAL(pData, dataLen, undoLogNum) + SDB_SET_INT32_VAL(pData, dataLen, commitLogNum) + SDB_SET_INT32_VAL(pData, dataLen, redoActionNum) + SDB_SET_INT32_VAL(pData, dataLen, undoActionNum) + + pRaw->dataLen = dataLen; + pRaw->type = SDB_TRANS; + pRaw->sver = TRN_VER; + return pRaw; +} + +STrans *trnActionDecode(SSdbRaw *pRaw) { + if (pRaw->sver != TRN_VER) { + terrno = TSDB_CODE_SDB_INVAID_RAW_DATA_VER; + return NULL; + } + + STrans *pTrans = NULL; + if (pTrans == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return NULL; + } + + int32_t redoLogNum = 0; + int32_t undoLogNum = 0; + int32_t commitLogNum = 0; + int32_t redoActionNum = 0; + int32_t undoActionNum = 0; + SSdbRaw *pTmp = malloc(sizeof(SSdbRaw)); + + int32_t code = 0; + int32_t dataLen = pRaw->dataLen; + char *pData = pRaw->data; + SDB_GET_INT32_VAL(pData, dataLen, redoLogNum, code) + SDB_GET_INT32_VAL(pData, dataLen, undoLogNum, code) + SDB_GET_INT32_VAL(pData, dataLen, commitLogNum, code) + SDB_GET_INT32_VAL(pData, dataLen, redoActionNum, code) + SDB_GET_INT32_VAL(pData, dataLen, undoActionNum, code) + + for (int32_t index = 0; index < redoLogNum; ++index) { + SDB_GET_BINARY_VAL(pData, dataLen, pTmp, sizeof(SSdbRaw), code); + if (code == 0 && pTmp->dataLen > 0) { + SSdbRaw *pRead = malloc(sizeof(SSdbRaw) + pTmp->dataLen); + if (pRead == NULL) { + code = TSDB_CODE_MND_OUT_OF_MEMORY; + break; + } + memcpy(pRead, pTmp, sizeof(SSdbRaw)); + SDB_GET_BINARY_VAL(pData, dataLen, pRead->data, pRead->dataLen, code); + void *ret = taosArrayPush(pTrans->redoLogs, &pRead); + if (ret == NULL) { + code = TSDB_CODE_MND_OUT_OF_MEMORY; + break; + } + } + } + + if (code != 0) { + trnDrop(pTrans); + terrno = code; + return NULL; + } + + return pTrans; +} + +int32_t trnActionInsert(STrans *pTrans) { + SArray *pArray = pTrans->redoLogs; + int32_t arraySize = taosArrayGetSize(pArray); + + for (int32_t index = 0; index < arraySize; ++index) { + SSdbRaw *pRaw = taosArrayGetP(pArray, index); + int32_t code = sdbWrite(pRaw); + if (code != 0) { + return code; + } + } + + return 0; +} + +int32_t trnActionDelete(STrans *pTrans) { + SArray *pArray = pTrans->redoLogs; + int32_t arraySize = taosArrayGetSize(pArray); + + for (int32_t index = 0; index < arraySize; ++index) { + SSdbRaw *pRaw = taosArrayGetP(pArray, index); + int32_t code = sdbWrite(pRaw); + if (code != 0) { + return code; + } + } + + return 0; +} + +int32_t trnActionUpdate(STrans *pSrcTrans, STrans *pDstTrans) { return 0; } + +int32_t trnGenerateTransId() { return 1; } + +int32_t trnInit() { + SSdbDesc desc = {.sdbType = SDB_TRANS, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)trnActionEncode, + .decodeFp = (SdbDecodeFp)trnActionDecode, + .insertFp = (SdbInsertFp)trnActionInsert, + .updateFp = (SdbUpdateFp)trnActionUpdate, + .deleteFp = (SdbDeleteFp)trnActionDelete}; + sdbSetHandler(desc); + + return 0; +} + +void trnCleanup() {} diff --git a/source/dnode/mnode/transaction/src/trnMain.c b/source/dnode/mnode/transaction/src/trnMain.c index c0c0deae7c7f3e7b574136461223f595442b760f..3ff991a676e71e8ff5e6f4bfad7410e30e2d0f8a 100644 --- a/source/dnode/mnode/transaction/src/trnMain.c +++ b/source/dnode/mnode/transaction/src/trnMain.c @@ -15,17 +15,18 @@ #define _DEFAULT_SOURCE #include "trnInt.h" +#include "trpc.h" -#define TRN_VER 1 -#define TRN_DEFAULT_ARRAY_SIZE 8 - -STrans *trnCreate() { +STrans *trnCreate(ETrnPolicy policy) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { terrno = TSDB_CODE_MND_OUT_OF_MEMORY; return NULL; } + pTrans->id = trnGenerateTransId(); + pTrans->stage = TRN_STAGE_PREPARE; + pTrans->policy = policy; pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); @@ -34,7 +35,6 @@ STrans *trnCreate() { if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { - trnDrop(pTrans); terrno = TSDB_CODE_MND_OUT_OF_MEMORY; return NULL; } @@ -42,192 +42,156 @@ STrans *trnCreate() { return pTrans; } -int32_t trnCommit(STrans *pTrans) { return 0; } - -static void trnDropLogs(SArray *pArray) { +static void trnDropArray(SArray *pArray) { for (int32_t index = 0; index < pArray->size; ++index) { - SSdbRawData *pRaw = taosArrayGetP(pArray, index); - free(pRaw); + SSdbRaw *pRaw = taosArrayGetP(pArray, index); + tfree(pRaw); } taosArrayDestroy(pArray); } void trnDrop(STrans *pTrans) { - trnDropLogs(pTrans->redoLogs); - trnDropLogs(pTrans->undoLogs); - trnDropLogs(pTrans->commitLogs); - free(pTrans); + trnDropArray(pTrans->redoLogs); + trnDropArray(pTrans->undoLogs); + trnDropArray(pTrans->commitLogs); + trnDropArray(pTrans->redoActions); + trnDropArray(pTrans->undoActions); + tfree(pTrans); } -int32_t trnAppendRedoLog(STrans *pTrans, SSdbRawData *pRaw) { - void *ptr = taosArrayPush(pTrans->redoLogs, &pRaw); - if (ptr == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; +void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) { + if (pTrans != NULL) { + pTrans->rpcHandle = rpcHandle; } - return 0; } -int32_t trnAppendUndoLog(STrans *pTrans, SSdbRawData *pRaw) { - void *ptr = taosArrayPush(pTrans->undoLogs, &pRaw); - if (ptr == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; +static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) { + if (pArray == NULL || pRaw == NULL) { + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return -1; } - return 0; -} -int32_t trnAppendCommitLog(STrans *pTrans, SSdbRawData *pRaw) { - void *ptr = taosArrayPush(pTrans->commitLogs, &pRaw); + void *ptr = taosArrayPush(pArray, &pRaw); if (ptr == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; + terrno = TSDB_CODE_MND_OUT_OF_MEMORY; + return -1; } + return 0; } +int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->redoLogs, pRaw); } + +int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->undoLogs, pRaw); } + +int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->commitLogs, pRaw); } + int32_t trnAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { - void *ptr = taosArrayPush(pTrans->redoActions, &pMsg); - if (ptr == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; - } - return 0; + return trnAppendArray(pTrans->redoActions, pMsg); } int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { - void *ptr = taosArrayPush(pTrans->undoActions, &pMsg); - if (ptr == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; - } - return 0; + return trnAppendArray(pTrans->undoActions, pMsg); } -static SSdbRawData *trnActionEncode(STrans *pTrans) { - int32_t rawDataLen = 5 * sizeof(int32_t); - int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); - int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); - int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); - int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); - int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); +int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) { + if (syncfp == NULL) return -1; - for (int32_t index = 0; index < redoLogNum; ++index) { - SSdbRawData *pRawData = taosArrayGet(pTrans->redoLogs, index); - rawDataLen += (sizeof(SSdbRawData) + pRawData->dataLen); + SSdbRaw *pRaw = trnActionEncode(pTrans); + if (pRaw == NULL) { + mError("tranId:%d, failed to decode trans since %s", pTrans->id, terrstr()); + return -1; } - for (int32_t index = 0; index < undoLogNum; ++index) { - SSdbRawData *pRawData = taosArrayGet(pTrans->undoLogs, index); - rawDataLen += (sizeof(SSdbRawData) + pRawData->dataLen); + if (sdbWrite(pRaw) != 0) { + mError("tranId:%d, failed to write trans since %s", pTrans->id, terrstr()); + return -1; } - for (int32_t index = 0; index < commitLogNum; ++index) { - SSdbRawData *pRawData = taosArrayGet(pTrans->commitLogs, index); - rawDataLen += (sizeof(SSdbRawData) + pRawData->dataLen); + if ((*syncfp)(pRaw, pTrans->rpcHandle) != 0) { + mError("tranId:%d, failed to sync trans since %s", pTrans->id, terrstr()); + return -1; } - SSdbRawData *pRaw = calloc(1, rawDataLen + sizeof(SSdbRawData)); - if (pRaw == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; - return NULL; - } - - int32_t dataLen = 0; - char *pData = pRaw->data; - SDB_SET_INT32_VAL(pData, dataLen, redoLogNum) - SDB_SET_INT32_VAL(pData, dataLen, undoLogNum) - SDB_SET_INT32_VAL(pData, dataLen, commitLogNum) - SDB_SET_INT32_VAL(pData, dataLen, redoActionNum) - SDB_SET_INT32_VAL(pData, dataLen, undoActionNum) - - pRaw->dataLen = dataLen; - pRaw->type = SDB_TRANS; - pRaw->sver = TRN_VER; - return pRaw; + return 0; } -static STrans *trnActionDecode(SSdbRawData *pRaw) { - if (pRaw->sver != TRN_VER) { - terrno = TSDB_CODE_SDB_INVAID_RAW_DATA_VER; - return NULL; +static void trnSendRpcRsp(void *rpcHandle, int32_t code) { + if (rpcHandle != NULL) { + SRpcMsg rspMsg = {.handle = rpcHandle, .code = terrno}; + rpcSendResponse(&rspMsg); } +} - STrans *pTrans = trnCreate(); - if (pTrans == NULL) { - terrno = TSDB_CODE_MND_OUT_OF_MEMORY; - return NULL; - } - - int32_t redoLogNum = 0; - int32_t undoLogNum = 0; - int32_t commitLogNum = 0; - int32_t redoActionNum = 0; - int32_t undoActionNum = 0; - SSdbRawData *pTmp = malloc(sizeof(SSdbRawData)); - - int32_t code = 0; - int32_t dataLen = pRaw->dataLen; - char *pData = pRaw->data; - SDB_GET_INT32_VAL(pData, dataLen, redoLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, undoLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, commitLogNum, code) - SDB_GET_INT32_VAL(pData, dataLen, redoActionNum, code) - SDB_GET_INT32_VAL(pData, dataLen, undoActionNum, code) - - for (int32_t index = 0; index < redoLogNum; ++index) { - SDB_GET_BINARY_VAL(pData, dataLen, pTmp, sizeof(SSdbRawData), code); - if (code == 0 && pTmp->dataLen > 0) { - SSdbRawData *pRead = malloc(sizeof(SSdbRawData) + pTmp->dataLen); - if (pRead == NULL) { - code = TSDB_CODE_MND_OUT_OF_MEMORY; - break; - } - memcpy(pRead, pTmp, sizeof(SSdbRawData)); - SDB_GET_BINARY_VAL(pData, dataLen, pRead->data, pRead->dataLen, code); - void *ret = taosArrayPush(pTrans->redoLogs, &pRead); - if (ret == NULL) { - code = TSDB_CODE_MND_OUT_OF_MEMORY; - break; - } - } +int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) { + if (code != 0) { + trnSendRpcRsp(pData, terrno); + return 0; } - if (code != 0) { - trnDrop(pTrans); + if (sdbWrite(pRaw) != 0) { + code = terrno; + trnSendRpcRsp(pData, code); terrno = code; - return NULL; + return -1; } - return pTrans; + return 0; } -static int32_t trnActionInsert(STrans *pTrans) { - SArray *pArray = pTrans->redoLogs; - int32_t arraySize = taosArrayGetSize(pArray); +int32_t trnExecuteRedoLogs(STrans *pTrans) {return 0;} +int32_t trnExecuteUndoLogs(STrans *pTrans) {return 0;} +int32_t trnExecuteCommitLogs(STrans *pTrans) {return 0;} +int32_t trnExecuteRedoActions(STrans *pTrans) {return 0;} +int32_t trnExecuteUndoActions(STrans *pTrans) {return 0;} +static int32_t trnPerfomRollbackStage(STrans *pTrans) { return 0; } - for (int32_t index = 0; index < arraySize; ++index) { - SSdbRawData *pRaw = taosArrayGetP(pArray, index); - int32_t code = sdbWrite(pRaw); - if (code != 0) { - return code; - } +int32_t trnExecute(int32_t tranId) { + int32_t code = 0; + + STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId); + if (pTrans == NULL) { + code = terrno; + return code; } - return 0; -} + if (pTrans->stage == TRN_STAGE_PREPARE) { + code = trnExecuteRedoLogs(pTrans); + if (code == 0) { + pTrans->stage = TRN_STAGE_EXECUTE; + } else { + pTrans->stage = TRN_STAGE_ROLLBACK; + } + } -static int32_t trnActionDelete(STrans *pTrans) { - SArray *pArray = pTrans->redoLogs; - int32_t arraySize = taosArrayGetSize(pArray); + if (pTrans->stage == TRN_STAGE_EXECUTE) { + code = trnExecuteRedoActions(pTrans); + if (code == 0) { + pTrans->stage = TRN_STAGE_COMMIT; + } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { + // do nothing + } else { + if (pTrans->policy == TRN_POLICY_RETRY) { + pTrans->stage = TRN_STAGE_RETRY; + } else { + pTrans->stage = TRN_STAGE_ROLLBACK; + } + } + } - for (int32_t index = 0; index < arraySize; ++index) { - SSdbRawData *pRaw = taosArrayGetP(pArray, index); - int32_t code = sdbWrite(pRaw); - if (code != 0) { - return code; + if (pTrans->stage == TRN_STAGE_COMMIT) { + code = trnExecuteCommitLogs(pTrans); + if (code == 0) { + trnDrop(pTrans); } } - return 0; -} + if (pTrans->stage == TRN_STAGE_ROLLBACK) { + } + + if (pTrans->stage == TRN_STAGE_RETRY) { + } -static int32_t trnActionUpdate(STrans *pSrcUser, STrans *pDstUser) { return 0; -} +} \ No newline at end of file diff --git a/source/dnode/mnode/transaction/src/trnThread.c b/source/dnode/mnode/transaction/src/trnThread.c deleted file mode 100644 index 6340f401b121cadb1eb479ac744203f8c0bfe63b..0000000000000000000000000000000000000000 --- a/source/dnode/mnode/transaction/src/trnThread.c +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "trnInt.h" -#include "tthread.h" - -static struct { - pthread_t *threadId; - bool threadRunning; -} tsTrn; - -static void *trnThreadFunc(void *param) { - while (1) { - pthread_testcancel(); - } - return NULL; -} - -int32_t trnInit() { - tsTrn.threadId = taosCreateThread(trnThreadFunc, NULL); - if (tsTrn.threadId == NULL) { - return TSDB_CODE_MND_OUT_OF_MEMORY; - } - - return 0; -} - -void trnCleanup() { - if (tsTrn.threadId) { - taosDestoryThread(tsTrn.threadId); - tsTrn.threadId = NULL; - } -} diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 2003270b2168d7f023f3718e1d8333a6b17d7994..4151178e2e0ea4448588e4676fa9d1ea20501245 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -524,3 +524,5 @@ const char* tstrerror(int32_t err) { return ""; } + +const char* terrstr() { return tstrerror(terrno); } \ No newline at end of file