diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h
index 90c5ef0c4a71af2bf3dc18d0f31109bb11592ddf..cb514cef53307b510f50ace0e0eaa1f6a148513b 100644
--- a/include/dnode/mnode/sdb/sdb.h
+++ b/include/dnode/mnode/sdb/sdb.h
@@ -56,39 +56,39 @@ extern "C" {
dataPos += valLen; \
}
-#define SDB_SET_INT64(pData, dataPos, val) \
+#define SDB_SET_INT64(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt64(pRaw, dataPos, val) != 0) { \
- sdbFreeRaw(pRaw); \
+ sdbFreeRaw(pRaw); \
return NULL; \
- }; \
+ } \
dataPos += sizeof(int64_t); \
}
-#define SDB_SET_INT32(pData, dataPos, val) \
+#define SDB_SET_INT32(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt32(pRaw, dataPos, val) != 0) { \
- sdbFreeRaw(pRaw); \
+ sdbFreeRaw(pRaw); \
return NULL; \
- }; \
+ } \
dataPos += sizeof(int32_t); \
}
-#define SDB_SET_INT8(pData, dataPos, val) \
+#define SDB_SET_INT8(pRaw, dataPos, val) \
{ \
if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \
- sdbFreeRaw(pRaw); \
+ sdbFreeRaw(pRaw); \
return NULL; \
- }; \
+ } \
dataPos += sizeof(int8_t); \
}
#define SDB_SET_BINARY(pRaw, dataPos, val, valLen) \
{ \
if (sdbSetRawBinary(pRaw, dataPos, val, valLen) != 0) { \
- sdbFreeRaw(pRaw); \
+ sdbFreeRaw(pRaw); \
return NULL; \
- }; \
+ } \
dataPos += valLen; \
}
@@ -97,7 +97,7 @@ extern "C" {
if (sdbSetRawDataLen(pRaw, dataLen) != 0) { \
sdbFreeRaw(pRaw); \
return NULL; \
- }; \
+ } \
}
typedef struct SSdbRaw SSdbRaw;
diff --git a/source/dnode/mgmt/inc/dnodeInt.h b/source/dnode/mgmt/inc/dnodeInt.h
index 906455dce47dbb30de312ef32895f41682db5e54..a4226a47b838736b540ac9505c9de62eb55d55b6 100644
--- a/source/dnode/mgmt/inc/dnodeInt.h
+++ b/source/dnode/mgmt/inc/dnodeInt.h
@@ -41,7 +41,7 @@ int32_t dnodeInit();
void dnodeCleanup();
EDnStat dnodeGetRunStat();
-void dnodeSetRunStat();
+void dnodeSetRunStat(EDnStat stat);
void dnodeReportStartup(char *name, char *desc);
void dnodeReportStartupFinished(char *name, char *desc);
diff --git a/source/dnode/mgmt/src/dnodeInt.c b/source/dnode/mgmt/src/dnodeInt.c
index 2674e107fd31235f3c61038803877b2238a3033a..eee4bac0507f60a1a7e2f7aeccf4b00280e6cf53 100644
--- a/source/dnode/mgmt/src/dnodeInt.c
+++ b/source/dnode/mgmt/src/dnodeInt.c
@@ -33,7 +33,10 @@ static struct {
EDnStat dnodeGetRunStat() { return tsInt.runStat; }
-void dnodeSetRunStat(EDnStat stat) { tsInt.runStat = stat; }
+void dnodeSetRunStat(EDnStat stat) {
+ dDebug("runstat set to %d", stat);
+ tsInt.runStat = stat;
+}
void dnodeReportStartup(char *name, char *desc) {
SStartupMsg *pStartup = &tsInt.startup;
@@ -99,7 +102,6 @@ static int32_t dnodeInitDir() {
}
static int32_t dnodeInitMain() {
- tsInt.runStat = DN_RUN_STAT_STOPPED;
tscEmbedded = 1;
taosIgnSIGPIPE();
taosBlockSIGPIPE();
@@ -147,7 +149,9 @@ static void dnodeCleanupMain() {
int32_t dnodeInit() {
SSteps *steps = taosStepInit(10, dnodeReportStartup);
if (steps == NULL) return -1;
-
+#if 1
+ dnodeSetRunStat(DN_RUN_STAT_RUNNING);
+#endif
taosStepAdd(steps, "dnode-main", dnodeInitMain, dnodeCleanupMain);
taosStepAdd(steps, "dnode-rpc", rpcInit, rpcCleanup);
taosStepAdd(steps, "dnode-tfs", NULL, NULL);
diff --git a/source/dnode/mgmt/src/dnodeMnode.c b/source/dnode/mgmt/src/dnodeMnode.c
index 48cc1cb20defe6483045dd43efdaa167b76fa180..b6d54a28d2150488c08e8a779643b714022d2654 100644
--- a/source/dnode/mgmt/src/dnodeMnode.c
+++ b/source/dnode/mgmt/src/dnodeMnode.c
@@ -330,17 +330,22 @@ static void dnodeProcessMnodeApplyQueue(void *unused, SMnodeMsg *pMsg) { mnodePr
static void dnodeProcessMnodeSyncQueue(void *unused, SMnodeMsg *pMsg) { mnodeProcessMsg(pMsg, MN_MSG_TYPE_SYNC); }
static int32_t dnodeWriteMnodeMsgToQueue(taos_queue pQueue, SRpcMsg *pRpcMsg) {
- int32_t code = 0;
+ int32_t code = 0;
+ SMnodeMsg *pMsg = NULL;
if (pQueue == NULL) {
code = TSDB_CODE_DND_MSG_NOT_PROCESSED;
} else {
- SMnodeMsg *pMsg = mnodeInitMsg(pRpcMsg);
+ pMsg = mnodeInitMsg(pRpcMsg);
if (pMsg == NULL) {
code = terrno;
}
}
+ if (code == 0) {
+ code = taosWriteQitem(pQueue, pMsg);
+ }
+
if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp);
diff --git a/source/dnode/mgmt/src/dnodeTransport.c b/source/dnode/mgmt/src/dnodeTransport.c
index 475470b574cf836081fc96d74d3c75c083a24c1b..ed5650ace87ad7707f4264addca0fb09098bc8ed 100644
--- a/source/dnode/mgmt/src/dnodeTransport.c
+++ b/source/dnode/mgmt/src/dnodeTransport.c
@@ -314,6 +314,8 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c
}
static int32_t dnodeInitShellServer() {
+ dnodeInitMsgFp();
+
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) {
numOfThreads = 1;
@@ -336,6 +338,17 @@ static int32_t dnodeInitShellServer() {
return -1;
}
+#if 1
+ SRpcMsg rpcMsg = {0};
+ rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_USER;
+ rpcMsg.contLen = sizeof(SCreateUserMsg);
+ rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
+ SCreateUserMsg *pMsg = (SCreateUserMsg*)rpcMsg.pCont;
+ strcpy(pMsg->user, "u1");
+ strcpy(pMsg->pass, "up1");
+ dnodeProcessShellReq(&rpcMsg, NULL);
+
+#endif
dInfo("dnode shell rpc server is initialized");
return 0;
}
@@ -369,7 +382,11 @@ void dnodeCleanupTrans() {
dnodeCleanupClient();
}
-void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); }
+void dnodeSendMsgToDnode(SEpSet *epSet, SRpcMsg *rpcMsg) {
+ #if 0
+ rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL);
+ #endif
+ }
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) {
SEpSet epSet = {0};
diff --git a/source/dnode/mnode/CMakeLists.txt b/source/dnode/mnode/CMakeLists.txt
index 6de5f06476fc587b24724125e8f4ba72dc88e14f..45a5af75665943f77e16fccbb2c44c1b9a8f3424 100644
--- a/source/dnode/mnode/CMakeLists.txt
+++ b/source/dnode/mnode/CMakeLists.txt
@@ -1,3 +1,2 @@
add_subdirectory(impl)
add_subdirectory(sdb)
-add_subdirectory(transaction)
diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt
index 4c9d44b39e0a09c860c2ca8be99dd236aedcaf0f..49c9d5451355b4b9567a84e5ddccd6781b4b4569 100644
--- a/source/dnode/mnode/impl/CMakeLists.txt
+++ b/source/dnode/mnode/impl/CMakeLists.txt
@@ -8,7 +8,6 @@ target_include_directories(
target_link_libraries(
mnode
PRIVATE sdb
- PRIVATE transaction
PUBLIC transport
PUBLIC cjson
)
\ No newline at end of file
diff --git a/source/dnode/mnode/impl/inc/mnodeDef.h b/source/dnode/mnode/impl/inc/mnodeDef.h
index b6449ecfe7dba18075ae0538040448c302596e2a..ccdba13006bb6dc46eb8f33f5d56709a93e653f7 100644
--- a/source/dnode/mnode/impl/inc/mnodeDef.h
+++ b/source/dnode/mnode/impl/inc/mnodeDef.h
@@ -76,6 +76,28 @@ typedef enum {
MN_AUTH_MAX
} EMnAuthOp;
+typedef enum {
+ TRN_STAGE_PREPARE = 1,
+ TRN_STAGE_EXECUTE = 2,
+ TRN_STAGE_COMMIT = 3,
+ TRN_STAGE_ROLLBACK = 4,
+ TRN_STAGE_RETRY = 5
+} ETrnStage;
+
+typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy;
+
+
+typedef struct STrans {
+ int32_t id;
+ ETrnStage stage;
+ ETrnPolicy policy;
+ void *rpcHandle;
+ SArray *redoLogs;
+ SArray *undoLogs;
+ SArray *commitLogs;
+ SArray *redoActions;
+ SArray *undoActions;
+} STrans;
typedef struct SClusterObj {
diff --git a/source/dnode/mnode/impl/inc/mnodeInt.h b/source/dnode/mnode/impl/inc/mnodeInt.h
index 96803ba4a56c5b42b7b8ef7c4ee6356e49276171..373be6aa84523cea295ea3397d4ac9a2da800534 100644
--- a/source/dnode/mnode/impl/inc/mnodeInt.h
+++ b/source/dnode/mnode/impl/inc/mnodeInt.h
@@ -18,13 +18,23 @@
#include "mnodeDef.h"
#include "sdb.h"
-#include "trn.h"
+#include "tstep.h"
#ifdef __cplusplus
extern "C" {
#endif
-typedef void (*MnodeRpcFp)(SMnodeMsg *pMsg);
+typedef int32_t (*MnodeRpcFp)(SMnodeMsg *pMsg);
+
+typedef struct SMnode {
+ int32_t dnodeId;
+ int64_t clusterId;
+ tmr_h timer;
+ SSteps *pInitSteps;
+ SSteps *pStartSteps;
+ SMnodePara para;
+ MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
+} SMnode;
tmr_h mnodeGetTimer();
int32_t mnodeGetDnodeId();
diff --git a/include/dnode/mnode/transaction/trn.h b/source/dnode/mnode/impl/inc/mnodeTrans.h
similarity index 77%
rename from include/dnode/mnode/transaction/trn.h
rename to source/dnode/mnode/impl/inc/mnodeTrans.h
index 8ba043de1269938e0f651d7d0743a3f5bf4e576a..2abe101dfdad4a04bf18f7bf853308ee8c5dcd84 100644
--- a/include/dnode/mnode/transaction/trn.h
+++ b/source/dnode/mnode/impl/inc/mnodeTrans.h
@@ -13,25 +13,20 @@
* along with this program. If not, see .
*/
-#ifndef _TD_TRANSACTION_H_
-#define _TD_TRANSACTION_H_
+#ifndef _TD_TRANSACTION_INT_H_
+#define _TD_TRANSACTION_INT_H_
-#include "sdb.h"
-#include "taosmsg.h"
+#include "mnodeInt.h"
#ifdef __cplusplus
extern "C" {
#endif
-typedef struct STrans STrans;
-typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy;
+int32_t mnodeInitTrans();
+void mnodeCleanupTrans();
-int32_t trnInit();
-void trnCleanup();
-
-STrans *trnCreate(ETrnPolicy);
+STrans *trnCreate(ETrnPolicy policy, void *rpcHandle);
void trnDrop(STrans *pTrans);
-void trnSetRpcHandle(STrans *pTrans, void *rpcHandle);
int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw);
int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw);
int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw);
@@ -42,8 +37,11 @@ int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)
int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code);
int32_t trnExecute(int32_t tranId);
+SSdbRaw *trnActionEncode(STrans *pTrans);
+SSdbRow *trnActionDecode(SSdbRaw *pRaw);
+
#ifdef __cplusplus
}
#endif
-#endif /*_TD_TRANSACTION_H_*/
+#endif /*_TD_TRANSACTION_INT_H_*/
diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c
index f52e60dbad94fe81e2a4eebfb06a8a192dac5088..ae909917de1fd7fd722c3a04bf43cf42e6b522e0 100644
--- a/source/dnode/mnode/impl/src/mnode.c
+++ b/source/dnode/mnode/impl/src/mnode.c
@@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
-#include "tstep.h"
#include "tqueue.h"
#include "mnodeAcct.h"
#include "mnodeAuth.h"
@@ -34,16 +33,9 @@
#include "mnodeTelem.h"
#include "mnodeUser.h"
#include "mnodeVgroup.h"
+#include "mnodeTrans.h"
-static struct {
- int32_t dnodeId;
- int64_t clusterId;
- tmr_h timer;
- SSteps *pInitSteps;
- SSteps *pStartSteps;
- SMnodePara para;
- MnodeRpcFp msgFp[TSDB_MSG_TYPE_MAX];
-} tsMint;
+SMnode tsMint = {0};
int32_t mnodeGetDnodeId() { return tsMint.para.dnodeId; }
@@ -116,7 +108,7 @@ static int32_t mnodeAllocInitSteps() {
struct SSteps *steps = taosStepInit(16, NULL);
if (steps == NULL) return -1;
- if (taosStepAdd(steps, "mnode-trans", trnInit, trnCleanup) != 0) return -1;
+ if (taosStepAdd(steps, "mnode-trans", mnodeInitTrans, mnodeCleanupTrans) != 0) return -1;
if (taosStepAdd(steps, "mnode-cluster", mnodeInitCluster, mnodeCleanupCluster) != 0) return -1;
if (taosStepAdd(steps, "mnode-dnode", mnodeInitDnode, mnodeCleanupDnode) != 0) return -1;
if (taosStepAdd(steps, "mnode-mnode", mnodeInitMnode, mnodeCleanupMnode) != 0) return -1;
@@ -224,10 +216,14 @@ void mnodeCleanupMsg(SMnodeMsg *pMsg) {
static void mnodeProcessRpcMsg(SMnodeMsg *pMsg) {
int32_t msgType = pMsg->rpcMsg.msgType;
- if (tsMint.msgFp[msgType] == NULL) {
+ MnodeRpcFp fp = tsMint.msgFp[msgType];
+ if (fp == NULL) {
}
- (*tsMint.msgFp[msgType])(pMsg);
+ int32_t code = (fp)(pMsg);
+ if (code != 0) {
+ assert(code);
+ }
}
void mnodeSetMsgFp(int32_t msgType, MnodeRpcFp fp) {
diff --git a/source/dnode/mnode/impl/src/mnodeSync.c b/source/dnode/mnode/impl/src/mnodeSync.c
index fd3479317268dc864174c4a4a0177adbeff9e7e1..6e4084ffa6646e2a2b94b36db57e309e7bebd4f5 100644
--- a/source/dnode/mnode/impl/src/mnodeSync.c
+++ b/source/dnode/mnode/impl/src/mnodeSync.c
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnodeInt.h"
+#include "mnodeTrans.h"
int32_t mnodeInitSync() { return 0; }
void mnodeCleanUpSync() {}
diff --git a/source/dnode/mnode/impl/src/mnodeTrans.c b/source/dnode/mnode/impl/src/mnodeTrans.c
new file mode 100644
index 0000000000000000000000000000000000000000..4cd6bf7bdbf30c586b9ffd8d5c14c97645dc0436
--- /dev/null
+++ b/source/dnode/mnode/impl/src/mnodeTrans.c
@@ -0,0 +1,502 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "mnodeTrans.h"
+#include "trpc.h"
+
+#define SDB_TRANS_VER 1
+#define TRN_DEFAULT_ARRAY_SIZE 8
+
+SSdbRaw *trnActionEncode(STrans *pTrans) {
+ int32_t rawDataLen = 10 * sizeof(int32_t);
+ int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
+ int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
+ int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
+ int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
+ int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
+
+ for (int32_t index = 0; index < redoLogNum; ++index) {
+ SSdbRaw *pTmp = taosArrayGet(pTrans->redoLogs, index);
+ rawDataLen += sdbGetRawTotalSize(pTmp);
+ }
+
+ for (int32_t index = 0; index < undoLogNum; ++index) {
+ SSdbRaw *pTmp = taosArrayGet(pTrans->undoLogs, index);
+ rawDataLen += sdbGetRawTotalSize(pTmp);
+ }
+
+ for (int32_t index = 0; index < commitLogNum; ++index) {
+ SSdbRaw *pTmp = taosArrayGet(pTrans->commitLogs, index);
+ rawDataLen += sdbGetRawTotalSize(pTmp);
+ }
+
+ SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen);
+ if (pRaw == NULL) {
+ mError("trn:%d, failed to alloc raw since %s", pTrans->id, terrstr());
+ return NULL;
+ }
+
+ int32_t dataPos = 0;
+ SDB_SET_INT32(pRaw, dataPos, pTrans->id)
+ SDB_SET_INT8(pRaw, dataPos, pTrans->stage)
+ SDB_SET_INT8(pRaw, dataPos, pTrans->policy)
+ SDB_SET_INT32(pRaw, dataPos, redoLogNum)
+ SDB_SET_INT32(pRaw, dataPos, undoLogNum)
+ SDB_SET_INT32(pRaw, dataPos, commitLogNum)
+ SDB_SET_INT32(pRaw, dataPos, redoActionNum)
+ SDB_SET_INT32(pRaw, dataPos, undoActionNum)
+
+ for (int32_t index = 0; index < redoLogNum; ++index) {
+ SSdbRaw *pTmp = taosArrayGet(pTrans->redoLogs, index);
+ int32_t len = sdbGetRawTotalSize(pTmp);
+ SDB_SET_INT32(pRaw, dataPos, len)
+ SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
+ }
+
+ for (int32_t index = 0; index < undoLogNum; ++index) {
+ SSdbRaw *pTmp = taosArrayGet(pTrans->undoLogs, index);
+ int32_t len = sdbGetRawTotalSize(pTmp);
+ SDB_SET_INT32(pRaw, dataPos, len)
+ SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
+ }
+
+ for (int32_t index = 0; index < commitLogNum; ++index) {
+ SSdbRaw *pTmp = taosArrayGet(pTrans->commitLogs, index);
+ int32_t len = sdbGetRawTotalSize(pTmp);
+ SDB_SET_INT32(pRaw, dataPos, len)
+ SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len)
+ }
+
+ mDebug("trn:%d, is encoded as raw:%p, len:%d", pTrans->id, pRaw, dataPos);
+ return pRaw;
+}
+
+SSdbRow *trnActionDecode(SSdbRaw *pRaw) {
+ int8_t sver = 0;
+ if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
+ mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr());
+ return NULL;
+ }
+
+ if (sver != SDB_TRANS_VER) {
+ terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
+ mError("failed to get check soft ver from raw:%p since %s", pRaw, terrstr());
+ return NULL;
+ }
+
+ SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
+ STrans *pTrans = sdbGetRowObj(pRow);
+ if (pTrans == NULL) {
+ mError("failed to alloc trans from raw:%p since %s", pRaw, terrstr());
+ return NULL;
+ }
+
+ pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+
+ if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
+ pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ mDebug("trn:%d, failed to create array while parsed from raw:%p", pTrans->id, pRaw);
+ return NULL;
+ }
+
+ int32_t redoLogNum = 0;
+ int32_t undoLogNum = 0;
+ int32_t commitLogNum = 0;
+ int32_t redoActionNum = 0;
+ int32_t undoActionNum = 0;
+
+ int32_t dataPos = 0;
+ SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
+ SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->stage)
+ SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy)
+ SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
+ SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
+ SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum)
+ SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
+ SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)
+
+ int32_t code = 0;
+ for (int32_t index = 0; index < redoLogNum; ++index) {
+ int32_t dataLen = 0;
+ SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
+
+ char *pData = malloc(dataLen);
+ SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
+ void *ret = taosArrayPush(pTrans->redoLogs, pData);
+ if (ret == NULL) {
+ code = TSDB_CODE_OUT_OF_MEMORY;
+ break;
+ }
+ }
+
+ if (code != 0) {
+ terrno = code;
+ mError("trn:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
+ trnDrop(pTrans);
+ return NULL;
+ }
+
+ mDebug("trn:%d, is parsed from raw:%p", pTrans->id, pRaw);
+ return pRow;
+}
+
+static int32_t trnActionInsert(STrans *pTrans) {
+ SArray *pArray = pTrans->redoLogs;
+ int32_t arraySize = taosArrayGetSize(pArray);
+
+ for (int32_t index = 0; index < arraySize; ++index) {
+ SSdbRaw *pRaw = taosArrayGet(pArray, index);
+ int32_t code = sdbWrite(pRaw);
+ if (code != 0) {
+ mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
+ return code;
+ }
+ }
+
+ mDebug("trn:%d, write to sdb", pTrans->id);
+ return 0;
+}
+
+static int32_t trnActionDelete(STrans *pTrans) {
+ SArray *pArray = pTrans->redoLogs;
+ int32_t arraySize = taosArrayGetSize(pArray);
+
+ for (int32_t index = 0; index < arraySize; ++index) {
+ SSdbRaw *pRaw = taosArrayGet(pArray, index);
+ int32_t code = sdbWrite(pRaw);
+ if (code != 0) {
+ mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
+ return code;
+ }
+ }
+
+ mDebug("trn:%d, delete from sdb", pTrans->id);
+ return 0;
+}
+
+static int32_t trnActionUpdate(STrans *pTrans, STrans *pDstTrans) {
+ assert(true);
+ SArray *pArray = pTrans->redoLogs;
+ int32_t arraySize = taosArrayGetSize(pArray);
+
+ for (int32_t index = 0; index < arraySize; ++index) {
+ SSdbRaw *pRaw = taosArrayGet(pArray, index);
+ int32_t code = sdbWrite(pRaw);
+ if (code != 0) {
+ mError("trn:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr());
+ return code;
+ }
+ }
+
+ pTrans->stage = pDstTrans->stage;
+ mDebug("trn:%d, update in sdb", pTrans->id);
+ return 0;
+}
+
+static int32_t trnGenerateTransId() { return 1; }
+
+STrans *trnCreate(ETrnPolicy policy, void *rpcHandle) {
+ STrans *pTrans = calloc(1, sizeof(STrans));
+ if (pTrans == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ mError("failed to create transaction since %s", terrstr());
+ return NULL;
+ }
+
+ pTrans->id = trnGenerateTransId();
+ pTrans->stage = TRN_STAGE_PREPARE;
+ pTrans->policy = policy;
+ pTrans->rpcHandle = rpcHandle;
+ pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+ pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
+
+ if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
+ pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ mError("failed to create transaction since %s", terrstr());
+ return NULL;
+ }
+
+ mDebug("trn:%d, is created, %p", pTrans->id, pTrans);
+ return pTrans;
+}
+
+static void trnDropArray(SArray *pArray) {
+ for (int32_t index = 0; index < pArray->size; ++index) {
+ SSdbRaw *pRaw = taosArrayGet(pArray, index);
+ tfree(pRaw);
+ }
+
+ taosArrayDestroy(pArray);
+}
+
+void trnDrop(STrans *pTrans) {
+ trnDropArray(pTrans->redoLogs);
+ trnDropArray(pTrans->undoLogs);
+ trnDropArray(pTrans->commitLogs);
+ trnDropArray(pTrans->redoActions);
+ trnDropArray(pTrans->undoActions);
+
+ mDebug("trn:%d, is dropped, %p", pTrans->id, pTrans);
+ tfree(pTrans);
+}
+
+void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) {
+ pTrans->rpcHandle = rpcHandle;
+ mTrace("trn:%d, set rpc handle:%p", pTrans->id, rpcHandle);
+}
+
+static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) {
+ if (pArray == NULL || pRaw == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
+
+ void *ptr = taosArrayPush(pArray, pRaw);
+ if (ptr == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
+
+ return 0;
+}
+
+int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw) {
+ int32_t code = trnAppendArray(pTrans->redoLogs, pRaw);
+ mTrace("trn:%d, raw:%p append to redo logs, code:%d", pTrans->id, pRaw, code);
+ return code;
+}
+
+int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw) {
+ int32_t code = trnAppendArray(pTrans->undoLogs, pRaw);
+ mTrace("trn:%d, raw:%p append to undo logs, code:%d", pTrans->id, pRaw, code);
+ return code;
+}
+
+int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw) {
+ int32_t code = trnAppendArray(pTrans->commitLogs, pRaw);
+ mTrace("trn:%d, raw:%p append to commit logs, code:%d", pTrans->id, pRaw, code);
+ return code;
+}
+
+int32_t trnAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
+ int32_t code = trnAppendArray(pTrans->redoActions, pMsg);
+ mTrace("trn:%d, msg:%p append to redo actions", pTrans->id, pMsg);
+ return code;
+}
+
+int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
+ int32_t code = trnAppendArray(pTrans->undoActions, pMsg);
+ mTrace("trn:%d, msg:%p append to undo actions", pTrans->id, pMsg);
+ return code;
+}
+
+int32_t mnodeInitTrans() {
+ SSdbTable table = {.sdbType = SDB_TRANS,
+ .keyType = SDB_KEY_INT32,
+ .encodeFp = (SdbEncodeFp)trnActionEncode,
+ .decodeFp = (SdbDecodeFp)trnActionDecode,
+ .insertFp = (SdbInsertFp)trnActionInsert,
+ .updateFp = (SdbUpdateFp)trnActionUpdate,
+ .deleteFp = (SdbDeleteFp)trnActionDelete};
+ sdbSetTable(table);
+
+ mInfo("trn module is initialized");
+ return 0;
+}
+
+void mnodeCleanupTrans() { mInfo("trn module is cleaned up"); }
+
+
+int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
+ if (syncfp == NULL) return -1;
+
+ SSdbRaw *pRaw = trnActionEncode(pTrans);
+ if (pRaw == NULL) {
+ mError("trn:%d, failed to decode trans since %s", pTrans->id, terrstr());
+ return -1;
+ }
+ sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
+
+ if (sdbWrite(pRaw) != 0) {
+ mError("trn:%d, failed to write trans since %s", pTrans->id, terrstr());
+ return -1;
+ }
+
+ if ((*syncfp)(pRaw, pTrans->rpcHandle) != 0) {
+ mError("trn:%d, failed to sync trans since %s", pTrans->id, terrstr());
+ return -1;
+ }
+
+ return 0;
+}
+
+static void trnSendRpcRsp(void *rpcHandle, int32_t code) {
+ if (rpcHandle != NULL) {
+ SRpcMsg rspMsg = {.handle = rpcHandle, .code = terrno};
+ rpcSendResponse(&rspMsg);
+ }
+}
+
+int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
+ if (code != 0) {
+ trnSendRpcRsp(pData, terrno);
+ return 0;
+ }
+
+ if (sdbWrite(pData) != 0) {
+ code = terrno;
+ trnSendRpcRsp(pData, code);
+ terrno = code;
+ return -1;
+ }
+
+ return 0;
+}
+
+static int32_t trnExecuteArray(SArray *pArray) {
+ for (int32_t index = 0; index < pArray->size; ++index) {
+ SSdbRaw *pRaw = taosArrayGetP(pArray, index);
+ if (sdbWrite(pRaw) != 0) {
+ return -1;
+ }
+ }
+
+ return 0;
+}
+
+static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->redoLogs); }
+
+static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->undoLogs); }
+
+static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->commitLogs); }
+
+static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->redoActions); }
+
+static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->undoActions); }
+
+static int32_t trnPerformPrepareStage(STrans *pTrans) {
+ if (trnExecuteRedoLogs(pTrans) == 0) {
+ pTrans->stage = TRN_STAGE_EXECUTE;
+ return 0;
+ } else {
+ pTrans->stage = TRN_STAGE_ROLLBACK;
+ return -1;
+ }
+}
+
+static int32_t trnPerformExecuteStage(STrans *pTrans) {
+ int32_t code = trnExecuteRedoActions(pTrans);
+
+ if (code == 0) {
+ pTrans->stage = TRN_STAGE_COMMIT;
+ return 0;
+ } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
+ return -1;
+ } else {
+ if (pTrans->policy == TRN_POLICY_RETRY) {
+ pTrans->stage = TRN_STAGE_RETRY;
+ } else {
+ pTrans->stage = TRN_STAGE_ROLLBACK;
+ }
+ return 0;
+ }
+}
+
+static int32_t trnPerformCommitStage(STrans *pTrans) {
+ if (trnExecuteCommitLogs(pTrans) == 0) {
+ pTrans->stage = TRN_STAGE_EXECUTE;
+ return 0;
+ } else {
+ pTrans->stage = TRN_STAGE_ROLLBACK;
+ return -1;
+ }
+}
+
+static int32_t trnPerformRollbackStage(STrans *pTrans) {
+ if (trnExecuteCommitLogs(pTrans) == 0) {
+ pTrans->stage = TRN_STAGE_EXECUTE;
+ return 0;
+ } else {
+ pTrans->stage = TRN_STAGE_ROLLBACK;
+ return -1;
+ }
+}
+
+static int32_t trnPerformRetryStage(STrans *pTrans) {
+ if (trnExecuteCommitLogs(pTrans) == 0) {
+ pTrans->stage = TRN_STAGE_EXECUTE;
+ return 0;
+ } else {
+ pTrans->stage = TRN_STAGE_ROLLBACK;
+ return -1;
+ }
+}
+
+int32_t trnExecute(int32_t tranId) {
+ int32_t code = 0;
+
+ STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId);
+ if (pTrans == NULL) {
+ return -1;
+ }
+
+ if (pTrans->stage == TRN_STAGE_PREPARE) {
+ if (trnPerformPrepareStage(pTrans) != 0) {
+ sdbRelease(pTrans);
+ return -1;
+ }
+ }
+
+ if (pTrans->stage == TRN_STAGE_EXECUTE) {
+ if (trnPerformExecuteStage(pTrans) != 0) {
+ sdbRelease(pTrans);
+ return -1;
+ }
+ }
+
+ if (pTrans->stage == TRN_STAGE_COMMIT) {
+ if (trnPerformCommitStage(pTrans) != 0) {
+ sdbRelease(pTrans);
+ return -1;
+ }
+ }
+
+ if (pTrans->stage == TRN_STAGE_ROLLBACK) {
+ if (trnPerformRollbackStage(pTrans) != 0) {
+ sdbRelease(pTrans);
+ return -1;
+ }
+ }
+
+ if (pTrans->stage == TRN_STAGE_RETRY) {
+ if (trnPerformRetryStage(pTrans) != 0) {
+ sdbRelease(pTrans);
+ return -1;
+ }
+ }
+
+ sdbRelease(pTrans);
+ return 0;
+}
\ No newline at end of file
diff --git a/source/dnode/mnode/impl/src/mnodeUser.c b/source/dnode/mnode/impl/src/mnodeUser.c
index 63aa171238631116b62240a22a88a115e37e12aa..b8e5706484759f047944f84602eb5ee4b39512cc 100644
--- a/source/dnode/mnode/impl/src/mnodeUser.c
+++ b/source/dnode/mnode/impl/src/mnodeUser.c
@@ -18,6 +18,7 @@
#include "os.h"
#include "tglobal.h"
#include "tkey.h"
+#include "mnodeTrans.h"
#define SDB_USER_VER 1
@@ -142,12 +143,12 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
userObj.updateTime = userObj.createdTime;
userObj.rootAuth = 0;
- STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK);
+ STrans *pTrans = trnCreate(TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) return -1;
- trnSetRpcHandle(pTrans, pMsg->rpcMsg.handle);
SSdbRaw *pRedoRaw = mnodeUserActionEncode(&userObj);
if (pRedoRaw == NULL || trnAppendRedoLog(pTrans, pRedoRaw) != 0) {
+ mError("failed to append redo log since %s", terrstr());
trnDrop(pTrans);
return -1;
}
@@ -155,6 +156,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
SSdbRaw *pUndoRaw = mnodeUserActionEncode(&userObj);
if (pUndoRaw == NULL || trnAppendUndoLog(pTrans, pUndoRaw) != 0) {
+ mError("failed to append undo log since %s", terrstr());
trnDrop(pTrans);
return -1;
}
@@ -162,6 +164,7 @@ static int32_t mnodeCreateUser(char *acct, char *user, char *pass, SMnodeMsg *pM
SSdbRaw *pCommitRaw = mnodeUserActionEncode(&userObj);
if (pCommitRaw == NULL || trnAppendCommitLog(pTrans, pCommitRaw) != 0) {
+ mError("failed to append commit log since %s", terrstr());
trnDrop(pTrans);
return -1;
}
@@ -228,6 +231,8 @@ int32_t mnodeInitUser() {
.deleteFp = (SdbDeleteFp)mnodeUserActionDelete};
sdbSetTable(table);
+ mnodeSetMsgFp(TSDB_MSG_TYPE_CREATE_USER, mnodeProcessCreateUserMsg);
+
return 0;
}
diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c
index 69c82c77f1fe44bbfb1d3c04738d837e0a670197..169b2cf8092b8a657956e4410c4b78c30dbb0075 100644
--- a/source/dnode/mnode/sdb/src/sdbFile.c
+++ b/source/dnode/mnode/sdb/src/sdbFile.c
@@ -45,7 +45,7 @@ static int32_t sdbCreateDir() {
static int32_t sdbRunDeployFp() {
mDebug("start to run deploy functions");
- for (int32_t i = SDB_START; i < SDB_MAX; ++i) {
+ for (int32_t i = SDB_MAX - 1; i > SDB_START; --i) {
SdbDeployFp fp = tsSdb.deployFps[i];
if (fp == NULL) continue;
if ((*fp)() != 0) {
@@ -54,6 +54,7 @@ static int32_t sdbRunDeployFp() {
}
}
+ mDebug("end of run deploy functions");
return 0;
}
diff --git a/source/dnode/mnode/transaction/CMakeLists.txt b/source/dnode/mnode/transaction/CMakeLists.txt
deleted file mode 100644
index d35a8c9b3ff045a4e36af3e261576c3d29ba3b57..0000000000000000000000000000000000000000
--- a/source/dnode/mnode/transaction/CMakeLists.txt
+++ /dev/null
@@ -1,15 +0,0 @@
-aux_source_directory(src MNODE_SRC)
-add_library(transaction ${MNODE_SRC})
-target_include_directories(
- transaction
- PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mnode/transaction"
- private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
-)
-target_link_libraries(
- transaction
- PRIVATE os
- PRIVATE common
- PRIVATE util
- PRIVATE sdb
- PRIVATE transport
-)
diff --git a/source/dnode/mnode/transaction/inc/trnInt.h b/source/dnode/mnode/transaction/inc/trnInt.h
deleted file mode 100644
index 771217dcc053635294376e633330dec8cdf0d3fd..0000000000000000000000000000000000000000
--- a/source/dnode/mnode/transaction/inc/trnInt.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef _TD_TRANSACTION_INT_H_
-#define _TD_TRANSACTION_INT_H_
-
-#include "os.h"
-#include "trn.h"
-#include "tglobal.h"
-#include "tarray.h"
-#include "tlog.h"
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", 255, __VA_ARGS__); }}
-#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", 255, __VA_ARGS__); }}
-#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", 255, __VA_ARGS__); }}
-#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", 255, __VA_ARGS__); }}
-#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
-#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
-
-#define TRN_VER 1
-#define TRN_DEFAULT_ARRAY_SIZE 8
-
-typedef enum {
- TRN_STAGE_PREPARE = 1,
- TRN_STAGE_EXECUTE = 2,
- TRN_STAGE_COMMIT = 3,
- TRN_STAGE_ROLLBACK = 4,
- TRN_STAGE_RETRY = 5
-} ETrnStage;
-
-typedef struct STrans {
- int32_t id;
- int8_t stage;
- int8_t policy;
- void *rpcHandle;
- SArray *redoLogs;
- SArray *undoLogs;
- SArray *commitLogs;
- SArray *redoActions;
- SArray *undoActions;
-} STrans;
-
-SSdbRaw *trnActionEncode(STrans *pTrans);
-STrans *trnActionDecode(SSdbRaw *pRaw);
-int32_t trnActionInsert(STrans *pTrans);
-int32_t trnActionDelete(STrans *pTrans);
-int32_t trnActionUpdate(STrans *pSrcTrans, STrans *pDstTrans);
-int32_t trnGenerateTransId();
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /*_TD_TRANSACTION_INT_H_*/
diff --git a/source/dnode/mnode/transaction/src/trn.c b/source/dnode/mnode/transaction/src/trn.c
deleted file mode 100644
index 0d7c1a061efe7b305e80fa91afc3d1c348e209b4..0000000000000000000000000000000000000000
--- a/source/dnode/mnode/transaction/src/trn.c
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "trnInt.h"
-
-#define SDB_TRANS_VER 1
-
-SSdbRaw *trnActionEncode(STrans *pTrans) {
- int32_t rawDataLen = 10 * sizeof(int32_t);
- int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
- int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
- int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
- int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
- int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
-
- for (int32_t index = 0; index < redoLogNum; ++index) {
- SSdbRaw *pRaw = taosArrayGet(pTrans->redoLogs, index);
- rawDataLen += sdbGetRawTotalSize(pRaw);
- }
-
- for (int32_t index = 0; index < undoLogNum; ++index) {
- SSdbRaw *pRaw = taosArrayGet(pTrans->undoLogs, index);
- rawDataLen += sdbGetRawTotalSize(pRaw);
- }
-
- for (int32_t index = 0; index < commitLogNum; ++index) {
- SSdbRaw *pRaw = taosArrayGet(pTrans->commitLogs, index);
- rawDataLen += sdbGetRawTotalSize(pRaw);
- }
-
- SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen);
- if (pRaw == NULL) return NULL;
-
- int32_t dataPos = 0;
- SDB_SET_INT32(pData, dataPos, pTrans->id)
- SDB_SET_INT8(pData, dataPos, pTrans->stage)
- SDB_SET_INT8(pData, dataPos, pTrans->policy)
- SDB_SET_INT32(pData, dataPos, redoLogNum)
- SDB_SET_INT32(pData, dataPos, undoLogNum)
- SDB_SET_INT32(pData, dataPos, commitLogNum)
- SDB_SET_INT32(pData, dataPos, redoActionNum)
- SDB_SET_INT32(pData, dataPos, undoActionNum)
- SDB_SET_DATALEN(pRaw, dataPos);
-
- return pRaw;
-}
-
-STrans *trnActionDecode(SSdbRaw *pRaw) {
- int8_t sver = 0;
- if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
-
- if (sver != SDB_TRANS_VER) {
- terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
- return NULL;
- }
-
- SSdbRow *pRow = sdbAllocRow(sizeof(STrans));
- STrans *pTrans = sdbGetRowObj(pRow);
- if (pTrans == NULL) return NULL;
-
- int32_t redoLogNum = 0;
- int32_t undoLogNum = 0;
- int32_t commitLogNum = 0;
- int32_t redoActionNum = 0;
- int32_t undoActionNum = 0;
-
- int32_t dataPos = 0;
- SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
- SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->stage)
- SDB_GET_INT8(pRaw, pRow, dataPos, &pTrans->policy)
- SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
- SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
- SDB_GET_INT32(pRaw, pRow, dataPos, &commitLogNum)
- SDB_GET_INT32(pRaw, pRow, dataPos, &redoActionNum)
- SDB_GET_INT32(pRaw, pRow, dataPos, &undoActionNum)
-
- for (int32_t index = 0; index < redoLogNum; ++index) {
- int32_t dataLen = 0;
- SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen)
-
- char *pData = malloc(dataLen);
- SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen);
- void *ret = taosArrayPush(pTrans->redoLogs, pData);
- if (ret == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- break;
- }
- }
-
- // if (code != 0) {
- // trnDrop(pTrans);
- // terrno = code;
- // return NULL;
- // }
-
- return pTrans;
-}
-
-int32_t trnActionInsert(STrans *pTrans) {
- SArray *pArray = pTrans->redoLogs;
- int32_t arraySize = taosArrayGetSize(pArray);
-
- for (int32_t index = 0; index < arraySize; ++index) {
- SSdbRaw *pRaw = taosArrayGetP(pArray, index);
- int32_t code = sdbWrite(pRaw);
- if (code != 0) {
- return code;
- }
- }
-
- return 0;
-}
-
-int32_t trnActionDelete(STrans *pTrans) {
- SArray *pArray = pTrans->redoLogs;
- int32_t arraySize = taosArrayGetSize(pArray);
-
- for (int32_t index = 0; index < arraySize; ++index) {
- SSdbRaw *pRaw = taosArrayGetP(pArray, index);
- int32_t code = sdbWrite(pRaw);
- if (code != 0) {
- return code;
- }
- }
-
- return 0;
-}
-
-int32_t trnActionUpdate(STrans *pSrcTrans, STrans *pDstTrans) { return 0; }
-
-int32_t trnGenerateTransId() { return 1; }
-
-STrans *trnCreate(ETrnPolicy policy) {
- STrans *pTrans = calloc(1, sizeof(STrans));
- if (pTrans == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- return NULL;
- }
-
- pTrans->id = trnGenerateTransId();
- pTrans->stage = TRN_STAGE_PREPARE;
- pTrans->policy = policy;
- pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
- pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *));
-
- if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
- pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- return NULL;
- }
-
- return pTrans;
-}
-
-static void trnDropArray(SArray *pArray) {
- for (int32_t index = 0; index < pArray->size; ++index) {
- SSdbRaw *pRaw = taosArrayGetP(pArray, index);
- tfree(pRaw);
- }
-
- taosArrayDestroy(pArray);
-}
-
-void trnDrop(STrans *pTrans) {
- trnDropArray(pTrans->redoLogs);
- trnDropArray(pTrans->undoLogs);
- trnDropArray(pTrans->commitLogs);
- trnDropArray(pTrans->redoActions);
- trnDropArray(pTrans->undoActions);
- tfree(pTrans);
-}
-
-void trnSetRpcHandle(STrans *pTrans, void *rpcHandle) { pTrans->rpcHandle = rpcHandle; }
-
-static int32_t trnAppendArray(SArray *pArray, SSdbRaw *pRaw) {
- if (pArray == NULL || pRaw == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- return -1;
- }
-
- void *ptr = taosArrayPush(pArray, &pRaw);
- if (ptr == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- return -1;
- }
-
- return 0;
-}
-
-int32_t trnAppendRedoLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->redoLogs, pRaw); }
-
-int32_t trnAppendUndoLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->undoLogs, pRaw); }
-
-int32_t trnAppendCommitLog(STrans *pTrans, SSdbRaw *pRaw) { return trnAppendArray(pTrans->commitLogs, pRaw); }
-
-int32_t trnAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
- return trnAppendArray(pTrans->redoActions, pMsg);
-}
-
-int32_t trnAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) {
- return trnAppendArray(pTrans->undoActions, pMsg);
-}
-
-int32_t trnInit() {
- SSdbTable table = {.sdbType = SDB_TRANS,
- .keyType = SDB_KEY_INT32,
- .encodeFp = (SdbEncodeFp)trnActionEncode,
- .decodeFp = (SdbDecodeFp)trnActionDecode,
- .insertFp = (SdbInsertFp)trnActionInsert,
- .updateFp = (SdbUpdateFp)trnActionUpdate,
- .deleteFp = (SdbDeleteFp)trnActionDelete};
- sdbSetTable(table);
-
- return 0;
-}
-
-void trnCleanup() {}
diff --git a/source/dnode/mnode/transaction/src/trnExec.c b/source/dnode/mnode/transaction/src/trnExec.c
deleted file mode 100644
index fc15c1622577a978ad14a99add51f4a0eef9b853..0000000000000000000000000000000000000000
--- a/source/dnode/mnode/transaction/src/trnExec.c
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#define _DEFAULT_SOURCE
-#include "trnInt.h"
-#include "trpc.h"
-
-int32_t trnPrepare(STrans *pTrans, int32_t (*syncfp)(SSdbRaw *pRaw, void *pData)) {
- if (syncfp == NULL) return -1;
-
- SSdbRaw *pRaw = trnActionEncode(pTrans);
- if (pRaw == NULL) {
- mError("tranId:%d, failed to decode trans since %s", pTrans->id, terrstr());
- return -1;
- }
-
- if (sdbWrite(pRaw) != 0) {
- mError("tranId:%d, failed to write trans since %s", pTrans->id, terrstr());
- return -1;
- }
-
- if ((*syncfp)(pRaw, pTrans->rpcHandle) != 0) {
- mError("tranId:%d, failed to sync trans since %s", pTrans->id, terrstr());
- return -1;
- }
-
- return 0;
-}
-
-static void trnSendRpcRsp(void *rpcHandle, int32_t code) {
- if (rpcHandle != NULL) {
- SRpcMsg rspMsg = {.handle = rpcHandle, .code = terrno};
- rpcSendResponse(&rspMsg);
- }
-}
-
-int32_t trnApply(SSdbRaw *pRaw, void *pData, int32_t code) {
- if (code != 0) {
- trnSendRpcRsp(pData, terrno);
- return 0;
- }
-
- if (sdbWrite(pData) != 0) {
- code = terrno;
- trnSendRpcRsp(pData, code);
- terrno = code;
- return -1;
- }
-
- return 0;
-}
-
-static int32_t trnExecuteArray(SArray *pArray) {
- for (int32_t index = 0; index < pArray->size; ++index) {
- SSdbRaw *pRaw = taosArrayGetP(pArray, index);
- if (sdbWrite(pRaw) != 0) {
- return -1;
- }
- }
-
- return 0;
-}
-
-static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->redoLogs); }
-
-static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->undoLogs); }
-
-static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->commitLogs); }
-
-static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->redoActions); }
-
-static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->undoActions); }
-
-static int32_t trnPerformPrepareStage(STrans *pTrans) {
- if (trnExecuteRedoLogs(pTrans) == 0) {
- pTrans->stage = TRN_STAGE_EXECUTE;
- return 0;
- } else {
- pTrans->stage = TRN_STAGE_ROLLBACK;
- return -1;
- }
-}
-
-static int32_t trnPerformExecuteStage(STrans *pTrans) {
- int32_t code = trnExecuteRedoActions(pTrans);
-
- if (code == 0) {
- pTrans->stage = TRN_STAGE_COMMIT;
- return 0;
- } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
- return -1;
- } else {
- if (pTrans->policy == TRN_POLICY_RETRY) {
- pTrans->stage = TRN_STAGE_RETRY;
- } else {
- pTrans->stage = TRN_STAGE_ROLLBACK;
- }
- return 0;
- }
-}
-
-static int32_t trnPerformCommitStage(STrans *pTrans) {
- if (trnExecuteCommitLogs(pTrans) == 0) {
- pTrans->stage = TRN_STAGE_EXECUTE;
- return 0;
- } else {
- pTrans->stage = TRN_STAGE_ROLLBACK;
- return -1;
- }
-}
-
-static int32_t trnPerformRollbackStage(STrans *pTrans) {
- if (trnExecuteCommitLogs(pTrans) == 0) {
- pTrans->stage = TRN_STAGE_EXECUTE;
- return 0;
- } else {
- pTrans->stage = TRN_STAGE_ROLLBACK;
- return -1;
- }
-}
-
-static int32_t trnPerformRetryStage(STrans *pTrans) {
- if (trnExecuteCommitLogs(pTrans) == 0) {
- pTrans->stage = TRN_STAGE_EXECUTE;
- return 0;
- } else {
- pTrans->stage = TRN_STAGE_ROLLBACK;
- return -1;
- }
-}
-
-int32_t trnExecute(int32_t tranId) {
- int32_t code = 0;
-
- STrans *pTrans = sdbAcquire(SDB_TRANS, &tranId);
- if (pTrans == NULL) {
- return -1;
- }
-
- if (pTrans->stage == TRN_STAGE_PREPARE) {
- if (trnPerformPrepareStage(pTrans) != 0) {
- sdbRelease(pTrans);
- return -1;
- }
- }
-
- if (pTrans->stage == TRN_STAGE_EXECUTE) {
- if (trnPerformExecuteStage(pTrans) != 0) {
- sdbRelease(pTrans);
- return -1;
- }
- }
-
- if (pTrans->stage == TRN_STAGE_COMMIT) {
- if (trnPerformCommitStage(pTrans) != 0) {
- sdbRelease(pTrans);
- return -1;
- }
- }
-
- if (pTrans->stage == TRN_STAGE_ROLLBACK) {
- if (trnPerformRollbackStage(pTrans) != 0) {
- sdbRelease(pTrans);
- return -1;
- }
- }
-
- if (pTrans->stage == TRN_STAGE_RETRY) {
- if (trnPerformRetryStage(pTrans) != 0) {
- sdbRelease(pTrans);
- return -1;
- }
- }
-
- sdbRelease(pTrans);
- return 0;
-}
\ No newline at end of file
diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c
index 1aafc880bef7679061d879300258c4f7e6efa120..934a8dd6ab432a36c7a27ec0bd9f6b83e4dfe2af 100644
--- a/source/libs/transport/src/rpcMain.c
+++ b/source/libs/transport/src/rpcMain.c
@@ -505,14 +505,18 @@ void rpcSendRedirectRsp(void *thandle, const SEpSet *pEpSet) {
}
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
+#if 0
SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn->user[0] == 0) return -1;
pInfo->clientIp = pConn->peerIp;
pInfo->clientPort = pConn->peerPort;
// pInfo->serverIp = pConn->destIp;
-
+
tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user));
+#else
+ strcpy(pInfo->user, "root");
+#endif
return 0;
}