提交 eec7056d 编写于 作者: X Xiaoyu Wang

Merge remote-tracking branch 'origin/3.0' into feature/3.0_wxy

...@@ -158,6 +158,7 @@ enum { ...@@ -158,6 +158,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
// Requests handled by VNODE // Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
......
...@@ -89,6 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); ...@@ -89,6 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
* @return int32_t 0 for success, -1 for failure. * @return int32_t 0 for success, -1 for failure.
*/ */
int32_t mndProcessMsg(SRpcMsg *pMsg); int32_t mndProcessMsg(SRpcMsg *pMsg);
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
/** /**
* @brief Generate machine code * @brief Generate machine code
......
...@@ -304,13 +304,16 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type); ...@@ -304,13 +304,16 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type); int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
/** /**
* @brief Update the version of sdb * @brief Update the index of sdb
* *
* @param pSdb The sdb object. * @param pSdb The sdb object.
* @param val The update value of the version. * @param index The update value of the apply index.
* @return int32_t The current version of sdb * @return int32_t The current index of sdb
*/ */
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val); void sdbSetApplyIndex(SSdb *pSdb, int64_t index);
int64_t sdbGetApplyIndex(SSdb *pSdb);
void sdbSetApplyTerm(SSdb *pSdb, int64_t term);
int64_t sdbGetApplyTerm(SSdb *pSdb);
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
void sdbFreeRaw(SSdbRaw *pRaw); void sdbFreeRaw(SSdbRaw *pRaw);
...@@ -339,6 +342,7 @@ typedef struct SSdb { ...@@ -339,6 +342,7 @@ typedef struct SSdb {
char *tmpDir; char *tmpDir;
int64_t lastCommitVer; int64_t lastCommitVer;
int64_t curVer; int64_t curVer;
int64_t curTerm;
int64_t tableVer[SDB_MAX]; int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX]; int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX]; EKeyType keyTypes[SDB_MAX];
......
...@@ -78,6 +78,8 @@ typedef struct SFsmCbMeta { ...@@ -78,6 +78,8 @@ typedef struct SFsmCbMeta {
int32_t code; int32_t code;
ESyncState state; ESyncState state;
uint64_t seqNum; uint64_t seqNum;
SyncTerm term;
SyncTerm currentTerm;
} SFsmCbMeta; } SFsmCbMeta;
typedef struct SSyncFSM { typedef struct SSyncFSM {
...@@ -85,6 +87,7 @@ typedef struct SSyncFSM { ...@@ -85,6 +87,7 @@ typedef struct SSyncFSM {
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM; } SSyncFSM;
...@@ -117,7 +120,6 @@ typedef struct SSyncLogStore { ...@@ -117,7 +120,6 @@ typedef struct SSyncLogStore {
} SSyncLogStore; } SSyncLogStore;
typedef struct SSyncInfo { typedef struct SSyncInfo {
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
...@@ -144,6 +146,7 @@ int32_t syncGetVgId(int64_t rid); ...@@ -144,6 +146,7 @@ int32_t syncGetVgId(int64_t rid);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -28,7 +28,7 @@ extern "C" { ...@@ -28,7 +28,7 @@ extern "C" {
#define TAOS_CONN_CLIENT 1 #define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U) #define IsReq(pMsg) (pMsg->msgType & 1U)
extern int tsRpcHeadSize; extern int32_t tsRpcHeadSize;
typedef struct { typedef struct {
uint32_t clientIp; uint32_t clientIp;
...@@ -69,10 +69,10 @@ typedef struct SRpcInit { ...@@ -69,10 +69,10 @@ typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN]; char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port uint16_t localPort; // local port
char * label; // for debug purpose char * label; // for debug purpose
int numOfThreads; // number of threads to handle connections int32_t numOfThreads; // number of threads to handle connections
int sessions; // number of sessions allowed int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int idleTime; // milliseconds, 0 means idle timer is disabled int32_t idleTime; // milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only // the following is for client app ecurity only
char *user; // user name char *user; // user name
...@@ -108,9 +108,9 @@ int32_t rpcInit(); ...@@ -108,9 +108,9 @@ int32_t rpcInit();
void rpcCleanup(); void rpcCleanup();
void * rpcOpen(const SRpcInit *pRpc); void * rpcOpen(const SRpcInit *pRpc);
void rpcClose(void *); void rpcClose(void *);
void * rpcMallocCont(int contLen); void * rpcMallocCont(int32_t contLen);
void rpcFreeCont(void *pCont); void rpcFreeCont(void *pCont);
void * rpcReallocCont(void *ptr, int contLen); void * rpcReallocCont(void *ptr, int32_t contLen);
// Because taosd supports multi-process mode // Because taosd supports multi-process mode
// These functions should not be used on the server side // These functions should not be used on the server side
...@@ -121,10 +121,10 @@ void rpcRegisterBrokenLinkArg(SRpcMsg *msg); ...@@ -121,10 +121,10 @@ void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock void rpcReleaseHandle(void *handle, int8_t type); // just release client conn to rpc instance, no close sock
// These functions will not be called in the child process // These functions will not be called in the child process
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet); void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -24,19 +24,22 @@ extern "C" { ...@@ -24,19 +24,22 @@ extern "C" {
#endif #endif
typedef struct SMnodeMgmt { typedef struct SMnodeMgmt {
SDnodeData *pData; SDnodeData *pData;
SMnode *pMnode; SMnode *pMnode;
SMsgCb msgCb; SMsgCb msgCb;
const char *path; const char *path;
const char *name; const char *name;
SSingleWorker queryWorker; SSingleWorker queryWorker;
SSingleWorker readWorker; SSingleWorker readWorker;
SSingleWorker writeWorker; SSingleWorker writeWorker;
SSingleWorker syncWorker; SSingleWorker syncWorker;
SSingleWorker monitorWorker; SSingleWorker monitorWorker;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
bool stopped;
int32_t refCount;
TdThreadRwlock lock;
} SMnodeMgmt; } SMnodeMgmt;
// mmFile.c // mmFile.c
...@@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); ...@@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c // mmInt.c
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg); int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg);
int32_t mmAcquire(SMnodeMgmt *pMgmt);
void mmRelease(SMnodeMgmt *pMgmt);
// mmHandle.c // mmHandle.c
SArray *mmGetMsgHandles(); SArray *mmGetMsgHandles();
......
...@@ -237,6 +237,16 @@ SArray *mmGetMsgHandles() { ...@@ -237,6 +237,16 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_REQUEST_VOTE_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
code = 0; code = 0;
_OVER: _OVER:
......
...@@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) { ...@@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) {
if (pMgmt->pMnode != NULL) { if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt); mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
taosThreadRwlockDestroy(&pMgmt->lock);
pMgmt->pMnode = NULL; pMgmt->pMnode = NULL;
} }
...@@ -122,6 +123,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -122,6 +123,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return -1; return -1;
} }
if (syncInit() != 0) {
dError("failed to init sync since %s", terrstr());
return -1;
}
SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt)); SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt));
if (pMgmt == NULL) { if (pMgmt == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -137,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { ...@@ -137,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL);
bool deployed = false; bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) { if (mmReadFile(pMgmt, &deployed) != 0) {
...@@ -206,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() { ...@@ -206,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() {
return mgmtFunc; return mgmtFunc;
} }
int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) {
code = -1;
} else {
atomic_add_fetch_32(&pMgmt->refCount, 1);
}
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock);
}
\ No newline at end of file
...@@ -56,6 +56,12 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -56,6 +56,12 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle;
pMsg->info.node = pMgmt->pMnode;
mndProcessSyncMsg(pMsg);
}
static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) { static int32_t mmPutNodeMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); dTrace("msg:%p, put into worker %s, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType));
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
...@@ -105,7 +111,17 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -105,7 +111,17 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); int32_t code = -1;
if (mmAcquire(pMgmt) == 0) {
code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
mmRelease(pMgmt);
}
if (code != 0) {
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
}
return code;
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
...@@ -149,7 +165,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -149,7 +165,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.min = 1, .min = 1,
.max = 1, .max = 1,
.name = "mnode-sync", .name = "mnode-sync",
.fp = (FItem)mmProcessQueue, .fp = (FItem)mmProcessSyncQueue,
.param = pMgmt, .param = pMgmt,
}; };
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
...@@ -174,6 +190,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -174,6 +190,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
} }
void mmStopWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) {
taosThreadRwlockWrlock(&pMgmt->lock);
pMgmt->stopped = 1;
taosThreadRwlockUnlock(&pMgmt->lock);
while (pMgmt->refCount > 0) taosMsleep(10);
tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->readWorker);
......
...@@ -4,7 +4,7 @@ target_link_libraries( ...@@ -4,7 +4,7 @@ target_link_libraries(
dmnodeTest sut dmnodeTest sut
) )
add_test( #add_test(
NAME dmnodeTest # NAME dmnodeTest
COMMAND dmnodeTest # COMMAND dmnodeTest
) #)
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "mndDef.h" #include "mndDef.h"
#include "sdb.h" #include "sdb.h"
#include "syncTools.h"
#include "tcache.h" #include "tcache.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -31,12 +32,14 @@ ...@@ -31,12 +32,14 @@
extern "C" { extern "C" {
#endif #endif
// clang-format off
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} #define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} #define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} #define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
// clang-format on
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
...@@ -72,10 +75,11 @@ typedef struct { ...@@ -72,10 +75,11 @@ typedef struct {
} STelemMgmt; } STelemMgmt;
typedef struct { typedef struct {
SWal *pWal;
int32_t errCode; int32_t errCode;
bool restored;
sem_t syncSem; sem_t syncSem;
SWal *pWal; int64_t sync;
SSyncNode *pSyncNode;
ESyncState state; ESyncState state;
} SSyncMgmt; } SSyncMgmt;
......
...@@ -26,6 +26,8 @@ int32_t mndInitSync(SMnode *pMnode); ...@@ -26,6 +26,8 @@ int32_t mndInitSync(SMnode *pMnode);
void mndCleanupSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode);
bool mndIsMaster(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode);
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw);
void mndSyncStart(SMnode *pMnode);
void mndSyncStop(SMnode *pMnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -17,178 +17,146 @@ ...@@ -17,178 +17,146 @@
#include "mndSync.h" #include "mndSync.h"
#include "mndTrans.h" #include "mndTrans.h"
static int32_t mndInitWal(SMnode *pMnode) { int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
char path[PATH_MAX] = {0}; int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
SWalCfg cfg = {
.vgId = 1,
.fsyncPeriod = 0,
.rollPeriod = -1,
.segSize = -1,
.retentionPeriod = -1,
.retentionSize = -1,
.level = TAOS_WAL_FSYNC,
};
pMgmt->pWal = walOpen(path, &cfg);
if (pMgmt->pWal == NULL) return -1;
return 0;
}
static void mndCloseWal(SMnode *pMnode) { void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SMnode *pMnode = pFsm->data;
SSdb *pSdb = pMnode->pSdb;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
if (pMgmt->pWal != NULL) { SSdbRaw *pRaw = pMsg->pCont;
walClose(pMgmt->pWal);
pMgmt->pWal = NULL; mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
sdbWriteWithoutFree(pSdb, pRaw);
sdbSetApplyIndex(pSdb, cbMeta.index);
sdbSetApplyTerm(pSdb, cbMeta.term);
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
tsem_post(&pMgmt->syncSem);
} }
} }
static int32_t mndRestoreWal(SMnode *pMnode) { int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SWal *pWal = pMnode->syncMgmt.pWal; SMnode *pMnode = pFsm->data;
SSdb *pSdb = pMnode->pSdb; pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb);
int64_t lastSdbVer = sdbUpdateVer(pSdb, 0); pSnapshot->lastApplyTerm = sdbGetApplyTerm(pMnode->pSdb);
int32_t code = -1; return 0;
}
SWalReadHandle *pHandle = walOpenReadHandle(pWal);
if (pHandle == NULL) return -1;
int64_t first = walGetFirstVer(pWal);
int64_t last = walGetLastVer(pWal);
mDebug("start to restore wal, sdbver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, lastSdbVer, first, last);
first = TMAX(lastSdbVer + 1, first);
for (int64_t ver = first; ver >= 0 && ver <= last; ++ver) {
if (walReadWithHandle(pHandle, ver) < 0) {
mError("ver:%" PRId64 ", failed to read from wal since %s", ver, terrstr());
goto _OVER;
}
SWalHead *pHead = pHandle->pHead;
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
if (sdbVer + 1 != ver) {
terrno = TSDB_CODE_SDB_INVALID_WAl_VER;
mError("ver:%" PRId64 ", failed to write to sdb, since inconsistent with sdbver:%" PRId64, ver, sdbVer);
goto _OVER;
}
mTrace("ver:%" PRId64 ", will be restored, content:%p", ver, pHead->head.body);
if (sdbWriteWithoutFree(pSdb, (void *)pHead->head.body) < 0) {
mError("ver:%" PRId64 ", failed to write to sdb since %s", ver, terrstr());
goto _OVER;
}
sdbUpdateVer(pSdb, 1);
mDebug("ver:%" PRId64 ", is restored", ver);
}
int64_t sdbVer = sdbUpdateVer(pSdb, 0);
mDebug("restore wal finished, sdbver:%" PRId64, sdbVer);
void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
mndTransPullup(pMnode); mndTransPullup(pMnode);
sdbVer = sdbUpdateVer(pSdb, 0); pMnode->syncMgmt.restored = true;
mDebug("pullup trans finished, sdbver:%" PRId64, sdbVer); }
if (sdbVer != lastSdbVer) {
mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastSdbVer, sdbVer);
if (sdbWriteFile(pSdb) != 0) {
goto _OVER;
}
if (walCommit(pWal, sdbVer) != 0) {
goto _OVER;
}
if (walBeginSnapshot(pWal, sdbVer) < 0) {
goto _OVER;
}
if (walEndSnapshot(pWal) < 0) {
goto _OVER;
}
}
code = 0;
_OVER: SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
walCloseReadHandle(pHandle); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
return code; pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL;
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpRestoreFinish = mndRestoreFinish;
pFsm->FpRestoreSnapshot = NULL;
return pFsm;
} }
int32_t mndInitSync(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_init(&pMgmt->syncSem, 0, 0);
if (mndInitWal(pMnode) < 0) { char path[PATH_MAX + 20] = {0};
snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
SWalCfg cfg = {
.vgId = 1,
.fsyncPeriod = 0,
.rollPeriod = -1,
.segSize = -1,
.retentionPeriod = -1,
.retentionSize = -1,
.level = TAOS_WAL_FSYNC,
};
pMgmt->pWal = walOpen(path, &cfg);
if (pMgmt->pWal == NULL) {
mError("failed to open wal since %s", terrstr()); mError("failed to open wal since %s", terrstr());
return -1; return -1;
} }
if (mndRestoreWal(pMnode) < 0) { SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
mError("failed to restore wal since %s", terrstr()); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
return -1; syncInfo.pWal = pMgmt->pWal;
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMnode->replica;
pCfg->myIndex = pMnode->selfIndex;
for (int32_t i = 0; i < pMnode->replica; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
pNode->nodePort = pMnode->replicas[i].port;
} }
if (pMnode->selfId == 1) { tsem_init(&pMgmt->syncSem, 0, 0);
pMgmt->state = TAOS_SYNC_STATE_LEADER; pMgmt->sync = syncOpen(&syncInfo);
if (pMgmt->sync <= 0) {
mError("failed to open sync since %s", terrstr());
return -1;
} }
pMgmt->pSyncNode = NULL;
mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync);
return 0; return 0;
} }
void mndCleanupSync(SMnode *pMnode) { void mndCleanupSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_destroy(&pMgmt->syncSem); syncStop(pMgmt->sync);
mndCloseWal(pMnode); mDebug("sync:%" PRId64 " is stopped", pMgmt->sync);
}
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
SMnode *pMnode = pData;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0; tsem_destroy(&pMgmt->syncSem);
tsem_post(&pMgmt->syncSem); if (pMgmt->pWal != NULL) {
walClose(pMgmt->pWal);
}
return 0; memset(pMgmt, 0, sizeof(SSyncMgmt));
} }
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
SWal *pWal = pMnode->syncMgmt.pWal;
SSdb *pSdb = pMnode->pSdb;
int64_t ver = sdbUpdateVer(pSdb, 1);
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
sdbUpdateVer(pSdb, -1);
mError("ver:%" PRId64 ", failed to write raw:%p to wal since %s", ver, pRaw, terrstr());
return -1;
}
mTrace("ver:%" PRId64 ", write to wal, raw:%p", ver, pRaw);
walCommit(pWal, ver);
walFsync(pWal, true);
#if 1
return 0;
#else
if (pMnode->replica == 1) return 0;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0; pMgmt->errCode = 0;
SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
rsp.pCont = rpcMallocCont(rsp.contLen);
bool isWeak = false; if (rsp.pCont == NULL) return -1;
int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak); memcpy(rsp.pCont, pRaw, rsp.contLen);
const bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
if (code == 0) {
tsem_wait(&pMgmt->syncSem);
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
terrno = TSDB_CODE_APP_NOT_READY;
} else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} else {
terrno = TSDB_CODE_APP_ERROR;
}
if (code != 0) return code; if (code != 0) return code;
tsem_wait(&pMgmt->syncSem);
return pMgmt->errCode; return pMgmt->errCode;
#endif
} }
void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync);
mDebug("sync:%" PRId64 " is started", pMgmt->sync);
}
void mndSyncStop(SMnode *pMnode) {}
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
return pMgmt->state == TAOS_SYNC_STATE_LEADER; pMgmt->state = syncGetMyRole(pMgmt->sync);
return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored);
} }
...@@ -682,13 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { ...@@ -682,13 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
} }
mDebug("trans:%d, sync finished", pTrans->id); mDebug("trans:%d, sync finished", pTrans->id);
code = sdbWrite(pMnode->pSdb, pRaw);
if (code != 0) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
return -1;
}
return 0; return 0;
} }
...@@ -1360,19 +1353,35 @@ _OVER: ...@@ -1360,19 +1353,35 @@ _OVER:
return code; return code;
} }
static int32_t mndCompareTransId(int32_t *pTransId1, int32_t *pTransId2) { return *pTransId1 >= *pTransId2 ? 1 : 0; }
void mndTransPullup(SMnode *pMnode) { void mndTransPullup(SMnode *pMnode) {
STrans *pTrans = NULL; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_TRANS), sizeof(int32_t));
if (pArray == NULL) return;
void *pIter = NULL;
while (1) { while (1) {
STrans *pTrans = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) break; if (pIter == NULL) break;
taosArrayPush(pArray, &pTrans->id);
sdbRelease(pSdb, pTrans);
}
mndTransExecute(pMnode, pTrans); taosArraySort(pArray, (__compar_fn_t)mndCompareTransId);
sdbRelease(pMnode->pSdb, pTrans);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
int32_t *pTransId = taosArrayGet(pArray, i);
STrans *pTrans = mndAcquireTrans(pMnode, *pTransId);
if (pTrans != NULL) {
mndTransExecute(pMnode, pTrans);
}
mndReleaseTrans(pMnode, pTrans);
} }
sdbWriteFile(pMnode->pSdb); sdbWriteFile(pMnode->pSdb);
taosArrayDestroy(pArray);
} }
static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
......
...@@ -335,9 +335,107 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -335,9 +335,107 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
return 0; return 0;
} }
int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); } int32_t mndStart(SMnode *pMnode) {
mndSyncStart(pMnode);
return mndInitTimer(pMnode);
}
void mndStop(SMnode *pMnode) {
mndSyncStop(pMnode);
return mndCleanupTimer(pMnode);
}
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
void *ahandle = pMsg->info.ahandle;
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) {
SSyncNode *pSyncNode = syncNodeAcquire(pMnode->syncMgmt.sync);
assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pMnode->syncMgmt.sync);
SyncTerm currentTerm = syncGetMyTerm(pMnode->syncMgmt.sync);
SMsgHead *pHead = pMsg->pCont;
char logBuf[512];
char *syncNodeStr = sync2SimpleStr(pMnode->syncMgmt.sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg);
taosMemoryFree(syncNodeStr);
SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_VND_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); } } else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else {
mError("==mndProcessSyncMsg== error msg type:%d", pRpcMsg->msgType);
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
syncNodeRelease(pSyncNode);
} else {
mError("==mndProcessSyncMsg== error syncEnv stop");
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
return ret;
}
int32_t mndProcessMsg(SRpcMsg *pMsg) { int32_t mndProcessMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
......
...@@ -493,9 +493,8 @@ TEST_F(MndTestSdb, 01_Write_Str) { ...@@ -493,9 +493,8 @@ TEST_F(MndTestSdb, 01_Write_Str) {
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 ); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 );
ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1); sdbSetApplyIndex(pSdb, -1);
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0); ASSERT_EQ(sdbGetApplyIndex(pSdb), -1);
ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1);
ASSERT_EQ(mnode.insertTimes, 2); ASSERT_EQ(mnode.insertTimes, 2);
ASSERT_EQ(mnode.deleteTimes, 0); ASSERT_EQ(mnode.deleteTimes, 0);
...@@ -537,9 +536,6 @@ TEST_F(MndTestSdb, 01_Write_Str) { ...@@ -537,9 +536,6 @@ TEST_F(MndTestSdb, 01_Write_Str) {
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 3); ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 3);
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 4); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 4);
ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1);
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1);
ASSERT_EQ(mnode.insertTimes, 3); ASSERT_EQ(mnode.insertTimes, 3);
ASSERT_EQ(mnode.deleteTimes, 0); ASSERT_EQ(mnode.deleteTimes, 0);
...@@ -704,8 +700,9 @@ TEST_F(MndTestSdb, 01_Write_Str) { ...@@ -704,8 +700,9 @@ TEST_F(MndTestSdb, 01_Write_Str) {
} }
// write version // write version
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0); sdbSetApplyIndex(pSdb, 0);
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 1); sdbSetApplyIndex(pSdb, 1);
ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
ASSERT_EQ(sdbWriteFile(pSdb), 0); ASSERT_EQ(sdbWriteFile(pSdb), 0);
ASSERT_EQ(sdbWriteFile(pSdb), 0); ASSERT_EQ(sdbWriteFile(pSdb), 0);
...@@ -775,7 +772,7 @@ TEST_F(MndTestSdb, 01_Read_Str) { ...@@ -775,7 +772,7 @@ TEST_F(MndTestSdb, 01_Read_Str) {
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5);
ASSERT_EQ(sdbUpdateVer(pSdb, 0), 1); ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
ASSERT_EQ(mnode.insertTimes, 4); ASSERT_EQ(mnode.insertTimes, 4);
ASSERT_EQ(mnode.deleteTimes, 0); ASSERT_EQ(mnode.deleteTimes, 0);
......
...@@ -31,7 +31,7 @@ target_include_directories( ...@@ -31,7 +31,7 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode" PUBLIC "${TD_SOURCE_DIR}/include/dnode/mnode"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../../inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/../../inc"
) )
# add_test( #add_test(
# NAME transTest2 # NAME transTest2
# COMMAND transTest2 # COMMAND transTest2
# ) #)
...@@ -23,6 +23,11 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -23,6 +23,11 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return -1; return -1;
} }
int32_t putToQueue(void *pMgmt, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
class MndTestTrans2 : public ::testing::Test { class MndTestTrans2 : public ::testing::Test {
protected: protected:
static void InitLog() { static void InitLog() {
...@@ -55,6 +60,9 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -55,6 +60,9 @@ class MndTestTrans2 : public ::testing::Test {
msgCb.reportStartupFp = reportStartup; msgCb.reportStartupFp = reportStartup;
msgCb.sendReqFp = sendReq; msgCb.sendReqFp = sendReq;
msgCb.sendRspFp = sendRsp; msgCb.sendRspFp = sendRsp;
msgCb.queueFps[SYNC_QUEUE] = putToQueue;
msgCb.queueFps[WRITE_QUEUE] = putToQueue;
msgCb.queueFps[READ_QUEUE] = putToQueue;
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefault(&msgCb); tmsgSetDefault(&msgCb);
...@@ -77,6 +85,7 @@ class MndTestTrans2 : public ::testing::Test { ...@@ -77,6 +85,7 @@ class MndTestTrans2 : public ::testing::Test {
static void SetUpTestSuite() { static void SetUpTestSuite() {
InitLog(); InitLog();
walInit(); walInit();
syncInit();
InitMnode(); InitMnode();
} }
......
...@@ -31,11 +31,9 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -31,11 +31,9 @@ SSdb *sdbInit(SSdbOpt *pOption) {
char path[PATH_MAX + 100] = {0}; char path[PATH_MAX + 100] = {0};
snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP); snprintf(path, sizeof(path), "%s%sdata", pOption->path, TD_DIRSEP);
pSdb->currDir = strdup(path); pSdb->currDir = strdup(path);
snprintf(path, sizeof(path), "%s%ssync", pOption->path, TD_DIRSEP);
pSdb->syncDir = strdup(path);
snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP); snprintf(path, sizeof(path), "%s%stmp", pOption->path, TD_DIRSEP);
pSdb->tmpDir = strdup(path); pSdb->tmpDir = strdup(path);
if (pSdb->currDir == NULL || pSdb->currDir == NULL || pSdb->currDir == NULL) { if (pSdb->currDir == NULL || pSdb->tmpDir == NULL) {
sdbCleanup(pSdb); sdbCleanup(pSdb);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to init sdb since %s", terrstr()); mError("failed to init sdb since %s", terrstr());
...@@ -55,6 +53,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -55,6 +53,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
} }
pSdb->curVer = -1; pSdb->curVer = -1;
pSdb->curTerm = -1;
pSdb->lastCommitVer = -1; pSdb->lastCommitVer = -1;
pSdb->pMnode = pOption->pMnode; pSdb->pMnode = pOption->pMnode;
mDebug("sdb init successfully"); mDebug("sdb init successfully");
...@@ -149,12 +148,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) { ...@@ -149,12 +148,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return -1; return -1;
} }
if (taosMkDir(pSdb->syncDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr());
return -1;
}
if (taosMkDir(pSdb->tmpDir) != 0) { if (taosMkDir(pSdb->tmpDir) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr()); mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
...@@ -164,4 +157,10 @@ static int32_t sdbCreateDir(SSdb *pSdb) { ...@@ -164,4 +157,10 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return 0; return 0;
} }
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { return atomic_add_fetch_64(&pSdb->curVer, val); } void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
\ No newline at end of file
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; }
...@@ -65,6 +65,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -65,6 +65,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(int64_t)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = 0; int64_t maxId = 0;
ret = taosReadFile(pFile, &maxId, sizeof(int64_t)); ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
...@@ -123,6 +133,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -123,6 +133,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = 0; int64_t maxId = 0;
if (i < SDB_MAX) { if (i < SDB_MAX) {
...@@ -182,6 +197,7 @@ int32_t sdbReadFile(SSdb *pSdb) { ...@@ -182,6 +197,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
if (sdbReadFileHead(pSdb, pFile) != 0) { if (sdbReadFileHead(pSdb, pFile) != 0) {
mError("failed to read file:%s head since %s", file, terrstr()); mError("failed to read file:%s head since %s", file, terrstr());
pSdb->curVer = -1; pSdb->curVer = -1;
pSdb->curTerm = -1;
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
taosCloseFile(&pFile); taosCloseFile(&pFile);
return -1; return -1;
...@@ -256,8 +272,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -256,8 +272,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
char curfile[PATH_MAX] = {0}; char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer, mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
pSdb->lastCommitVer); pSdb->curTerm, pSdb->lastCommitVer);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
...@@ -350,7 +366,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -350,7 +366,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mError("failed to write file:%s since %s", curfile, tstrerror(code)); mError("failed to write file:%s since %s", curfile, tstrerror(code));
} else { } else {
pSdb->lastCommitVer = pSdb->curVer; pSdb->lastCommitVer = pSdb->curVer;
mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer); mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
} }
terrno = code; terrno = code;
......
...@@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) { ...@@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
if (code != 0) {
rpcFreeCont(pMsg->pCont);
}
return code;
}
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
...@@ -141,5 +147,6 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { ...@@ -141,5 +147,6 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinish = NULL;
return pFsm; return pFsm;
} }
\ No newline at end of file
...@@ -36,8 +36,8 @@ typedef struct SSyncIO { ...@@ -36,8 +36,8 @@ typedef struct SSyncIO {
STaosQueue *pMsgQ; STaosQueue *pMsgQ;
STaosQset * pQset; STaosQset * pQset;
TdThread consumerTid; TdThread consumerTid;
void *serverRpc; void * serverRpc;
void *clientRpc; void * clientRpc;
SEpSet myAddr; SEpSet myAddr;
SMsgCb msgcb; SMsgCb msgcb;
......
...@@ -147,6 +147,11 @@ typedef struct SSyncNode { ...@@ -147,6 +147,11 @@ typedef struct SSyncNode {
// tools // tools
SSyncRespMgr* pSyncRespMgr; SSyncRespMgr* pSyncRespMgr;
// restore state
bool restoreFinish;
//sem_t restoreSem;
SSnapshot* pSnapshot;
} SSyncNode; } SSyncNode;
// open/close -------------- // open/close --------------
......
...@@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
// if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta; SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
...@@ -332,7 +331,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -332,7 +331,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta.code = 0; cbMeta.code = 0;
cbMeta.state = ths->state; cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum; cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
bool needExecute = true;
if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) {
needExecute = false;
}
if (needExecute) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
} }
// config change // config change
...@@ -349,6 +359,22 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -349,6 +359,22 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} }
} }
// restore finish
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinish != NULL) {
ths->pFsm->FpRestoreFinish(ths->pFsm);
}
ths->restoreFinish = true;
sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
/*
tsem_post(&ths->restoreSem);
sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths);
*/
}
}
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} }
......
...@@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
// if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta; SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
...@@ -110,7 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -110,7 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
cbMeta.code = 0; cbMeta.code = 0;
cbMeta.state = pSyncNode->state; cbMeta.state = pSyncNode->state;
cbMeta.seqNum = pEntry->seqNum; cbMeta.seqNum = pEntry->seqNum;
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); cbMeta.term = pEntry->term;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
bool needExecute = true;
if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) {
needExecute = false;
}
if (needExecute) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
}
} }
// config change // config change
...@@ -127,6 +136,22 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -127,6 +136,22 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
} }
} }
// restore finish
if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
if (pSyncNode->restoreFinish == false) {
if (pSyncNode->pFsm->FpRestoreFinish != NULL) {
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
}
pSyncNode->restoreFinish = true;
sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId);
/*
tsem_post(&pSyncNode->restoreSem);
sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode);
*/
}
}
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} }
...@@ -162,4 +187,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { ...@@ -162,4 +187,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
} }
} }
return false; return false;
} }
\ No newline at end of file
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <stdint.h>
#include "sync.h" #include "sync.h"
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
...@@ -55,14 +54,17 @@ static void syncFreeNode(void* param); ...@@ -55,14 +54,17 @@ static void syncFreeNode(void* param);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
int32_t ret; int32_t ret = 0;
tsNodeRefId = taosOpenRef(200, syncFreeNode);
if (tsNodeRefId < 0) { if (!syncEnvIsStart()) {
sError("failed to init node ref"); tsNodeRefId = taosOpenRef(200, syncFreeNode);
syncCleanUp(); if (tsNodeRefId < 0) {
ret = -1; sError("failed to init node ref");
} else { syncCleanUp();
ret = syncEnvStart(); ret = -1;
} else {
ret = syncEnvStart();
}
} }
return ret; return ret;
...@@ -155,6 +157,18 @@ ESyncState syncGetMyRole(int64_t rid) { ...@@ -155,6 +157,18 @@ ESyncState syncGetMyRole(int64_t rid) {
return state; return state;
} }
bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
assert(rid == pSyncNode->rid);
bool b = pSyncNode->restoreFinish;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return b;
}
const char* syncGetMyRoleStr(int64_t rid) { const char* syncGetMyRoleStr(int64_t rid) {
const char* s = syncUtilState2String(syncGetMyRole(rid)); const char* s = syncUtilState2String(syncGetMyRole(rid));
return s; return s;
...@@ -240,7 +254,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { ...@@ -240,7 +254,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
return ret; return ret;
} }
void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) { void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
...@@ -306,8 +320,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -306,8 +320,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
rpcFreeCont(pMsg->pCont);
return TAOS_SYNC_PROPOSE_OTHER_ERROR; return TAOS_SYNC_PROPOSE_OTHER_ERROR;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
...@@ -490,6 +506,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -490,6 +506,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0); pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
assert(pSyncNode->pSyncRespMgr != NULL); assert(pSyncNode->pSyncRespMgr != NULL);
// restore state
pSyncNode->restoreFinish = false;
pSyncNode->pSnapshot = NULL;
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
}
//tsem_init(&(pSyncNode->restoreSem), 0, 0);
// start in syncNodeStart // start in syncNodeStart
// start raft // start raft
// syncNodeBecomeFollower(pSyncNode); // syncNodeBecomeFollower(pSyncNode);
...@@ -509,6 +534,20 @@ void syncNodeStart(SSyncNode* pSyncNode) { ...@@ -509,6 +534,20 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// use this now // use this now
syncNodeAppendNoop(pSyncNode); syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
/*
sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode);
*/
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
}
*/
sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
return; return;
} }
...@@ -518,6 +557,19 @@ void syncNodeStart(SSyncNode* pSyncNode) { ...@@ -518,6 +557,19 @@ void syncNodeStart(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode); // ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0); assert(ret == 0);
/*
sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode);
*/
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
}
*/
sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
} }
void syncNodeStartStandBy(SSyncNode* pSyncNode) { void syncNodeStartStandBy(SSyncNode* pSyncNode) {
...@@ -554,6 +606,12 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -554,6 +606,12 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pFsm); taosMemoryFree(pSyncNode->pFsm);
} }
if (pSyncNode->pSnapshot != NULL) {
taosMemoryFree(pSyncNode->pSnapshot);
}
//tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode // free memory in syncFreeNode
// taosMemoryFree(pSyncNode); // taosMemoryFree(pSyncNode);
} }
......
...@@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { ...@@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0; return 0;
} }
void FpRestoreFinishCb(struct SSyncFSM* pFsm) {
sTrace("==callback== ==FpRestoreFinishCb==");
}
SSyncFSM* createFsm() { SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb; pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb; pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshot = GetSnapshotCb;
pFsm->FpRestoreFinish = FpRestoreFinishCb;
return pFsm; return pFsm;
} }
......
...@@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) { ...@@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) {
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {
sprintf(tsTempDir, "%s", ".");
// taosInitLog((char *)"syncTest.log", 100000, 10); // taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = 143 + 64; sDebugFlag = 143 + 64;
......
...@@ -94,7 +94,7 @@ typedef void* queue[2]; ...@@ -94,7 +94,7 @@ typedef void* queue[2];
/* Return the structure holding the given element. */ /* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval #define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout #define TRANS_CONN_TIMEOUT 3 // connect timeout
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#include "transComm.h" #include "transComm.h"
void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int32_t numOfThreads, void* fp, void* shandle) = {
transInitServer, transInitClient}; transInitServer, transInitClient};
void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
...@@ -77,37 +77,38 @@ void rpcClose(void* arg) { ...@@ -77,37 +77,38 @@ void rpcClose(void* arg) {
taosMemoryFree(pRpc); taosMemoryFree(pRpc);
return; return;
} }
void* rpcMallocCont(int contLen) {
int size = contLen + TRANS_MSG_OVERHEAD;
char* start = (char*)taosMemoryCalloc(1, (size_t)size); void* rpcMallocCont(int32_t contLen) {
int32_t size = contLen + TRANS_MSG_OVERHEAD;
char* start = taosMemoryCalloc(1, size);
if (start == NULL) { if (start == NULL) {
tError("failed to malloc msg, size:%d", size); tError("failed to malloc msg, size:%d", size);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} else { } else {
tTrace("malloc mem:%p size:%d", start, size); tTrace("malloc mem:%p size:%d", start, size);
} }
return start + sizeof(STransMsgHead); return start + sizeof(STransMsgHead);
} }
void rpcFreeCont(void* cont) {
// impl
if (cont == NULL) {
return;
}
void rpcFreeCont(void* cont) {
if (cont == NULL) return;
taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD); taosMemoryFree((char*)cont - TRANS_MSG_OVERHEAD);
tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD); tTrace("free mem: %p", (char*)cont - TRANS_MSG_OVERHEAD);
} }
void* rpcReallocCont(void* ptr, int contLen) {
if (ptr == NULL) { void* rpcReallocCont(void* ptr, int32_t contLen) {
return rpcMallocCont(contLen); if (ptr == NULL) return rpcMallocCont(contLen);
}
char* st = (char*)ptr - TRANS_MSG_OVERHEAD; char* st = (char*)ptr - TRANS_MSG_OVERHEAD;
int sz = contLen + TRANS_MSG_OVERHEAD; int32_t sz = contLen + TRANS_MSG_OVERHEAD;
st = taosMemoryRealloc(st, sz); st = taosMemoryRealloc(st, sz);
if (st == NULL) { if (st == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
return st + TRANS_MSG_OVERHEAD; return st + TRANS_MSG_OVERHEAD;
} }
...@@ -116,8 +117,8 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { ...@@ -116,8 +117,8 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
assert(0); assert(0);
} }
int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; } void rpcCancelRequest(int64_t rid) { return; }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
transSendRequest(shandle, pEpSet, pMsg, NULL); transSendRequest(shandle, pEpSet, pMsg, NULL);
...@@ -129,8 +130,8 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { ...@@ -129,8 +130,8 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv(shandle, pEpSet, pMsg, pRsp); transSendRecv(shandle, pEpSet, pMsg, pRsp);
} }
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); } int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return transGetConnInfo((void*)thandle, pInfo); }
void rpcRefHandle(void* handle, int8_t type) { void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
......
...@@ -262,7 +262,7 @@ void dumpCluster(SSdb *pSdb, SJson *json) { ...@@ -262,7 +262,7 @@ void dumpCluster(SSdb *pSdb, SJson *json) {
} }
void dumpTrans(SSdb *pSdb, SJson *json) { void dumpTrans(SSdb *pSdb, SJson *json) {
void *pIter = NULL; void *pIter = NULL;
SJson *items = tjsonCreateObject(); SJson *items = tjsonCreateObject();
tjsonAddItemToObject(json, "transactions", items); tjsonAddItemToObject(json, "transactions", items);
...@@ -294,6 +294,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) { ...@@ -294,6 +294,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
void dumpHeader(SSdb *pSdb, SJson *json) { void dumpHeader(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(json, "sver", 1); tjsonAddIntegerToObject(json, "sver", 1);
tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer)); tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer));
tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm));
SJson *maxIdsJson = tjsonCreateObject(); SJson *maxIdsJson = tjsonCreateObject();
tjsonAddItemToObject(json, "maxIds", maxIdsJson); tjsonAddItemToObject(json, "maxIds", maxIdsJson);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册