提交 b10cff8b 编写于 作者: S Shengliang Guan

framework integrated with sync

上级 1f70617b
...@@ -27,7 +27,6 @@ typedef struct SMnodeMsg SMnodeMsg; ...@@ -27,7 +27,6 @@ typedef struct SMnodeMsg SMnodeMsg;
typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg);
typedef struct SMnodeLoad { typedef struct SMnodeLoad {
int64_t numOfDnode; int64_t numOfDnode;
...@@ -63,7 +62,6 @@ typedef struct { ...@@ -63,7 +62,6 @@ typedef struct {
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
SMnodeCfg cfg; SMnodeCfg cfg;
SDnode *pDnode; SDnode *pDnode;
PutMsgToMnodeQFp putMsgToApplyMsgFp;
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
...@@ -172,14 +170,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg); ...@@ -172,14 +170,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg);
*/ */
void mndProcessSyncMsg(SMnodeMsg *pMsg); void mndProcessSyncMsg(SMnodeMsg *pMsg);
/**
* @brief Process the apply request.
*
* @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure.
*/
void mndProcessApplyMsg(SMnodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -80,7 +80,6 @@ typedef struct { ...@@ -80,7 +80,6 @@ typedef struct {
SRWLatch latch; SRWLatch latch;
taos_queue pReadQ; taos_queue pReadQ;
taos_queue pWriteQ; taos_queue pWriteQ;
taos_queue pApplyQ;
taos_queue pSyncQ; taos_queue pSyncQ;
taos_queue pMgmtQ; taos_queue pMgmtQ;
SWorkerPool mgmtPool; SWorkerPool mgmtPool;
......
...@@ -369,7 +369,7 @@ void dndSendStatusMsg(SDnode *pDnode) { ...@@ -369,7 +369,7 @@ void dndSendStatusMsg(SDnode *pDnode) {
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads);
contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = 9527}; SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status msg to mnode", pDnode); dTrace("pDnode:%p, send status msg to mnode", pDnode);
......
...@@ -28,18 +28,15 @@ static void dndCleanupMnodeSyncWorker(SDnode *pDnode); ...@@ -28,18 +28,15 @@ static void dndCleanupMnodeSyncWorker(SDnode *pDnode);
static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); static void dndCleanupMnodeMgmtWorker(SDnode *pDnode);
static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); static int32_t dndAllocMnodeReadQueue(SDnode *pDnode);
static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode);
static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode);
static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode);
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode); static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode);
static void dndFreeMnodeReadQueue(SDnode *pDnode); static void dndFreeMnodeReadQueue(SDnode *pDnode);
static void dndFreeMnodeWriteQueue(SDnode *pDnode); static void dndFreeMnodeWriteQueue(SDnode *pDnode);
static void dndFreeMnodeApplyQueue(SDnode *pDnode);
static void dndFreeMnodeSyncQueue(SDnode *pDnode); static void dndFreeMnodeSyncQueue(SDnode *pDnode);
static void dndFreeMnodeMgmtQueue(SDnode *pDnode); static void dndFreeMnodeMgmtQueue(SDnode *pDnode);
static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg);
static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg);
...@@ -47,7 +44,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp ...@@ -47,7 +44,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp
void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t dndStartMnodeWorker(SDnode *pDnode); static int32_t dndStartMnodeWorker(SDnode *pDnode);
static void dndStopMnodeWorker(SDnode *pDnode); static void dndStopMnodeWorker(SDnode *pDnode);
...@@ -271,11 +267,6 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) { ...@@ -271,11 +267,6 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) {
return -1; return -1;
} }
if (dndAllocMnodeApplyQueue(pDnode) != 0) {
dError("failed to alloc mnode apply queue since %s", terrstr());
return -1;
}
if (dndAllocMnodeSyncQueue(pDnode) != 0) { if (dndAllocMnodeSyncQueue(pDnode) != 0) {
dError("failed to alloc mnode sync queue since %s", terrstr()); dError("failed to alloc mnode sync queue since %s", terrstr());
return -1; return -1;
...@@ -293,7 +284,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) { ...@@ -293,7 +284,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
while (pMgmt->refCount > 1) taosMsleep(10); while (pMgmt->refCount > 1) taosMsleep(10);
while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10);
while (!taosQueueEmpty(pMgmt->pApplyQ)) taosMsleep(10);
while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10);
while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10);
...@@ -303,7 +293,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) { ...@@ -303,7 +293,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) {
dndFreeMnodeReadQueue(pDnode); dndFreeMnodeReadQueue(pDnode);
dndFreeMnodeWriteQueue(pDnode); dndFreeMnodeWriteQueue(pDnode);
dndFreeMnodeApplyQueue(pDnode);
dndFreeMnodeSyncQueue(pDnode); dndFreeMnodeSyncQueue(pDnode);
} }
...@@ -328,7 +317,6 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -328,7 +317,6 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) {
pOption->sendMsgToDnodeFp = dndSendMsgToDnode; pOption->sendMsgToDnodeFp = dndSendMsgToDnode;
pOption->sendMsgToMnodeFp = dndSendMsgToMnode; pOption->sendMsgToMnodeFp = dndSendMsgToMnode;
pOption->sendRedirectMsgFp = dndSendRedirectMsg; pOption->sendRedirectMsgFp = dndSendRedirectMsg;
pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue;
pOption->dnodeId = dndGetDnodeId(pDnode); pOption->dnodeId = dndGetDnodeId(pDnode);
pOption->clusterId = dndGetClusterId(pDnode); pOption->clusterId = dndGetClusterId(pDnode);
pOption->cfg.sver = pDnode->opt.sver; pOption->cfg.sver = pDnode->opt.sver;
...@@ -604,20 +592,6 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { ...@@ -604,20 +592,6 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
mndCleanupMsg(pMsg); mndCleanupMsg(pMsg);
} }
static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode != NULL) {
mndProcessApplyMsg(pMsg);
dndReleaseMnode(pDnode, pMnode);
} else {
mndSendRsp(pMsg, terrno);
}
mndCleanupMsg(pMsg);
}
static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
...@@ -712,19 +686,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -712,19 +686,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndReleaseMnode(pDnode, pMnode); dndReleaseMnode(pDnode, pMnode);
} }
static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = dndAcquireMnode(pDnode);
if (pMnode == NULL) {
return -1;
}
int32_t code = taosWriteQitem(pMgmt->pApplyQ, pMsg);
dndReleaseMnode(pDnode, pMnode);
return code;
}
static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue); pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue);
...@@ -817,23 +778,6 @@ static void dndFreeMnodeWriteQueue(SDnode *pDnode) { ...@@ -817,23 +778,6 @@ static void dndFreeMnodeWriteQueue(SDnode *pDnode) {
pMgmt->pWriteQ = NULL; pMgmt->pWriteQ = NULL;
} }
static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeApplyQueue);
if (pMgmt->pApplyQ == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static void dndFreeMnodeApplyQueue(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pApplyQ);
pMgmt->pApplyQ = NULL;
}
static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SWorkerPool *pPool = &pMgmt->writePool; SWorkerPool *pPool = &pMgmt->writePool;
......
...@@ -61,6 +61,13 @@ typedef struct { ...@@ -61,6 +61,13 @@ typedef struct {
char email[TSDB_FQDN_LEN]; char email[TSDB_FQDN_LEN];
} STelemMgmt; } STelemMgmt;
typedef struct {
int32_t errCode;
sem_t syncSem;
SSyncNode *pSyncNode;
ESyncState state;
} SSyncMgmt;
typedef struct SMnode { typedef struct SMnode {
int32_t dnodeId; int32_t dnodeId;
int32_t clusterId; int32_t clusterId;
...@@ -77,11 +84,11 @@ typedef struct SMnode { ...@@ -77,11 +84,11 @@ typedef struct SMnode {
SShowMgmt showMgmt; SShowMgmt showMgmt;
SProfileMgmt profileMgmt; SProfileMgmt profileMgmt;
STelemMgmt telemMgmt; STelemMgmt telemMgmt;
SSyncMgmt syncMgmt;
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp;
SendRedirectMsgFp sendRedirectMsgFp; SendRedirectMsgFp sendRedirectMsgFp;
PutMsgToMnodeQFp putMsgToApplyMsgFp;
} SMnode; } SMnode;
void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
......
...@@ -14,26 +14,54 @@ ...@@ -14,26 +14,54 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "mndSync.h"
#include "mndInt.h"
#include "mndTrans.h"
int32_t mndInitSync(SMnode *pMnode) { return 0; } int32_t mndInitSync(SMnode *pMnode) {
void mndCleanupSync(SMnode *pMnode) {} SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_init(&pMgmt->syncSem, 0, 0);
pMgmt->state = TAOS_SYNC_STATE_LEADER;
pMgmt->pSyncNode = NULL;
return 0;
}
void mndCleanupSync(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
tsem_destroy(&pMgmt->syncSem);
}
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
SMnode *pMnode = pData;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0;
tsem_post(&pMgmt->syncSem);
return 0;
}
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
int32_t code = 0; #if 1
return 0;
#else
if (pMnode->replica == 1) return 0;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
pMgmt->errCode = 0;
SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)};
bool isWeak = false;
int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak);
// int32_t len = sdbGetRawTotalSize(pRaw); if (code != 0) return code;
// SSdbRaw *pReceived = calloc(1, len);
// memcpy(pReceived, pRaw, len);
// mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg);
// mndTransApply(pMnode, pReceived, code); tsem_wait(&pMgmt->syncSem);
return code; return pMgmt->errCode;
#endif
} }
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
// pMnode->role = TAOS_SYNC_STATE_LEADER; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
return true; return pMgmt->state == TAOS_SYNC_STATE_LEADER;
} }
\ No newline at end of file
...@@ -746,7 +746,7 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { ...@@ -746,7 +746,7 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) {
} }
} }
return 0; return code;
} }
static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
......
...@@ -202,7 +202,6 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -202,7 +202,6 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->selfIndex = pOption->selfIndex; pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pDnode = pOption->pDnode; pMnode->pDnode = pOption->pDnode;
pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp;
pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp;
pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp;
pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp;
...@@ -217,8 +216,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { ...@@ -217,8 +216,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo); pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo);
if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL ||
pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->cfg.statusInterval < 1) {
pMnode->cfg.statusInterval < 1) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS; terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1; return -1;
} }
...@@ -438,8 +436,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } ...@@ -438,8 +436,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); }
void mndProcessApplyMsg(SMnodeMsg *pMsg) {}
uint64_t mndGenerateUid(char *name, int32_t len) { uint64_t mndGenerateUid(char *name, int32_t len) {
int64_t us = taosGetTimestampUs(); int64_t us = taosGetTimestampUs();
int32_t hashval = MurmurHash3_32(name, len); int32_t hashval = MurmurHash3_32(name, len);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册