提交 72ec9324 编写于 作者: S Shengliang Guan

refact transaction

上级 fed05bb6
...@@ -56,39 +56,39 @@ extern "C" { ...@@ -56,39 +56,39 @@ extern "C" {
dataPos += valLen; \ dataPos += valLen; \
} }
#define SDB_SET_INT64(pData, dataPos, val) \ #define SDB_SET_INT64(pRaw, dataPos, val) \
{ \ { \
if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \ if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
dataPos += sizeof(int64_t); \ dataPos += sizeof(int64_t); \
} }
#define SDB_SET_INT32(pData, dataPos, val) \ #define SDB_SET_INT32(pRaw, dataPos, val) \
{ \ { \
if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \ if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
dataPos += sizeof(int32_t); \ dataPos += sizeof(int32_t); \
} }
#define SDB_SET_INT8(pData, dataPos, val) \ #define SDB_SET_INT8(pRaw, dataPos, val) \
{ \ { \
if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \ if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
dataPos += sizeof(int8_t); \ dataPos += sizeof(int8_t); \
} }
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \ #define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \
{ \ { \
if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \ if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
dataPos += valLen; \ dataPos += valLen; \
} }
...@@ -97,7 +97,7 @@ extern "C" { ...@@ -97,7 +97,7 @@ extern "C" {
if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \ if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \
sdbFreeRaw(pRaw); \ sdbFreeRaw(pRaw); \
return NULL; \ return NULL; \
}; \ } \
} }
typedef struct SSdbRaw SSdbRaw; typedef struct SSdbRaw SSdbRaw;
......
...@@ -41,7 +41,7 @@ int32_t dnodeInit(); ...@@ -41,7 +41,7 @@ int32_t dnodeInit();
void dnodeCleanup(); void dnodeCleanup();
EDnStat dnodeGetRunStat(); EDnStat dnodeGetRunStat();
void dnodeSetRunStat(); void dnodeSetRunStat(EDnStat stat);
void dnodeReportStartup(char *name, char *desc); void dnodeReportStartup(char *name, char *desc);
void dnodeReportStartupFinished(char *name, char *desc); void dnodeReportStartupFinished(char *name, char *desc);
......
...@@ -33,7 +33,10 @@ static struct { ...@@ -33,7 +33,10 @@ static struct {
EDnStat dnodeGetRunStat() { return tsInt.runStat; } EDnStat dnodeGetRunStat() { return tsInt.runStat; }
void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; } void dnodeSetRunStat(EDnStat stat) {
dDebug("runstat set to %d", stat);
tsInt.runStat = stat;
}
void dnodeReportStartup(char *name, char *desc) { void dnodeReportStartup(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup; SStartupMsg *pStartup = &tsInt.startup;
...@@ -99,7 +102,6 @@ static int32_t dnodeInitDir() { ...@@ -99,7 +102,6 @@ static int32_t dnodeInitDir() {
} }
static int32_t dnodeInitMain() { static int32_t dnodeInitMain() {
tsInt.runStat = DN_RUN_STAT_STOPPED;
tscEmbedded = 1; tscEmbedded = 1;
taosIgnSIGPIPE(); taosIgnSIGPIPE();
taosBlockSIGPIPE(); taosBlockSIGPIPE();
...@@ -147,7 +149,9 @@ static void dnodeCleanupMain() { ...@@ -147,7 +149,9 @@ static void dnodeCleanupMain() {
int32_t dnodeInit() { int32_t dnodeInit() {
SSteps *steps = taosStepInit(10, dnodeReportStartup); SSteps *steps = taosStepInit(10, dnodeReportStartup);
if (steps == NULL) return -1; if (steps == NULL) return -1;
#if 1
dnodeSetRunStat(DN_RUN_STAT_RUNNING);
#endif
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain); taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup); taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
taosStepAdd(steps, "dnode-tfs", NULL, NULL); taosStepAdd(steps, "dnode-tfs", NULL, NULL);
......
...@@ -330,17 +330,22 @@ static void dnodeProcessMnodeApplyQueue(void *unused, SMnodeMsg *pMsg) { mnodePr ...@@ -330,17 +330,22 @@ static void dnodeProcessMnodeApplyQueue(void *unused, SMnodeMsg *pMsg) { mnodePr
static void dnodeProcessMnodeSyncQueue(void *unused, SMnodeMsg *pMsg) { mnodeProcessMsg(pMsg, MN_MSG_TYPE_SYNC); } static void dnodeProcessMnodeSyncQueue(void *unused, SMnodeMsg *pMsg) { mnodeProcessMsg(pMsg, MN_MSG_TYPE_SYNC); }
static int32_t dnodeWriteMnodeMsgToQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) { static int32_t dnodeWriteMnodeMsgToQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
int32_t code = 0; int32_t code = 0;
SMnodeMsg *pMsg = NULL;
if (pQueue == NULL) { if (pQueue == NULL) {
code = TSDB_CODE_DND_MSG_NOT_PROCESSED; code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
} else { } else {
SMnodeMsg *pMsg = mnodeInitMsg(pRpcMsg); pMsg = mnodeInitMsg(pRpcMsg);
if (pMsg == NULL) { if (pMsg == NULL) {
code = terrno; code = terrno;
} }
} }
if (code == 0) {
code = taosWriteQitem(pQueue, pMsg);
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
......
...@@ -314,6 +314,8 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c ...@@ -314,6 +314,8 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
} }
static int32_t dnodeInitShellServer() { static int32_t dnodeInitShellServer() {
dnodeInitMsgFp();
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
numOfThreads = 1; numOfThreads = 1;
...@@ -336,6 +338,17 @@ static int32_t dnodeInitShellServer() { ...@@ -336,6 +338,17 @@ static int32_t dnodeInitShellServer() {
return -1; return -1;
} }
#if 1
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER;
rpcMsg.contLen = sizeof(SCreateUserMsg);
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
SCreateUserMsg *pMsg = (SCreateUserMsg*)rpcMsg.pCont;
strcpy(pMsg->user, "u1");
strcpy(pMsg->pass, "up1");
dnodeProcessShellReq(&rpcMsg, NULL);
#endif
dInfo("dnode shell rpc server is initialized"); dInfo("dnode shell rpc server is initialized");
return 0; return 0;
} }
...@@ -369,7 +382,11 @@ void dnodeCleanupTrans() { ...@@ -369,7 +382,11 @@ void dnodeCleanupTrans() {
dnodeCleanupClient(); dnodeCleanupClient();
} }
void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) {
#if 0
rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL);
#endif
}
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SEpSet epSet = {0}; SEpSet epSet = {0};
......
add_subdirectory(impl) add_subdirectory(impl)
add_subdirectory(sdb) add_subdirectory(sdb)
add_subdirectory(transaction)
...@@ -8,7 +8,6 @@ target_include_directories( ...@@ -8,7 +8,6 @@ target_include_directories(
target_link_libraries( target_link_libraries(
mnode mnode
PRIVATE sdb PRIVATE sdb
PRIVATE transaction
PUBLIC transport PUBLIC transport
PUBLIC cjson PUBLIC cjson
) )
\ No newline at end of file
...@@ -76,6 +76,28 @@ typedef enum { ...@@ -76,6 +76,28 @@ typedef enum {
MN_AUTH_MAX MN_AUTH_MAX
} EMnAuthOp; } EMnAuthOp;
typedef enum {
TRN_STAGE_PREPARE = 1,
TRN_STAGE_EXECUTE = 2,
TRN_STAGE_COMMIT = 3,
TRN_STAGE_ROLLBACK = 4,
TRN_STAGE_RETRY = 5
} ETrnStage;
typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy;
typedef struct STrans {
int32_t id;
ETrnStage stage;
ETrnPolicy policy;
void *rpcHandle;
SArray *redoLogs;
SArray *undoLogs;
SArray *commitLogs;
SArray *redoActions;
SArray *undoActions;
} STrans;
typedef struct SClusterObj { typedef struct SClusterObj {
......
...@@ -18,13 +18,23 @@ ...@@ -18,13 +18,23 @@
#include "mnodeDef.h" #include "mnodeDef.h"
#include "sdb.h" #include "sdb.h"
#include "trn.h" #include "tstep.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef void (*MnodeRpcFp)(SMnodeMsg *pMsg); typedef int32_t (*MnodeRpcFp)(SMnodeMsg *pMsg);
typedef struct SMnode {
int32_t dnodeId;
int64_t clusterId;
tmr_h timer;
SSteps *pInitSteps;
SSteps *pStartSteps;
SMnodePara para;
MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
} SMnode;
tmr_h mnodeGetTimer(); tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId(); int32_t mnodeGetDnodeId();
......
...@@ -13,25 +13,20 @@ ...@@ -13,25 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_TRANSACTION_H_ #ifndef _TD_TRANSACTION_INT_H_
#define _TD_TRANSACTION_H_ #define _TD_TRANSACTION_INT_H_
#include "sdb.h" #include "mnodeInt.h"
#include "taosmsg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
typedef struct STrans STrans; int32_t mnodeInitTrans();
typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy; void mnodeCleanupTrans();
int32_t trnInit(); STrans *trnCreate(ETrnPolicy policy, void *rpcHandle);
void trnCleanup();
STrans *trnCreate(ETrnPolicy);
void trnDrop(STrans *pTrans); void trnDrop(STrans *pTrans);
void trnSetRpcHandle(STrans *pTrans, void *rpcHandle);
int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw); int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw);
int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw); int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw);
int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw); int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw);
...@@ -42,8 +37,11 @@ int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData) ...@@ -42,8 +37,11 @@ int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)
int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code); int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code);
int32_t trnExecute(int32_t tranId); int32_t trnExecute(int32_t tranId);
SSdbRaw *trnActionEncode(STrans *pTrans);
SSdbRow *trnActionDecode(SSdbRaw *pRaw);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_TRANSACTION_H_*/ #endif /*_TD_TRANSACTION_INT_H_*/
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
#include "tstep.h"
#include "tqueue.h" #include "tqueue.h"
#include "mnodeAcct.h" #include "mnodeAcct.h"
#include "mnodeAuth.h" #include "mnodeAuth.h"
...@@ -34,16 +33,9 @@ ...@@ -34,16 +33,9 @@
#include "mnodeTelem.h" #include "mnodeTelem.h"
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
#include "mnodeTrans.h"
static struct { SMnode tsMint = {0};
int32_t dnodeId;
int64_t clusterId;
tmr_h timer;
SSteps *pInitSteps;
SSteps *pStartSteps;
SMnodePara para;
MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
} tsMint;
int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; } int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; }
...@@ -116,7 +108,7 @@ static int32_t mnodeAllocInitSteps() { ...@@ -116,7 +108,7 @@ static int32_t mnodeAllocInitSteps() {
struct SSteps *steps = taosStepInit(16, NULL); struct SSteps *steps = taosStepInit(16, NULL);
if (steps == NULL) return -1; if (steps == NULL) return -1;
if (taosStepAdd(steps, "mnode-trans", trnInit, trnCleanup) != 0) return -1; if (taosStepAdd(steps, "mnode-trans", mnodeInitTrans, mnodeCleanupTrans) != 0) return -1;
if (taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster) != 0) return -1; if (taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster) != 0) return -1;
if (taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode) != 0) return -1; if (taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode) != 0) return -1;
if (taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode) != 0) return -1; if (taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode) != 0) return -1;
...@@ -224,10 +216,14 @@ void mnodeCleanupMsg(SMnodeMsg *pMsg) { ...@@ -224,10 +216,14 @@ void mnodeCleanupMsg(SMnodeMsg *pMsg) {
static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) { static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) {
int32_t msgType = pMsg->rpcMsg.msgType; int32_t msgType = pMsg->rpcMsg.msgType;
if (tsMint.msgFp[msgType] == NULL) { MnodeRpcFp fp = tsMint.msgFp[msgType];
if (fp == NULL) {
} }
(*tsMint.msgFp[msgType])(pMsg); int32_t code = (fp)(pMsg);
if (code != 0) {
assert(code);
}
} }
void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) { void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) {
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "mnodeInt.h" #include "mnodeInt.h"
#include "mnodeTrans.h"
int32_t mnodeInitSync() { return 0; } int32_t mnodeInitSync() { return 0; }
void mnodeCleanUpSync() {} void mnodeCleanUpSync() {}
......
...@@ -14,9 +14,11 @@ ...@@ -14,9 +14,11 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "trnInt.h" #include "mnodeTrans.h"
#include "trpc.h"
#define SDB_TRANS_VER 1 #define SDB_TRANS_VER 1
#define TRN_DEFAULT_ARRAY_SIZE 8
SSdbRaw *trnActionEncode(STrans *pTrans) { SSdbRaw *trnActionEncode(STrans *pTrans) {
int32_t rawDataLen = 10 * sizeof(int32_t); int32_t rawDataLen = 10 * sizeof(int32_t);
...@@ -27,66 +29,111 @@ SSdbRaw *trnActionEncode(STrans *pTrans) { ...@@ -27,66 +29,111 @@ SSdbRaw *trnActionEncode(STrans *pTrans) {
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
for (int32_t index = 0; index < redoLogNum; ++index) { for (int32_t index = 0; index < redoLogNum; ++index) {
SSdbRaw *pRaw = taosArrayGet(pTrans->redoLogs, index); SSdbRaw *pTmp = taosArrayGet(pTrans->redoLogs, index);
rawDataLen += sdbGetRawTotalSize(pRaw); rawDataLen += sdbGetRawTotalSize(pTmp);
} }
for (int32_t index = 0; index < undoLogNum; ++index) { for (int32_t index = 0; index < undoLogNum; ++index) {
SSdbRaw *pRaw = taosArrayGet(pTrans->undoLogs, index); SSdbRaw *pTmp = taosArrayGet(pTrans->undoLogs, index);
rawDataLen += sdbGetRawTotalSize(pRaw); rawDataLen += sdbGetRawTotalSize(pTmp);
} }
for (int32_t index = 0; index < commitLogNum; ++index) { for (int32_t index = 0; index < commitLogNum; ++index) {
SSdbRaw *pRaw = taosArrayGet(pTrans->commitLogs, index); SSdbRaw *pTmp = taosArrayGet(pTrans->commitLogs, index);
rawDataLen += sdbGetRawTotalSize(pRaw); rawDataLen += sdbGetRawTotalSize(pTmp);
} }
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen); SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen);
if (pRaw == NULL) return NULL; if (pRaw == NULL) {
mError("trn:%d, failed to alloc raw since %s", pTrans->id, terrstr());
return NULL;
}
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pData, dataPos, pTrans->id) SDB_SET_INT32(pRaw, dataPos, pTrans->id)
SDB_SET_INT8(pData, dataPos, pTrans->stage) SDB_SET_INT8(pRaw, dataPos, pTrans->stage)
SDB_SET_INT8(pData, dataPos, pTrans->policy) SDB_SET_INT8(pRaw, dataPos, pTrans->policy)
SDB_SET_INT32(pData, dataPos, redoLogNum) SDB_SET_INT32(pRaw, dataPos, redoLogNum)
SDB_SET_INT32(pData, dataPos, undoLogNum) SDB_SET_INT32(pRaw, dataPos, undoLogNum)
SDB_SET_INT32(pData, dataPos, commitLogNum) SDB_SET_INT32(pRaw, dataPos, commitLogNum)
SDB_SET_INT32(pData, dataPos, redoActionNum) SDB_SET_INT32(pRaw, dataPos, redoActionNum)
SDB_SET_INT32(pData, dataPos, undoActionNum) SDB_SET_INT32(pRaw, dataPos, undoActionNum)
SDB_SET_DATALEN(pRaw, dataPos);
for (int32_t index = 0; index < redoLogNum; ++index) {
SSdbRaw *pTmp = taosArrayGet(pTrans->redoLogs, index);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
}
for (int32_t index = 0; index < undoLogNum; ++index) {
SSdbRaw *pTmp = taosArrayGet(pTrans->undoLogs, index);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
}
for (int32_t index = 0; index < commitLogNum; ++index) {
SSdbRaw *pTmp = taosArrayGet(pTrans->commitLogs, index);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
}
mDebug("trn:%d, is encoded as raw:%p, len:%d", pTrans->id, pRaw, dataPos);
return pRaw; return pRaw;
} }
STrans *trnActionDecode(SSdbRaw *pRaw) { SSdbRow *trnActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr());
return NULL;
}
if (sver != SDB_TRANS_VER) { if (sver != SDB_TRANS_VER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
mError("failed to get check soft ver from raw:%p since %s", pRaw, terrstr());
return NULL;
}
SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
STrans *pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) {
mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr());
return NULL; return NULL;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(STrans)); pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
STrans *pTrans = sdbGetRowObj(pRow); pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
if (pTrans == NULL) return NULL; pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
int32_t redoLogNum = 0; if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
int32_t undoLogNum = 0; pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
int32_t commitLogNum = 0; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t redoActionNum = 0; mDebug("trn:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw);
int32_t undoActionNum = 0; return NULL;
}
int32_t redoLogNum = 0;
int32_t undoLogNum = 0;
int32_t commitLogNum = 0;
int32_t redoActionNum = 0;
int32_t undoActionNum = 0;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id) SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->stage) SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->stage)
SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->policy) SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy)
SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum) SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum) SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum) SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum) SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum) SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)
int32_t code = 0;
for (int32_t index = 0; index < redoLogNum; ++index) { for (int32_t index = 0; index < redoLogNum; ++index) {
int32_t dataLen = 0; int32_t dataLen = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
...@@ -95,64 +142,89 @@ STrans *trnActionDecode(SSdbRaw *pRaw) { ...@@ -95,64 +142,89 @@ STrans *trnActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
void *ret = taosArrayPush(pTrans->redoLogs, pData); void *ret = taosArrayPush(pTrans->redoLogs, pData);
if (ret == NULL) { if (ret == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
break; break;
} }
} }
// if (code != 0) { if (code != 0) {
// trnDrop(pTrans); terrno = code;
// terrno = code; mError("trn:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
// return NULL; trnDrop(pTrans);
// } return NULL;
}
return pTrans; mDebug("trn:%d, is parsed from raw:%p", pTrans->id, pRaw);
return pRow;
} }
int32_t trnActionInsert(STrans *pTrans) { static int32_t trnActionInsert(STrans *pTrans) {
SArray *pArray = pTrans->redoLogs; SArray *pArray = pTrans->redoLogs;
int32_t arraySize = taosArrayGetSize(pArray); int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t index = 0; index < arraySize; ++index) { for (int32_t index = 0; index < arraySize; ++index) {
SSdbRaw *pRaw = taosArrayGetP(pArray, index); SSdbRaw *pRaw = taosArrayGet(pArray, index);
int32_t code = sdbWrite(pRaw); int32_t code = sdbWrite(pRaw);
if (code != 0) { if (code != 0) {
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
return code; return code;
} }
} }
mDebug("trn:%d, write to sdb", pTrans->id);
return 0; return 0;
} }
int32_t trnActionDelete(STrans *pTrans) { static int32_t trnActionDelete(STrans *pTrans) {
SArray *pArray = pTrans->redoLogs; SArray *pArray = pTrans->redoLogs;
int32_t arraySize = taosArrayGetSize(pArray); int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t index = 0; index < arraySize; ++index) { for (int32_t index = 0; index < arraySize; ++index) {
SSdbRaw *pRaw = taosArrayGetP(pArray, index); SSdbRaw *pRaw = taosArrayGet(pArray, index);
int32_t code = sdbWrite(pRaw); int32_t code = sdbWrite(pRaw);
if (code != 0) { if (code != 0) {
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
return code; return code;
} }
} }
mDebug("trn:%d, delete from sdb", pTrans->id);
return 0; return 0;
} }
int32_t trnActionUpdate(STrans *pSrcTrans, STrans *pDstTrans) { return 0; } static int32_t trnActionUpdate(STrans *pTrans, STrans *pDstTrans) {
assert(true);
SArray *pArray = pTrans->redoLogs;
int32_t arraySize = taosArrayGetSize(pArray);
int32_t trnGenerateTransId() { return 1; } for (int32_t index = 0; index < arraySize; ++index) {
SSdbRaw *pRaw = taosArrayGet(pArray, index);
int32_t code = sdbWrite(pRaw);
if (code != 0) {
mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
return code;
}
}
STrans *trnCreate(ETrnPolicy policy) { pTrans->stage = pDstTrans->stage;
mDebug("trn:%d, update in sdb", pTrans->id);
return 0;
}
static int32_t trnGenerateTransId() { return 1; }
STrans *trnCreate(ETrnPolicy policy, void *rpcHandle) {
STrans *pTrans = calloc(1, sizeof(STrans)); STrans *pTrans = calloc(1, sizeof(STrans));
if (pTrans == NULL) { if (pTrans == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create transaction since %s", terrstr());
return NULL; return NULL;
} }
pTrans->id = trnGenerateTransId(); pTrans->id = trnGenerateTransId();
pTrans->stage = TRN_STAGE_PREPARE; pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->rpcHandle = rpcHandle;
pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
...@@ -162,15 +234,17 @@ STrans *trnCreate(ETrnPolicy policy) { ...@@ -162,15 +234,17 @@ STrans *trnCreate(ETrnPolicy policy) {
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) { pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create transaction since %s", terrstr());
return NULL; return NULL;
} }
mDebug("trn:%d, is created, %p", pTrans->id, pTrans);
return pTrans; return pTrans;
} }
static void trnDropArray(SArray *pArray) { static void trnDropArray(SArray *pArray) {
for (int32_t index = 0; index < pArray->size; ++index) { for (int32_t index = 0; index < pArray->size; ++index) {
SSdbRaw *pRaw = taosArrayGetP(pArray, index); SSdbRaw *pRaw = taosArrayGet(pArray, index);
tfree(pRaw); tfree(pRaw);
} }
...@@ -183,10 +257,15 @@ void trnDrop(STrans *pTrans) { ...@@ -183,10 +257,15 @@ void trnDrop(STrans *pTrans) {
trnDropArray(pTrans->commitLogs); trnDropArray(pTrans->commitLogs);
trnDropArray(pTrans->redoActions); trnDropArray(pTrans->redoActions);
trnDropArray(pTrans->undoActions); trnDropArray(pTrans->undoActions);
mDebug("trn:%d, is dropped, %p", pTrans->id, pTrans);
tfree(pTrans); tfree(pTrans);
} }
void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) { pTrans->rpcHandle = rpcHandle; } void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) {
pTrans->rpcHandle = rpcHandle;
mTrace("trn:%d, set rpc handle:%p", pTrans->id, rpcHandle);
}
static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) { static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) {
if (pArray == NULL || pRaw == NULL) { if (pArray == NULL || pRaw == NULL) {
...@@ -194,7 +273,7 @@ static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) { ...@@ -194,7 +273,7 @@ static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) {
return -1; return -1;
} }
void *ptr = taosArrayPush(pArray, &pRaw); void *ptr = taosArrayPush(pArray, pRaw);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
...@@ -203,21 +282,37 @@ static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) { ...@@ -203,21 +282,37 @@ static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) {
return 0; return 0;
} }
int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->redoLogs, pRaw); } int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = trnAppendArray(pTrans->redoLogs, pRaw);
mTrace("trn:%d, raw:%p append to redo logs, code:%d", pTrans->id, pRaw, code);
return code;
}
int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->undoLogs, pRaw); } int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = trnAppendArray(pTrans->undoLogs, pRaw);
mTrace("trn:%d, raw:%p append to undo logs, code:%d", pTrans->id, pRaw, code);
return code;
}
int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->commitLogs, pRaw); } int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw) {
int32_t code = trnAppendArray(pTrans->commitLogs, pRaw);
mTrace("trn:%d, raw:%p append to commit logs, code:%d", pTrans->id, pRaw, code);
return code;
}
int32_t trnAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { int32_t trnAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
return trnAppendArray(pTrans->redoActions, pMsg); int32_t code = trnAppendArray(pTrans->redoActions, pMsg);
mTrace("trn:%d, msg:%p append to redo actions", pTrans->id, pMsg);
return code;
} }
int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
return trnAppendArray(pTrans->undoActions, pMsg); int32_t code = trnAppendArray(pTrans->undoActions, pMsg);
mTrace("trn:%d, msg:%p append to undo actions", pTrans->id, pMsg);
return code;
} }
int32_t trnInit() { int32_t mnodeInitTrans() {
SSdbTable table = {.sdbType = SDB_TRANS, SSdbTable table = {.sdbType = SDB_TRANS,
.keyType = SDB_KEY_INT32, .keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)trnActionEncode, .encodeFp = (SdbEncodeFp)trnActionEncode,
...@@ -227,7 +322,181 @@ int32_t trnInit() { ...@@ -227,7 +322,181 @@ int32_t trnInit() {
.deleteFp = (SdbDeleteFp)trnActionDelete}; .deleteFp = (SdbDeleteFp)trnActionDelete};
sdbSetTable(table); sdbSetTable(table);
mInfo("trn module is initialized");
return 0; return 0;
} }
void trnCleanup() {} void mnodeCleanupTrans() { mInfo("trn module is cleaned up"); }
int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
if (syncfp == NULL) return -1;
SSdbRaw *pRaw = trnActionEncode(pTrans);
if (pRaw == NULL) {
mError("trn:%d, failed to decode trans since %s", pTrans->id, terrstr());
return -1;
}
sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
if (sdbWrite(pRaw) != 0) {
mError("trn:%d, failed to write trans since %s", pTrans->id, terrstr());
return -1;
}
if ((*syncfp)(pRaw, pTrans->rpcHandle) != 0) {
mError("trn:%d, failed to sync trans since %s", pTrans->id, terrstr());
return -1;
}
return 0;
}
static void trnSendRpcRsp(void *rpcHandle, int32_t code) {
if (rpcHandle != NULL) {
SRpcMsg rspMsg = {.handle = rpcHandle, .code = terrno};
rpcSendResponse(&rspMsg);
}
}
int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
if (code != 0) {
trnSendRpcRsp(pData, terrno);
return 0;
}
if (sdbWrite(pData) != 0) {
code = terrno;
trnSendRpcRsp(pData, code);
terrno = code;
return -1;
}
return 0;
}
static int32_t trnExecuteArray(SArray *pArray) {
for (int32_t index = 0; index < pArray->size; ++index) {
SSdbRaw *pRaw = taosArrayGetP(pArray, index);
if (sdbWrite(pRaw) != 0) {
return -1;
}
}
return 0;
}
static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->redoLogs); }
static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->undoLogs); }
static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->commitLogs); }
static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->redoActions); }
static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->undoActions); }
static int32_t trnPerformPrepareStage(STrans *pTrans) {
if (trnExecuteRedoLogs(pTrans) == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
return 0;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
return -1;
}
}
static int32_t trnPerformExecuteStage(STrans *pTrans) {
int32_t code = trnExecuteRedoActions(pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_COMMIT;
return 0;
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
return -1;
} else {
if (pTrans->policy == TRN_POLICY_RETRY) {
pTrans->stage = TRN_STAGE_RETRY;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
}
return 0;
}
}
static int32_t trnPerformCommitStage(STrans *pTrans) {
if (trnExecuteCommitLogs(pTrans) == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
return 0;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
return -1;
}
}
static int32_t trnPerformRollbackStage(STrans *pTrans) {
if (trnExecuteCommitLogs(pTrans) == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
return 0;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
return -1;
}
}
static int32_t trnPerformRetryStage(STrans *pTrans) {
if (trnExecuteCommitLogs(pTrans) == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
return 0;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
return -1;
}
}
int32_t trnExecute(int32_t tranId) {
int32_t code = 0;
STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId);
if (pTrans == NULL) {
return -1;
}
if (pTrans->stage == TRN_STAGE_PREPARE) {
if (trnPerformPrepareStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
if (pTrans->stage == TRN_STAGE_EXECUTE) {
if (trnPerformExecuteStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
if (pTrans->stage == TRN_STAGE_COMMIT) {
if (trnPerformCommitStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
if (pTrans->stage == TRN_STAGE_ROLLBACK) {
if (trnPerformRollbackStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
if (pTrans->stage == TRN_STAGE_RETRY) {
if (trnPerformRetryStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
sdbRelease(pTrans);
return 0;
}
\ No newline at end of file
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
#include "tkey.h" #include "tkey.h"
#include "mnodeTrans.h"
#define SDB_USER_VER 1 #define SDB_USER_VER 1
...@@ -142,12 +143,12 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM ...@@ -142,12 +143,12 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
userObj.updateTime = userObj.createdTime; userObj.updateTime = userObj.createdTime;
userObj.rootAuth = 0; userObj.rootAuth = 0;
STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK); STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle);
SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj);
if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) { if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) {
mError("failed to append redo log since %s", terrstr());
trnDrop(pTrans); trnDrop(pTrans);
return -1; return -1;
} }
...@@ -155,6 +156,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM ...@@ -155,6 +156,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj);
if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) { if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) {
mError("failed to append undo log since %s", terrstr());
trnDrop(pTrans); trnDrop(pTrans);
return -1; return -1;
} }
...@@ -162,6 +164,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM ...@@ -162,6 +164,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj); SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj);
if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) { if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) {
mError("failed to append commit log since %s", terrstr());
trnDrop(pTrans); trnDrop(pTrans);
return -1; return -1;
} }
...@@ -228,6 +231,8 @@ int32_t mnodeInitUser() { ...@@ -228,6 +231,8 @@ int32_t mnodeInitUser() {
.deleteFp = (SdbDeleteFp)mnodeUserActionDelete}; .deleteFp = (SdbDeleteFp)mnodeUserActionDelete};
sdbSetTable(table); sdbSetTable(table);
mnodeSetMsgFp(TSDB_MSG_TYPE_CREATE_USER, mnodeProcessCreateUserMsg);
return 0; return 0;
} }
......
...@@ -45,7 +45,7 @@ static int32_t sdbCreateDir() { ...@@ -45,7 +45,7 @@ static int32_t sdbCreateDir() {
static int32_t sdbRunDeployFp() { static int32_t sdbRunDeployFp() {
mDebug("start to run deploy functions"); mDebug("start to run deploy functions");
for (int32_t i = SDB_START; i < SDB_MAX; ++i) { for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
SdbDeployFp fp = tsSdb.deployFps[i]; SdbDeployFp fp = tsSdb.deployFps[i];
if (fp == NULL) continue; if (fp == NULL) continue;
if ((*fp)() != 0) { if ((*fp)() != 0) {
...@@ -54,6 +54,7 @@ static int32_t sdbRunDeployFp() { ...@@ -54,6 +54,7 @@ static int32_t sdbRunDeployFp() {
} }
} }
mDebug("end of run deploy functions");
return 0; return 0;
} }
......
aux_source_directory(src MNODE_SRC)
add_library(transaction ${MNODE_SRC})
target_include_directories(
transaction
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mnode/transaction"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
transaction
PRIVATE os
PRIVATE common
PRIVATE util
PRIVATE sdb
PRIVATE transport
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TRANSACTION_INT_H_
#define _TD_TRANSACTION_INT_H_
#include "os.h"
#include "trn.h"
#include "tglobal.h"
#include "tarray.h"
#include "tlog.h"
#ifdef __cplusplus
extern "C" {
#endif
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define TRN_VER 1
#define TRN_DEFAULT_ARRAY_SIZE 8
typedef enum {
TRN_STAGE_PREPARE = 1,
TRN_STAGE_EXECUTE = 2,
TRN_STAGE_COMMIT = 3,
TRN_STAGE_ROLLBACK = 4,
TRN_STAGE_RETRY = 5
} ETrnStage;
typedef struct STrans {
int32_t id;
int8_t stage;
int8_t policy;
void *rpcHandle;
SArray *redoLogs;
SArray *undoLogs;
SArray *commitLogs;
SArray *redoActions;
SArray *undoActions;
} STrans;
SSdbRaw *trnActionEncode(STrans *pTrans);
STrans *trnActionDecode(SSdbRaw *pRaw);
int32_t trnActionInsert(STrans *pTrans);
int32_t trnActionDelete(STrans *pTrans);
int32_t trnActionUpdate(STrans *pSrcTrans, STrans *pDstTrans);
int32_t trnGenerateTransId();
#ifdef __cplusplus
}
#endif
#endif /*_TD_TRANSACTION_INT_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "trnInt.h"
#include "trpc.h"
int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
if (syncfp == NULL) return -1;
SSdbRaw *pRaw = trnActionEncode(pTrans);
if (pRaw == NULL) {
mError("tranId:%d, failed to decode trans since %s", pTrans->id, terrstr());
return -1;
}
if (sdbWrite(pRaw) != 0) {
mError("tranId:%d, failed to write trans since %s", pTrans->id, terrstr());
return -1;
}
if ((*syncfp)(pRaw, pTrans->rpcHandle) != 0) {
mError("tranId:%d, failed to sync trans since %s", pTrans->id, terrstr());
return -1;
}
return 0;
}
static void trnSendRpcRsp(void *rpcHandle, int32_t code) {
if (rpcHandle != NULL) {
SRpcMsg rspMsg = {.handle = rpcHandle, .code = terrno};
rpcSendResponse(&rspMsg);
}
}
int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
if (code != 0) {
trnSendRpcRsp(pData, terrno);
return 0;
}
if (sdbWrite(pData) != 0) {
code = terrno;
trnSendRpcRsp(pData, code);
terrno = code;
return -1;
}
return 0;
}
static int32_t trnExecuteArray(SArray *pArray) {
for (int32_t index = 0; index < pArray->size; ++index) {
SSdbRaw *pRaw = taosArrayGetP(pArray, index);
if (sdbWrite(pRaw) != 0) {
return -1;
}
}
return 0;
}
static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->redoLogs); }
static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->undoLogs); }
static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->commitLogs); }
static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->redoActions); }
static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->undoActions); }
static int32_t trnPerformPrepareStage(STrans *pTrans) {
if (trnExecuteRedoLogs(pTrans) == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
return 0;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
return -1;
}
}
static int32_t trnPerformExecuteStage(STrans *pTrans) {
int32_t code = trnExecuteRedoActions(pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_COMMIT;
return 0;
} else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
return -1;
} else {
if (pTrans->policy == TRN_POLICY_RETRY) {
pTrans->stage = TRN_STAGE_RETRY;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
}
return 0;
}
}
static int32_t trnPerformCommitStage(STrans *pTrans) {
if (trnExecuteCommitLogs(pTrans) == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
return 0;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
return -1;
}
}
static int32_t trnPerformRollbackStage(STrans *pTrans) {
if (trnExecuteCommitLogs(pTrans) == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
return 0;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
return -1;
}
}
static int32_t trnPerformRetryStage(STrans *pTrans) {
if (trnExecuteCommitLogs(pTrans) == 0) {
pTrans->stage = TRN_STAGE_EXECUTE;
return 0;
} else {
pTrans->stage = TRN_STAGE_ROLLBACK;
return -1;
}
}
int32_t trnExecute(int32_t tranId) {
int32_t code = 0;
STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId);
if (pTrans == NULL) {
return -1;
}
if (pTrans->stage == TRN_STAGE_PREPARE) {
if (trnPerformPrepareStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
if (pTrans->stage == TRN_STAGE_EXECUTE) {
if (trnPerformExecuteStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
if (pTrans->stage == TRN_STAGE_COMMIT) {
if (trnPerformCommitStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
if (pTrans->stage == TRN_STAGE_ROLLBACK) {
if (trnPerformRollbackStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
if (pTrans->stage == TRN_STAGE_RETRY) {
if (trnPerformRetryStage(pTrans) != 0) {
sdbRelease(pTrans);
return -1;
}
}
sdbRelease(pTrans);
return 0;
}
\ No newline at end of file
...@@ -505,14 +505,18 @@ void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) { ...@@ -505,14 +505,18 @@ void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
} }
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
#if 0
SRpcConn *pConn = (SRpcConn *)thandle; SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn->user[0] == 0) return -1; if (pConn->user[0] == 0) return -1;
pInfo->clientIp = pConn->peerIp; pInfo->clientIp = pConn->peerIp;
pInfo->clientPort = pConn->peerPort; pInfo->clientPort = pConn->peerPort;
// pInfo->serverIp = pConn->destIp; // pInfo->serverIp = pConn->destIp;
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
#else
strcpy(pInfo->user, "root");
#endif
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册