diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h
index 455898585aaec2935d72aab0cdf6dfab6a0aac48..e8e931daa50292f333b3c56cff0983ed09bb3638 100644
--- a/include/common/tmsgdef.h
+++ b/include/common/tmsgdef.h
@@ -158,6 +158,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL)
+ TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
// Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG)
diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h
index 28c470a4437d46cdaf213d857e51c6e67108d1d4..f2c8c916c8b9704f69a8a0d6caaf214c2b34e7fd 100644
--- a/include/dnode/mnode/mnode.h
+++ b/include/dnode/mnode/mnode.h
@@ -89,6 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
* @return int32_t 0 for success, -1 for failure.
*/
int32_t mndProcessMsg(SRpcMsg *pMsg);
+int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
/**
* @brief Generate machine code
diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h
index 2abe0e5c737c8dd52c92cc0e34a052f44155e298..94d41a7416679f496e2324e033ef667e262f3b1c 100644
--- a/include/dnode/mnode/sdb/sdb.h
+++ b/include/dnode/mnode/sdb/sdb.h
@@ -304,13 +304,16 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
/**
- * @brief Update the version of sdb
+ * @brief Update the index of sdb
*
* @param pSdb The sdb object.
- * @param val The update value of the version.
- * @return int32_t The current version of sdb
+ * @param index The update value of the apply index.
+ * @return int32_t The current index of sdb
*/
-int64_t sdbUpdateVer(SSdb *pSdb, int32_t val);
+void sdbSetApplyIndex(SSdb *pSdb, int64_t index);
+int64_t sdbGetApplyIndex(SSdb *pSdb);
+void sdbSetApplyTerm(SSdb *pSdb, int64_t term);
+int64_t sdbGetApplyTerm(SSdb *pSdb);
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
void sdbFreeRaw(SSdbRaw *pRaw);
@@ -339,6 +342,7 @@ typedef struct SSdb {
char *tmpDir;
int64_t lastCommitVer;
int64_t curVer;
+ int64_t curTerm;
int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX];
diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index 9b6593e4b5bb4c8aad5018b3b92f73c7e1d52794..2bf678fa487a86aa666f39e25f6c9cef76cb4bba 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -78,6 +78,8 @@ typedef struct SFsmCbMeta {
int32_t code;
ESyncState state;
uint64_t seqNum;
+ SyncTerm term;
+ SyncTerm currentTerm;
} SFsmCbMeta;
typedef struct SSyncFSM {
@@ -85,6 +87,7 @@ typedef struct SSyncFSM {
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
+ void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM;
@@ -117,7 +120,6 @@ typedef struct SSyncLogStore {
} SSyncLogStore;
-
typedef struct SSyncInfo {
SyncGroupId vgId;
SSyncCfg syncCfg;
@@ -144,6 +146,7 @@ int32_t syncGetVgId(int64_t rid);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
const char* syncStr(ESyncState state);
+bool syncIsRestoreFinish(int64_t rid);
#ifdef __cplusplus
}
diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h
index fcb00ddf019d09866cba28d7865d800d970bf1f4..754a203471fb3810adbd5e17d66b8e0a7d6d8902 100644
--- a/include/libs/transport/trpc.h
+++ b/include/libs/transport/trpc.h
@@ -28,7 +28,7 @@ extern "C" {
#define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U)
-extern int tsRpcHeadSize;
+extern int32_t tsRpcHeadSize;
typedef struct {
uint32_t clientIp;
@@ -69,10 +69,10 @@ typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port
char * label; // for debug purpose
- int numOfThreads; // number of threads to handle connections
- int sessions; // number of sessions allowed
+ int32_t numOfThreads; // number of threads to handle connections
+ int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
- int idleTime; // milliseconds, 0 means idle timer is disabled
+ int32_t idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only
char *user; // user name
@@ -108,9 +108,9 @@ int32_t rpcInit();
void rpcCleanup();
void * rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *);
-void * rpcMallocCont(int contLen);
+void * rpcMallocCont(int32_t contLen);
void rpcFreeCont(void *pCont);
-void * rpcReallocCont(void *ptr, int contLen);
+void * rpcReallocCont(void *ptr, int32_t contLen);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
@@ -121,10 +121,10 @@ void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock
// These functions will not be called in the child process
-void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
-void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
-int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
-void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
+void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
+void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
+int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
+void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
#ifdef __cplusplus
}
diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
index 75e83d65471fdebfba4fdbfa3083a2dc02f7fd22..030d4b309e3e0a4a70e706cd5606d495323d819d 100644
--- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
+++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
@@ -24,19 +24,22 @@ extern "C" {
#endif
typedef struct SMnodeMgmt {
- SDnodeData *pData;
- SMnode *pMnode;
- SMsgCb msgCb;
- const char *path;
- const char *name;
- SSingleWorker queryWorker;
- SSingleWorker readWorker;
- SSingleWorker writeWorker;
- SSingleWorker syncWorker;
- SSingleWorker monitorWorker;
- SReplica replicas[TSDB_MAX_REPLICA];
- int8_t replica;
- int8_t selfIndex;
+ SDnodeData *pData;
+ SMnode *pMnode;
+ SMsgCb msgCb;
+ const char *path;
+ const char *name;
+ SSingleWorker queryWorker;
+ SSingleWorker readWorker;
+ SSingleWorker writeWorker;
+ SSingleWorker syncWorker;
+ SSingleWorker monitorWorker;
+ SReplica replicas[TSDB_MAX_REPLICA];
+ int8_t replica;
+ int8_t selfIndex;
+ bool stopped;
+ int32_t refCount;
+ TdThreadRwlock lock;
} SMnodeMgmt;
// mmFile.c
@@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
+int32_t mmAcquire(SMnodeMgmt *pMgmt);
+void mmRelease(SMnodeMgmt *pMgmt);
// mmHandle.c
SArray *mmGetMsgHandles();
diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
index 2ce42d7a5ff30c4cdb6d1da2a00c933cc2e882ac..a894a4962dddc632d583d1e4d5bc5a82fbf07f52 100644
--- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
@@ -237,6 +237,16 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+ if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
+
code = 0;
_OVER:
diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
index 4f7fd4a1c0c925093b3773e06b9dfba1718ce945..43113d05af5291295f9f27e7bc767a0617117ba9 100644
--- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c
@@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) {
if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode);
+ taosThreadRwlockDestroy(&pMgmt->lock);
pMgmt->pMnode = NULL;
}
@@ -122,6 +123,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return -1;
}
+ if (syncInit() != 0) {
+ dError("failed to init sync since %s", terrstr());
+ return -1;
+ }
+
SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt));
if (pMgmt == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -137,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt;
+ taosThreadRwlockInit(&pMgmt->lock, NULL);
bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) {
@@ -206,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() {
return mgmtFunc;
}
+
+int32_t mmAcquire(SMnodeMgmt *pMgmt) {
+ int32_t code = 0;
+
+ taosThreadRwlockRdlock(&pMgmt->lock);
+ if (pMgmt->stopped) {
+ code = -1;
+ } else {
+ atomic_add_fetch_32(&pMgmt->refCount, 1);
+ }
+ taosThreadRwlockUnlock(&pMgmt->lock);
+ return code;
+}
+
+void mmRelease(SMnodeMgmt *pMgmt) {
+ taosThreadRwlockRdlock(&pMgmt->lock);
+ atomic_sub_fetch_32(&pMgmt->refCount, 1);
+ taosThreadRwlockUnlock(&pMgmt->lock);
+}
\ No newline at end of file
diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
index 43ee7cd80a5aa1880092a4ca3abe4add2d42249c..7249edc7066eb29f2e930d972bede712f65515b8 100644
--- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
@@ -56,6 +56,12 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
+static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
+ SMnodeMgmt *pMgmt = pInfo->ahandle;
+ pMsg->info.node = pMgmt->pMnode;
+ mndProcessSyncMsg(pMsg);
+}
+
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg);
@@ -105,7 +111,17 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
- return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
+ int32_t code = -1;
+ if (mmAcquire(pMgmt) == 0) {
+ code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
+ mmRelease(pMgmt);
+ }
+
+ if (code != 0) {
+ rpcFreeCont(pMsg->pCont);
+ pMsg->pCont = NULL;
+ }
+ return code;
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
@@ -149,7 +165,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = 1,
.max = 1,
.name = "mnode-sync",
- .fp = (FItem)mmProcessQueue,
+ .fp = (FItem)mmProcessSyncQueue,
.param = pMgmt,
};
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
@@ -174,6 +190,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
}
void mmStopWorker(SMnodeMgmt *pMgmt) {
+ taosThreadRwlockWrlock(&pMgmt->lock);
+ pMgmt->stopped = 1;
+ taosThreadRwlockUnlock(&pMgmt->lock);
+ while (pMgmt->refCount > 0) taosMsleep(10);
+
tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->readWorker);
diff --git a/source/dnode/mgmt/test/mnode/CMakeLists.txt b/source/dnode/mgmt/test/mnode/CMakeLists.txt
index e83f5dbbec9e84feac3738838e8c96e6dd3f3a3b..788cf53976185f5737c6571232e6c25add05e189 100644
--- a/source/dnode/mgmt/test/mnode/CMakeLists.txt
+++ b/source/dnode/mgmt/test/mnode/CMakeLists.txt
@@ -4,7 +4,7 @@ target_link_libraries(
dmnodeTest sut
)
-add_test(
- NAME dmnodeTest
- COMMAND dmnodeTest
-)
+#add_test(
+# NAME dmnodeTest
+# COMMAND dmnodeTest
+#)
diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h
index 5258fa9e023a49e3fdf4ea41b2785d3ed93a27a8..5a1653b937fee8ed4427aa1e4a40b459b110125c 100644
--- a/source/dnode/mnode/impl/inc/mndInt.h
+++ b/source/dnode/mnode/impl/inc/mndInt.h
@@ -19,6 +19,7 @@
#include "mndDef.h"
#include "sdb.h"
+#include "syncTools.h"
#include "tcache.h"
#include "tdatablock.h"
#include "tglobal.h"
@@ -31,12 +32,14 @@
extern "C" {
#endif
+// clang-format off
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
+// clang-format on
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
@@ -72,10 +75,11 @@ typedef struct {
} STelemMgmt;
typedef struct {
+ SWal *pWal;
int32_t errCode;
+ bool restored;
sem_t syncSem;
- SWal *pWal;
- SSyncNode *pSyncNode;
+ int64_t sync;
ESyncState state;
} SSyncMgmt;
diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h
index fe557cdeac0874dc815b5fe83b795a4b01bbfcec..356f215267fcfd76f5a851202c6290b9433796ee 100644
--- a/source/dnode/mnode/impl/inc/mndSync.h
+++ b/source/dnode/mnode/impl/inc/mndSync.h
@@ -26,6 +26,8 @@ int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode);
bool mndIsMaster(SMnode *pMnode);
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw);
+void mndSyncStart(SMnode *pMnode);
+void mndSyncStop(SMnode *pMnode);
#ifdef __cplusplus
}
diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c
index 3dbe3241a78f617a99148f8c571189eea41e17b5..b77a2c20a6b60deaaaa77501c4a11d252ae2396d 100644
--- a/source/dnode/mnode/impl/src/mndSync.c
+++ b/source/dnode/mnode/impl/src/mndSync.c
@@ -17,178 +17,146 @@
#include "mndSync.h"
#include "mndTrans.h"
-static int32_t mndInitWal(SMnode *pMnode) {
- SSyncMgmt *pMgmt = &pMnode->syncMgmt;
+int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
- char path[PATH_MAX] = {0};
- snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
- SWalCfg cfg = {
- .vgId = 1,
- .fsyncPeriod = 0,
- .rollPeriod = -1,
- .segSize = -1,
- .retentionPeriod = -1,
- .retentionSize = -1,
- .level = TAOS_WAL_FSYNC,
- };
- pMgmt->pWal = walOpen(path, &cfg);
- if (pMgmt->pWal == NULL) return -1;
-
- return 0;
-}
+int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
-static void mndCloseWal(SMnode *pMnode) {
+void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
+ SMnode *pMnode = pFsm->data;
+ SSdb *pSdb = pMnode->pSdb;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
- if (pMgmt->pWal != NULL) {
- walClose(pMgmt->pWal);
- pMgmt->pWal = NULL;
+ SSdbRaw *pRaw = pMsg->pCont;
+
+ mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
+ sdbWriteWithoutFree(pSdb, pRaw);
+ sdbSetApplyIndex(pSdb, cbMeta.index);
+ sdbSetApplyTerm(pSdb, cbMeta.term);
+ if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
+ tsem_post(&pMgmt->syncSem);
}
}
-static int32_t mndRestoreWal(SMnode *pMnode) {
- SWal *pWal = pMnode->syncMgmt.pWal;
- SSdb *pSdb = pMnode->pSdb;
- int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
- int32_t code = -1;
-
- SWalReadHandle *pHandle = walOpenReadHandle(pWal);
- if (pHandle == NULL) return -1;
-
- int64_t first = walGetFirstVer(pWal);
- int64_t last = walGetLastVer(pWal);
- mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
-
- first = TMAX(lastSdbVer + 1, first);
- for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
- if (walReadWithHandle(pHandle, ver) < 0) {
- mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr());
- goto _OVER;
- }
-
- SWalHead *pHead = pHandle->pHead;
- int64_t sdbVer = sdbUpdateVer(pSdb, 0);
- if (sdbVer + 1 != ver) {
- terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
- mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer);
- goto _OVER;
- }
-
- mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
- if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
- mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr());
- goto _OVER;
- }
-
- sdbUpdateVer(pSdb, 1);
- mDebug("ver:%" PRId64 ", is restored", ver);
- }
-
- int64_t sdbVer = sdbUpdateVer(pSdb, 0);
- mDebug("restore wal finished, sdbver:%" PRId64, sdbVer);
+int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
+ SMnode *pMnode = pFsm->data;
+ pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb);
+ pSnapshot->lastApplyTerm = sdbGetApplyTerm(pMnode->pSdb);
+ return 0;
+}
+void mndRestoreFinish(struct SSyncFSM *pFsm) {
+ SMnode *pMnode = pFsm->data;
mndTransPullup(pMnode);
- sdbVer = sdbUpdateVer(pSdb, 0);
- mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer);
-
- if (sdbVer != lastSdbVer) {
- mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
- if (sdbWriteFile(pSdb) != 0) {
- goto _OVER;
- }
-
- if (walCommit(pWal, sdbVer) != 0) {
- goto _OVER;
- }
-
- if (walBeginSnapshot(pWal, sdbVer) < 0) {
- goto _OVER;
- }
-
- if (walEndSnapshot(pWal) < 0) {
- goto _OVER;
- }
- }
-
- code = 0;
+ pMnode->syncMgmt.restored = true;
+}
-_OVER:
- walCloseReadHandle(pHandle);
- return code;
+SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
+ SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
+ pFsm->data = pMnode;
+ pFsm->FpCommitCb = mndSyncCommitMsg;
+ pFsm->FpPreCommitCb = NULL;
+ pFsm->FpRollBackCb = NULL;
+ pFsm->FpGetSnapshot = mndSyncGetSnapshot;
+ pFsm->FpRestoreFinish = mndRestoreFinish;
+ pFsm->FpRestoreSnapshot = NULL;
+ return pFsm;
}
int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
- tsem_init(&pMgmt->syncSem, 0, 0);
- if (mndInitWal(pMnode) < 0) {
+ char path[PATH_MAX + 20] = {0};
+ snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
+ SWalCfg cfg = {
+ .vgId = 1,
+ .fsyncPeriod = 0,
+ .rollPeriod = -1,
+ .segSize = -1,
+ .retentionPeriod = -1,
+ .retentionSize = -1,
+ .level = TAOS_WAL_FSYNC,
+ };
+
+ pMgmt->pWal = walOpen(path, &cfg);
+ if (pMgmt->pWal == NULL) {
mError("failed to open wal since %s", terrstr());
return -1;
}
- if (mndRestoreWal(pMnode) < 0) {
- mError("failed to restore wal since %s", terrstr());
- return -1;
+ SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
+ snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
+ syncInfo.pWal = pMgmt->pWal;
+ syncInfo.pFsm = mndSyncMakeFsm(pMnode);
+
+ SSyncCfg *pCfg = &syncInfo.syncCfg;
+ pCfg->replicaNum = pMnode->replica;
+ pCfg->myIndex = pMnode->selfIndex;
+ for (int32_t i = 0; i < pMnode->replica; ++i) {
+ SNodeInfo *pNode = &pCfg->nodeInfo[i];
+ tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
+ pNode->nodePort = pMnode->replicas[i].port;
}
- if (pMnode->selfId == 1) {
- pMgmt->state = TAOS_SYNC_STATE_LEADER;
+ tsem_init(&pMgmt->syncSem, 0, 0);
+ pMgmt->sync = syncOpen(&syncInfo);
+ if (pMgmt->sync <= 0) {
+ mError("failed to open sync since %s", terrstr());
+ return -1;
}
- pMgmt->pSyncNode = NULL;
+
+ mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync);
return 0;
}
void mndCleanupSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
- tsem_destroy(&pMgmt->syncSem);
- mndCloseWal(pMnode);
-}
-
-static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
- SMnode *pMnode = pData;
- SSyncMgmt *pMgmt = &pMnode->syncMgmt;
+ syncStop(pMgmt->sync);
+ mDebug("sync:%" PRId64 " is stopped", pMgmt->sync);
- pMgmt->errCode = 0;
- tsem_post(&pMgmt->syncSem);
+ tsem_destroy(&pMgmt->syncSem);
+ if (pMgmt->pWal != NULL) {
+ walClose(pMgmt->pWal);
+ }
- return 0;
+ memset(pMgmt, 0, sizeof(SSyncMgmt));
}
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
- SWal *pWal = pMnode->syncMgmt.pWal;
- SSdb *pSdb = pMnode->pSdb;
-
- int64_t ver = sdbUpdateVer(pSdb, 1);
- if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
- sdbUpdateVer(pSdb, -1);
- mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr());
- return -1;
- }
-
- mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw);
- walCommit(pWal, ver);
- walFsync(pWal, true);
-
-#if 1
- return 0;
-#else
- if (pMnode->replica == 1) return 0;
-
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0;
- SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)};
-
- bool isWeak = false;
- int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak);
+ SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
+ rsp.pCont = rpcMallocCont(rsp.contLen);
+ if (rsp.pCont == NULL) return -1;
+ memcpy(rsp.pCont, pRaw, rsp.contLen);
+
+ const bool isWeak = false;
+ int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
+ if (code == 0) {
+ tsem_wait(&pMgmt->syncSem);
+ } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
+ terrno = TSDB_CODE_APP_NOT_READY;
+ } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
+ terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
+ } else {
+ terrno = TSDB_CODE_APP_ERROR;
+ }
if (code != 0) return code;
-
- tsem_wait(&pMgmt->syncSem);
return pMgmt->errCode;
-#endif
}
+void mndSyncStart(SMnode *pMnode) {
+ SSyncMgmt *pMgmt = &pMnode->syncMgmt;
+ syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
+ syncStart(pMgmt->sync);
+ mDebug("sync:%" PRId64 " is started", pMgmt->sync);
+}
+
+void mndSyncStop(SMnode *pMnode) {}
+
bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
- return pMgmt->state == TAOS_SYNC_STATE_LEADER;
+ pMgmt->state = syncGetMyRole(pMgmt->sync);
+
+ return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
}
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 2a500829e5039e87b072e7e624df3dca159b57c3..d685c8850028b65563da0ee824917611746835a5 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -682,13 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
mDebug("trans:%d, sync finished", pTrans->id);
-
- code = sdbWrite(pMnode->pSdb, pRaw);
- if (code != 0) {
- mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
- return -1;
- }
-
return 0;
}
@@ -1360,19 +1353,35 @@ _OVER:
return code;
}
+static int32_t mndCompareTransId(int32_t *pTransId1, int32_t *pTransId2) { return *pTransId1 >= *pTransId2 ? 1 : 0; }
+
void mndTransPullup(SMnode *pMnode) {
- STrans *pTrans = NULL;
- void *pIter = NULL;
+ SSdb *pSdb = pMnode->pSdb;
+ SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_TRANS), sizeof(int32_t));
+ if (pArray == NULL) return;
+ void *pIter = NULL;
while (1) {
+ STrans *pTrans = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) break;
+ taosArrayPush(pArray, &pTrans->id);
+ sdbRelease(pSdb, pTrans);
+ }
- mndTransExecute(pMnode, pTrans);
- sdbRelease(pMnode->pSdb, pTrans);
+ taosArraySort(pArray, (__compar_fn_t)mndCompareTransId);
+
+ for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
+ int32_t *pTransId = taosArrayGet(pArray, i);
+ STrans *pTrans = mndAcquireTrans(pMnode, *pTransId);
+ if (pTrans != NULL) {
+ mndTransExecute(pMnode, pTrans);
+ }
+ mndReleaseTrans(pMnode, pTrans);
}
sdbWriteFile(pMnode->pSdb);
+ taosArrayDestroy(pArray);
}
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c
index 23abd1503b7de1734f6a5e2238a1606d10b2c42f..5b8c1a3101cad0d6a9c4ee0468eaff547bfd174d 100644
--- a/source/dnode/mnode/impl/src/mnode.c
+++ b/source/dnode/mnode/impl/src/mnode.c
@@ -335,9 +335,107 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
return 0;
}
-int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); }
+int32_t mndStart(SMnode *pMnode) {
+ mndSyncStart(pMnode);
+ return mndInitTimer(pMnode);
+}
+
+void mndStop(SMnode *pMnode) {
+ mndSyncStop(pMnode);
+ return mndCleanupTimer(pMnode);
+}
+
+int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
+ SMnode *pMnode = pMsg->info.node;
+ void *ahandle = pMsg->info.ahandle;
+ int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
+
+ if (syncEnvIsStart()) {
+ SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync);
+ assert(pSyncNode != NULL);
+
+ ESyncState state = syncGetMyRole(pMnode->syncMgmt.sync);
+ SyncTerm currentTerm = syncGetMyTerm(pMnode->syncMgmt.sync);
+
+ SMsgHead *pHead = pMsg->pCont;
+
+ char logBuf[512];
+ char *syncNodeStr = sync2SimpleStr(pMnode->syncMgmt.sync);
+ snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
+ syncRpcMsgLog2(logBuf, pMsg);
+ taosMemoryFree(syncNodeStr);
+
+ SRpcMsg *pRpcMsg = pMsg;
+
+ if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
+ SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
+
+ ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
+ syncTimeoutDestroy(pSyncMsg);
+
+ } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
+ SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
+
+ ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
+ syncPingDestroy(pSyncMsg);
+
+ } else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
+ SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
+
+ ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
+ syncPingReplyDestroy(pSyncMsg);
-void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); }
+ } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
+ SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
+
+ ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
+ syncClientRequestDestroy(pSyncMsg);
+
+ } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
+ SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
+
+ ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
+ syncRequestVoteDestroy(pSyncMsg);
+
+ } else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
+ SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
+
+ ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
+ syncRequestVoteReplyDestroy(pSyncMsg);
+
+ } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
+ SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
+
+ ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
+ syncAppendEntriesDestroy(pSyncMsg);
+
+ } else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
+ SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
+ assert(pSyncMsg != NULL);
+
+ ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
+ syncAppendEntriesReplyDestroy(pSyncMsg);
+
+ } else {
+ mError("==mndProcessSyncMsg== error msg type:%d", pRpcMsg->msgType);
+ ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
+ }
+
+ syncNodeRelease(pSyncNode);
+ } else {
+ mError("==mndProcessSyncMsg== error syncEnv stop");
+ ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
+ }
+
+ return ret;
+}
int32_t mndProcessMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp
index f67ca2fb91d1847ad1491802a0ee69052c90abed..df535c4456615b8b501236f2c7ad1684c2f4ac6f 100644
--- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp
+++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp
@@ -493,9 +493,8 @@ TEST_F(MndTestSdb, 01_Write_Str) {
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 );
- ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1);
- ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
- ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1);
+ sdbSetApplyIndex(pSdb, -1);
+ ASSERT_EQ(sdbGetApplyIndex(pSdb), -1);
ASSERT_EQ(mnode.insertTimes, 2);
ASSERT_EQ(mnode.deleteTimes, 0);
@@ -537,9 +536,6 @@ TEST_F(MndTestSdb, 01_Write_Str) {
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 3);
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 4);
- ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1);
- ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
- ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1);
ASSERT_EQ(mnode.insertTimes, 3);
ASSERT_EQ(mnode.deleteTimes, 0);
@@ -704,8 +700,9 @@ TEST_F(MndTestSdb, 01_Write_Str) {
}
// write version
- ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
- ASSERT_EQ(sdbUpdateVer(pSdb, 1), 1);
+ sdbSetApplyIndex(pSdb, 0);
+ sdbSetApplyIndex(pSdb, 1);
+ ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
ASSERT_EQ(sdbWriteFile(pSdb), 0);
ASSERT_EQ(sdbWriteFile(pSdb), 0);
@@ -775,7 +772,7 @@ TEST_F(MndTestSdb, 01_Read_Str) {
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5);
- ASSERT_EQ(sdbUpdateVer(pSdb, 0), 1);
+ ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
ASSERT_EQ(mnode.insertTimes, 4);
ASSERT_EQ(mnode.deleteTimes, 0);
diff --git a/source/dnode/mnode/impl/test/trans/CMakeLists.txt b/source/dnode/mnode/impl/test/trans/CMakeLists.txt
index 023c8caa627777233536b16c94e4ede71ccf6e6d..22ff85563fab6119a0d35b36afeb1cd7aa450996 100644
--- a/source/dnode/mnode/impl/test/trans/CMakeLists.txt
+++ b/source/dnode/mnode/impl/test/trans/CMakeLists.txt
@@ -31,7 +31,7 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
)
-# add_test(
-# NAME transTest2
-# COMMAND transTest2
-# )
+#add_test(
+# NAME transTest2
+# COMMAND transTest2
+#)
diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp
index ec972cf784ce8f43e40def54b09d94eb13844c47..b78f1c7021ef44313a2a6393ecc58294921f2a18 100644
--- a/source/dnode/mnode/impl/test/trans/trans2.cpp
+++ b/source/dnode/mnode/impl/test/trans/trans2.cpp
@@ -23,6 +23,11 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return -1;
}
+int32_t putToQueue(void *pMgmt, SRpcMsg *pMsg) {
+ terrno = TSDB_CODE_INVALID_PTR;
+ return -1;
+}
+
class MndTestTrans2 : public ::testing::Test {
protected:
static void InitLog() {
@@ -55,6 +60,9 @@ class MndTestTrans2 : public ::testing::Test {
msgCb.reportStartupFp = reportStartup;
msgCb.sendReqFp = sendReq;
msgCb.sendRspFp = sendRsp;
+ msgCb.queueFps[SYNC_QUEUE] = putToQueue;
+ msgCb.queueFps[WRITE_QUEUE] = putToQueue;
+ msgCb.queueFps[READ_QUEUE] = putToQueue;
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefault(&msgCb);
@@ -77,6 +85,7 @@ class MndTestTrans2 : public ::testing::Test {
static void SetUpTestSuite() {
InitLog();
walInit();
+ syncInit();
InitMnode();
}
diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c
index 1f11a77e6c7575a8f602bb4720b0445b5c5c0372..7b90d8acb53083461220ac4cf6ab19c025bf2a72 100644
--- a/source/dnode/mnode/sdb/src/sdb.c
+++ b/source/dnode/mnode/sdb/src/sdb.c
@@ -31,11 +31,9 @@ SSdb *sdbInit(SSdbOpt *pOption) {
char path[PATH_MAX + 100] = {0};
snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP);
pSdb->currDir = strdup(path);
- snprintf(path, sizeof(path), "%s%ssync", pOption->path, TD_DIRSEP);
- pSdb->syncDir = strdup(path);
snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP);
pSdb->tmpDir = strdup(path);
- if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) {
+ if (pSdb->currDir == NULL || pSdb->tmpDir == NULL) {
sdbCleanup(pSdb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to init sdb since %s", terrstr());
@@ -55,6 +53,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
}
pSdb->curVer = -1;
+ pSdb->curTerm = -1;
pSdb->lastCommitVer = -1;
pSdb->pMnode = pOption->pMnode;
mDebug("sdb init successfully");
@@ -149,12 +148,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return -1;
}
- if (taosMkDir(pSdb->syncDir) != 0) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
- return -1;
- }
-
if (taosMkDir(pSdb->tmpDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
@@ -164,4 +157,10 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return 0;
}
-int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { return atomic_add_fetch_64(&pSdb->curVer, val); }
\ No newline at end of file
+void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
+
+int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
+
+void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
+
+int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; }
diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c
index a391ea8d03c6e2119fb21e0d0178ee9096883d48..b000c208c87b0393616cf0fb1d4a0cdbc08782b7 100644
--- a/source/dnode/mnode/sdb/src/sdbFile.c
+++ b/source/dnode/mnode/sdb/src/sdbFile.c
@@ -65,6 +65,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1;
}
+ ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
+ if (ret < 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+ if (ret != sizeof(int64_t)) {
+ terrno = TSDB_CODE_FILE_CORRUPTED;
+ return -1;
+ }
+
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = 0;
ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
@@ -123,6 +133,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1;
}
+ if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = 0;
if (i < SDB_MAX) {
@@ -182,6 +197,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
if (sdbReadFileHead(pSdb, pFile) != 0) {
mError("failed to read file:%s head since %s", file, terrstr());
pSdb->curVer = -1;
+ pSdb->curTerm = -1;
taosMemoryFree(pRaw);
taosCloseFile(&pFile);
return -1;
@@ -256,8 +272,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
- mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
- pSdb->lastCommitVer);
+ mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
+ pSdb->curTerm, pSdb->lastCommitVer);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
@@ -350,7 +366,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mError("failed to write file:%s since %s", curfile, tstrerror(code));
} else {
pSdb->lastCommitVer = pSdb->curVer;
- mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
+ mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
}
terrno = code;
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 8659c418070cdccf4dc9c3164d36f5548199f030..882ee912cde37414bc219efe75c113d0868c1810 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
-int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
+int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
+ int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
+ if (code != 0) {
+ rpcFreeCont(pMsg->pCont);
+ }
+ return code;
+}
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
@@ -141,5 +147,6 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
+ pFsm->FpRestoreFinish = NULL;
return pFsm;
}
\ No newline at end of file
diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h
index f65a31769420d6cf584d2079f1b147e510f3bdb6..b69c087b5fbf9a343ea51d8846a7c8e929c42265 100644
--- a/source/libs/sync/inc/syncIO.h
+++ b/source/libs/sync/inc/syncIO.h
@@ -36,8 +36,8 @@ typedef struct SSyncIO {
STaosQueue *pMsgQ;
STaosQset * pQset;
TdThread consumerTid;
- void *serverRpc;
- void *clientRpc;
+ void * serverRpc;
+ void * clientRpc;
SEpSet myAddr;
SMsgCb msgcb;
diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h
index 768e1c1cf1b55486dea6c98dae7e6df9ed2f891a..9246041b815e401f1c7638e5cba07160048a36f4 100644
--- a/source/libs/sync/inc/syncInt.h
+++ b/source/libs/sync/inc/syncInt.h
@@ -147,6 +147,11 @@ typedef struct SSyncNode {
// tools
SSyncRespMgr* pSyncRespMgr;
+ // restore state
+ bool restoreFinish;
+ //sem_t restoreSem;
+ SSnapshot* pSnapshot;
+
} SSyncNode;
// open/close --------------
diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c
index 1a5d418e7545122b48b15e1b83c4a2bef3d9a860..fa735e71c029e22d67e7b2681ff1fc7144527061 100644
--- a/source/libs/sync/src/syncAppendEntries.c
+++ b/source/libs/sync/src/syncAppendEntries.c
@@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
- // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
@@ -332,7 +331,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
+ cbMeta.term = pEntry->term;
+ cbMeta.currentTerm = ths->pRaftStore->currentTerm;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
+
+ bool needExecute = true;
+ if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) {
+ needExecute = false;
+ }
+
+ if (needExecute) {
+ ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
+ }
}
// config change
@@ -349,6 +359,22 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
}
+ // restore finish
+ if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
+ if (ths->restoreFinish == false) {
+ if (ths->pFsm->FpRestoreFinish != NULL) {
+ ths->pFsm->FpRestoreFinish(ths->pFsm);
+ }
+ ths->restoreFinish = true;
+ sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
+
+ /*
+ tsem_post(&ths->restoreSem);
+ sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths);
+ */
+ }
+ }
+
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c
index 0f17cf267e8e8a54d6020f12b47bafb594c92434..18c6f8930ac73f2bdc5d9e3d860f8b2f8dec0188 100644
--- a/source/libs/sync/src/syncCommit.c
+++ b/source/libs/sync/src/syncCommit.c
@@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
- // if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
@@ -110,7 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
cbMeta.code = 0;
cbMeta.state = pSyncNode->state;
cbMeta.seqNum = pEntry->seqNum;
- pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
+ cbMeta.term = pEntry->term;
+ cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
+
+ bool needExecute = true;
+ if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) {
+ needExecute = false;
+ }
+
+ if (needExecute) {
+ pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
+ }
}
// config change
@@ -127,6 +136,22 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
}
+ // restore finish
+ if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
+ if (pSyncNode->restoreFinish == false) {
+ if (pSyncNode->pFsm->FpRestoreFinish != NULL) {
+ pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
+ }
+ pSyncNode->restoreFinish = true;
+ sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId);
+
+ /*
+ tsem_post(&pSyncNode->restoreSem);
+ sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode);
+ */
+ }
+ }
+
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
@@ -162,4 +187,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
}
}
return false;
-}
\ No newline at end of file
+}
diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c
index d9ff60bbe22b573db34331e5aabbd04b06ff5616..a233603adffd229fb6b0b8891c22305358ef26bc 100644
--- a/source/libs/sync/src/syncMain.c
+++ b/source/libs/sync/src/syncMain.c
@@ -13,7 +13,6 @@
* along with this program. If not, see .
*/
-#include
#include "sync.h"
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
@@ -55,14 +54,17 @@ static void syncFreeNode(void* param);
// ---------------------------------
int32_t syncInit() {
- int32_t ret;
- tsNodeRefId = taosOpenRef(200, syncFreeNode);
- if (tsNodeRefId < 0) {
- sError("failed to init node ref");
- syncCleanUp();
- ret = -1;
- } else {
- ret = syncEnvStart();
+ int32_t ret = 0;
+
+ if (!syncEnvIsStart()) {
+ tsNodeRefId = taosOpenRef(200, syncFreeNode);
+ if (tsNodeRefId < 0) {
+ sError("failed to init node ref");
+ syncCleanUp();
+ ret = -1;
+ } else {
+ ret = syncEnvStart();
+ }
}
return ret;
@@ -155,6 +157,18 @@ ESyncState syncGetMyRole(int64_t rid) {
return state;
}
+bool syncIsRestoreFinish(int64_t rid) {
+ SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
+ if (pSyncNode == NULL) {
+ return false;
+ }
+ assert(rid == pSyncNode->rid);
+ bool b = pSyncNode->restoreFinish;
+
+ taosReleaseRef(tsNodeRefId, pSyncNode->rid);
+ return b;
+}
+
const char* syncGetMyRoleStr(int64_t rid) {
const char* s = syncUtilState2String(syncGetMyRole(rid));
return s;
@@ -240,7 +254,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
return ret;
}
-void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) {
+void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
@@ -306,8 +320,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
+ rpcFreeCont(pMsg->pCont);
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
+
assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
@@ -490,6 +506,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
assert(pSyncNode->pSyncRespMgr != NULL);
+ // restore state
+ pSyncNode->restoreFinish = false;
+ pSyncNode->pSnapshot = NULL;
+ if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
+ pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
+ pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
+ }
+ //tsem_init(&(pSyncNode->restoreSem), 0, 0);
+
// start in syncNodeStart
// start raft
// syncNodeBecomeFollower(pSyncNode);
@@ -509,6 +534,20 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// use this now
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
+
+ /*
+ sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode);
+ tsem_wait(&pSyncNode->restoreSem);
+ sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode);
+ */
+
+ /*
+ while (pSyncNode->restoreFinish != true) {
+ taosMsleep(10);
+ }
+ */
+
+ sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
return;
}
@@ -518,6 +557,19 @@ void syncNodeStart(SSyncNode* pSyncNode) {
int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
+
+ /*
+ sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode);
+ tsem_wait(&pSyncNode->restoreSem);
+ sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode);
+ */
+
+ /*
+ while (pSyncNode->restoreFinish != true) {
+ taosMsleep(10);
+ }
+ */
+ sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
}
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
@@ -554,6 +606,12 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pFsm);
}
+ if (pSyncNode->pSnapshot != NULL) {
+ taosMemoryFree(pSyncNode->pSnapshot);
+ }
+
+ //tsem_destroy(&pSyncNode->restoreSem);
+
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
}
diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp
index cff692239a756081cf35191cf0787be5bd878326..0850ef6343d2ce5b6719f7eb92eccc55cdafc41d 100644
--- a/source/libs/sync/test/syncConfigChangeTest.cpp
+++ b/source/libs/sync/test/syncConfigChangeTest.cpp
@@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0;
}
+void FpRestoreFinishCb(struct SSyncFSM* pFsm) {
+ sTrace("==callback== ==FpRestoreFinishCb==");
+}
+
SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb;
+ pFsm->FpRestoreFinish = FpRestoreFinishCb;
return pFsm;
}
diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp
index 62bda5b22ec8633f1cb6ba2ff2cfbe224ead8c94..8ccd69890708781dbfb5b4a3ae835acc5c17d15c 100644
--- a/source/libs/sync/test/syncSnapshotTest.cpp
+++ b/source/libs/sync/test/syncSnapshotTest.cpp
@@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) {
}
int main(int argc, char **argv) {
+ sprintf(tsTempDir, "%s", ".");
+
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
index e71e19edceedc32fa196fbdba37fb1d38ac62525..30f799f39ec046b4819a35f9adaec06ff8f6b81f 100644
--- a/source/libs/transport/inc/transComm.h
+++ b/source/libs/transport/inc/transComm.h
@@ -94,7 +94,7 @@ typedef void* queue[2];
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
-#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit
+#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c
index 5627dbfbf54be3eeed4b4d132b19e2c6b9b1d030..9e71c87fa5289d2af6d71639c313d208fe6d9b37 100644
--- a/source/libs/transport/src/trans.c
+++ b/source/libs/transport/src/trans.c
@@ -17,7 +17,7 @@
#include "transComm.h"
-void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
+void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = {
transInitServer, transInitClient};
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
@@ -77,37 +77,38 @@ void rpcClose(void* arg) {
taosMemoryFree(pRpc);
return;
}
-void* rpcMallocCont(int contLen) {
- int size = contLen + TRANS_MSG_OVERHEAD;
- char* start = (char*)taosMemoryCalloc(1, (size_t)size);
+void* rpcMallocCont(int32_t contLen) {
+ int32_t size = contLen + TRANS_MSG_OVERHEAD;
+ char* start = taosMemoryCalloc(1, size);
if (start == NULL) {
tError("failed to malloc msg, size:%d", size);
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
} else {
tTrace("malloc mem:%p size:%d", start, size);
}
+
return start + sizeof(STransMsgHead);
}
-void rpcFreeCont(void* cont) {
- // impl
- if (cont == NULL) {
- return;
- }
+void rpcFreeCont(void* cont) {
+ if (cont == NULL) return;
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD);
}
-void* rpcReallocCont(void* ptr, int contLen) {
- if (ptr == NULL) {
- return rpcMallocCont(contLen);
- }
- char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
- int sz = contLen + TRANS_MSG_OVERHEAD;
+
+void* rpcReallocCont(void* ptr, int32_t contLen) {
+ if (ptr == NULL) return rpcMallocCont(contLen);
+
+ char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
+ int32_t sz = contLen + TRANS_MSG_OVERHEAD;
st = taosMemoryRealloc(st, sz);
if (st == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
+
return st + TRANS_MSG_OVERHEAD;
}
@@ -116,8 +117,8 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
assert(0);
}
-int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
-void rpcCancelRequest(int64_t rid) { return; }
+int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
+void rpcCancelRequest(int64_t rid) { return; }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
transSendRequest(shandle, pEpSet, pMsg, NULL);
@@ -129,8 +130,8 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv(shandle, pEpSet, pMsg, pRsp);
}
-void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
-int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
+void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
+int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c
index 2bc60f777ca99460afc4306d3fa8b13cb25e9003..1d3eba7cde9ed4fc11694f0936c5f69c9760192d 100644
--- a/tests/test/c/sdbDump.c
+++ b/tests/test/c/sdbDump.c
@@ -262,7 +262,7 @@ void dumpCluster(SSdb *pSdb, SJson *json) {
}
void dumpTrans(SSdb *pSdb, SJson *json) {
- void *pIter = NULL;
+ void *pIter = NULL;
SJson *items = tjsonCreateObject();
tjsonAddItemToObject(json, "transactions", items);
@@ -294,6 +294,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
void dumpHeader(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(json, "sver", 1);
tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer));
+ tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm));
SJson *maxIdsJson = tjsonCreateObject();
tjsonAddItemToObject(json, "maxIds", maxIdsJson);