提交 04af6494 编写于 作者: S Shengliang Guan

shm

上级 41c90280
...@@ -63,13 +63,24 @@ typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStat; ...@@ -63,13 +63,24 @@ typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANUP } EEnvStat;
typedef struct SMgmtFp SMgmtFp; typedef struct SMgmtFp SMgmtFp;
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SMsgHandle SMsgHandle;
typedef void (*RpcMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps);
typedef void (*NodeMsgFp)(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps);
typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg); typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg);
typedef SMgmtWrapper *(*MgmtOpenFp)(SDnode *pDnode, const char *path); typedef SMgmtWrapper *(*MgmtOpenFp)(SDnode *pDnode, const char *path);
typedef void (*MgmtCloseFp)(SDnode *pDnode, SMgmtWrapper *pMgmt); typedef void (*MgmtCloseFp)(SDnode *pDnode, SMgmtWrapper *pWrapper);
typedef bool (*MgmtRequiredFp)(SMgmtWrapper *pMgmt); typedef bool (*MgmtRequiredFp)(SMgmtWrapper *pWrapper);
typedef SArray *(*MgmtMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg); typedef int32_t (*MgmtHandleMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg);
typedef SMsgHandle (*GetMsgHandleFp)(SMgmtWrapper *pWrapper, int32_t msgIndex);
typedef struct SMsgHandle {
RpcMsgFp rpcMsgFp;
NodeMsgFp nodeMsgFp;
SMgmtWrapper *pWrapper;
} SMsgHandle;
typedef struct { typedef struct {
EWorkerType type; EWorkerType type;
...@@ -116,7 +127,7 @@ typedef struct { ...@@ -116,7 +127,7 @@ typedef struct {
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
// //
MndMsgFp msgFp[TDMT_MAX]; SMsgHandle msgHandles[TDMT_MAX];
SProcObj *pProcess; SProcObj *pProcess;
bool singleProc; bool singleProc;
} SMnodeMgmt; } SMnodeMgmt;
...@@ -171,16 +182,16 @@ typedef struct { ...@@ -171,16 +182,16 @@ typedef struct {
} SVnodesMgmt; } SVnodesMgmt;
typedef struct { typedef struct {
void *serverRpc; void *serverRpc;
void *clientRpc; void *clientRpc;
DndMsgFp msgFp[TDMT_MAX]; SMsgHandle msgHandles[TDMT_MAX];
} STransMgmt; } STransMgmt;
typedef struct SMgmtFp { typedef struct SMgmtFp {
MgmtOpenFp openFp; MgmtOpenFp openFp;
MgmtCloseFp closeFp; MgmtCloseFp closeFp;
MgmtRequiredFp requiredFp; MgmtRequiredFp requiredFp;
MgmtMsgFp msgFp; GetMsgHandleFp getMsgHandleFp;
} SMgmtFp; } SMgmtFp;
typedef struct SMgmtWrapper { typedef struct SMgmtWrapper {
...@@ -216,6 +227,10 @@ void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); ...@@ -216,6 +227,10 @@ void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
TdFilePtr dndCheckRunning(char *dataDir); TdFilePtr dndCheckRunning(char *dataDir);
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) ;
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
extern "C" { extern "C" {
#endif #endif
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -23,8 +23,9 @@ extern "C" { ...@@ -23,8 +23,9 @@ extern "C" {
#endif #endif
int32_t dndInitTrans(SDnode *pDnode); int32_t dndInitTrans(SDnode *pDnode);
void dndCleanupTransClient(SDnode *pDnode);
void dndCleanupTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode);
int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg); int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pRpcMsg);
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
static int8_t once = DND_ENV_INIT;
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
void dndSetStatus(SDnode *pDnode, EDndStatus status) { void dndSetStatus(SDnode *pDnode, EDndStatus status) {
...@@ -72,7 +74,70 @@ TdFilePtr dndCheckRunning(char *dataDir) { ...@@ -72,7 +74,70 @@ TdFilePtr dndCheckRunning(char *dataDir) {
return pFile; return pFile;
} }
int32_t dndInit() {
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr());
return -1;
}
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
if (rpcInit() != 0) {
dError("failed to init rpc since %s", terrstr());
dndCleanup();
return -1;
}
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
dndCleanup();
return -1;
}
// SVnodeOpt vnodeOpt = {
// .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp =
// dndSendReqToDnode};
// if (vnodeInit(&vnodeOpt) != 0) {
// dError("failed to init vnode since %s", terrstr());
// dndCleanup();
// return -1;
// }
SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp};
if (monInit(&monCfg) != 0) {
dError("failed to init monitor since %s", terrstr());
dndCleanup();
return -1;
}
dInfo("dnode env is initialized");
return 0;
}
void dndCleanup() {
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up");
return;
}
walCleanUp();
// vnodeCleanup();
rpcCleanup();
monCleanup();
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
}
void dndeHandleEvent(SDnode *pDnode, EDndEvent event) { void dndeHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode object receive event %d, data:%p", event, pDnode); dInfo("dnode object receive event %d, data:%p", event, pDnode);
pDnode->event = event; pDnode->event = event;
} }
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) {
return &pDnode->mgmts[nodeType];
}
\ No newline at end of file
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
#include "smInt.h" #include "smInt.h"
#include "vmInt.h" #include "vmInt.h"
static int8_t once = DND_ENV_INIT;
static void dndResetLog(SMgmtWrapper *pMgmt) { static void dndResetLog(SMgmtWrapper *pMgmt) {
char logname[24] = {0}; char logname[24] = {0};
snprintf(logname, sizeof(logname), "%slog", pMgmt->name); snprintf(logname, sizeof(logname), "%slog", pMgmt->name);
...@@ -171,55 +169,6 @@ _OVER: ...@@ -171,55 +169,6 @@ _OVER:
return pDnode; return pDnode;
} }
#if 0
if (dndInitVnodes(pDnode) != 0) {
dError("failed to init vnodes since %s", terrstr());
dndClose(pDnode);
return NULL;
}
if (dndInitQnode(pDnode) != 0) {
dError("failed to init qnode since %s", terrstr());
dndClose(pDnode);
return NULL;
}
if (dndInitSnode(pDnode) != 0) {
dError("failed to init snode since %s", terrstr());
dndClose(pDnode);
return NULL;
}
if (dndInitBnode(pDnode) != 0) {
dError("failed to init bnode since %s", terrstr());
dndClose(pDnode);
return NULL;
}
if (mmInit(pDnode) != 0) {
dError("failed to init mnode since %s", terrstr());
dndClose(pDnode);
return NULL;
}
// mmCleanup(pDnode);
// dndCleanupBnode(pDnode);
// dndCleanupSnode(pDnode);
// dndCleanupQnode(pDnode);
// dndCleanupVnodes(pDnode);
return pDnode;
}
#endif
void dndClose(SDnode *pDnode) { void dndClose(SDnode *pDnode) {
if (pDnode == NULL) return; if (pDnode == NULL) return;
...@@ -237,67 +186,12 @@ void dndClose(SDnode *pDnode) { ...@@ -237,67 +186,12 @@ void dndClose(SDnode *pDnode) {
dInfo("dnode object is closed, data:%p", pDnode); dInfo("dnode object is closed, data:%p", pDnode);
} }
int32_t dndInit() {
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr());
return -1;
}
taosIgnSIGPIPE();
taosBlockSIGPIPE();
taosResolveCRC();
if (rpcInit() != 0) {
dError("failed to init rpc since %s", terrstr());
dndCleanup();
return -1;
}
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
dndCleanup();
return -1;
}
// SVnodeOpt vnodeOpt = {
// .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp =
// dndSendReqToDnode};
// if (vnodeInit(&vnodeOpt) != 0) {
// dError("failed to init vnode since %s", terrstr());
// dndCleanup();
// return -1;
// }
SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn, .comp = tsMonitorComp};
if (monInit(&monCfg) != 0) {
dError("failed to init monitor since %s", terrstr());
dndCleanup();
return -1;
}
dInfo("dnode env is initialized");
return 0;
}
void dndCleanup() {
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up");
return;
}
walCleanUp();
// vnodeCleanup();
rpcCleanup();
monCleanup();
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
}
void dndRun(SDnode *pDnode) { void dndRun(SDnode *pDnode) {
while (pDnode->event != DND_EVENT_STOP) { while (pDnode->event != DND_EVENT_STOP) {
taosMsleep(100); taosMsleep(100);
} }
} }
void dndProcessRpcMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) {
}
\ No newline at end of file
...@@ -22,13 +22,13 @@ ...@@ -22,13 +22,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndTransport.h" #include "dndTransport.h"
#include "dndMgmt.h" #include "dndMgmt.h"
#include "mm.h" #include "mmInt.h"
#include "dndVnodes.h"
#define INTERNAL_USER "_dnd" #define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key" #define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd" #define INTERNAL_SECRET "_pwd"
#if 0
static void dndInitMsgFp(STransMgmt *pMgmt) { static void dndInitMsgFp(STransMgmt *pMgmt) {
// Requests handled by DNODE // Requests handled by DNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg;
...@@ -155,8 +155,10 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -155,8 +155,10 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_HEARTBEAT)] = dndProcessVnodeFetchMsg;
} }
#endif
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode * pDnode = parent; SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pRsp->msgType; tmsg_t msgType = pRsp->msgType;
...@@ -168,11 +170,11 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { ...@@ -168,11 +170,11 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
return; return;
} }
DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (fp != NULL) { if (pHandle->rpcMsgFp != NULL) {
dTrace("RPC %p, rsp:%s will be processed, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->code & 0XFFFF, dTrace("RPC %p, rsp:%s will be processed by %s, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType),
pRsp->ahandle); pHandle->pWrapper->name, pRsp->code & 0XFFFF, pRsp->ahandle);
(*fp)(pDnode, pRsp, pEpSet); (*pHandle->rpcMsgFp)(pDnode, pHandle->pWrapper, pRsp, pEpSet);
} else { } else {
dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle);
rpcFreeCont(pRsp->pCont); rpcFreeCont(pRsp->pCont);
...@@ -184,7 +186,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -184,7 +186,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "D-C"; rpcInit.label = "CLI";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse; rpcInit.cfp = dndProcessResponse;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
...@@ -209,7 +211,7 @@ static int32_t dndInitClient(SDnode *pDnode) { ...@@ -209,7 +211,7 @@ static int32_t dndInitClient(SDnode *pDnode) {
return 0; return 0;
} }
static void dndCleanupClient(SDnode *pDnode) { void dndCleanupClient(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
if (pMgmt->clientRpc) { if (pMgmt->clientRpc) {
rpcClose(pMgmt->clientRpc); rpcClose(pMgmt->clientRpc);
...@@ -219,7 +221,7 @@ static void dndCleanupClient(SDnode *pDnode) { ...@@ -219,7 +221,7 @@ static void dndCleanupClient(SDnode *pDnode) {
} }
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode * pDnode = param; SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
tmsg_t msgType = pReq->msgType; tmsg_t msgType = pReq->msgType;
...@@ -250,10 +252,11 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { ...@@ -250,10 +252,11 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
return; return;
} }
DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (fp != NULL) { if (pHandle->rpcMsgFp != NULL) {
dTrace("RPC %p, req:%s will be processed, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); dTrace("RPC %p, req:%s will be processed by %s, app:%p", pReq->handle, TMSG_INFO(msgType), pHandle->pWrapper->name,
(*fp)(pDnode, pReq, pEpSet); pReq->ahandle);
(*pHandle->rpcMsgFp)(pDnode, pHandle->pWrapper, pReq, pEpSet);
} else { } else {
dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle}; SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
...@@ -270,37 +273,37 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp ...@@ -270,37 +273,37 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp
rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp);
} }
static int32_t dndAuthInternalReq(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
int32_t code = 0;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
if (strcmp(user, INTERNAL_USER) == 0) { if (strcmp(user, INTERNAL_USER) == 0) {
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass); taosEncryptPass_c((uint8_t *)(INTERNAL_SECRET), strlen(INTERNAL_SECRET), pass);
memcpy(secret, pass, TSDB_PASSWORD_LEN);
*spi = 1;
*encrypt = 0;
*ckey = 0;
return 0;
} else if (strcmp(user, TSDB_NETTEST_USER) == 0) { } else if (strcmp(user, TSDB_NETTEST_USER) == 0) {
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass); taosEncryptPass_c((uint8_t *)(TSDB_NETTEST_USER), strlen(TSDB_NETTEST_USER), pass);
} else {
code = -1;
}
if (code == 0) {
memcpy(secret, pass, TSDB_PASSWORD_LEN); memcpy(secret, pass, TSDB_PASSWORD_LEN);
*spi = 1; *spi = 1;
*encrypt = 0; *encrypt = 0;
*ckey = 0; *ckey = 0;
return 0;
} else {
return -1;
} }
return code;
} }
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SDnode *pDnode = parent; SDnode *pDnode = parent;
if (dndAuthInternalReq(parent, user, spi, encrypt, secret, ckey) == 0) { if (dndGetHideUserAuth(parent, user, spi, encrypt, secret, ckey) == 0) {
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0; return 0;
} }
if (dndGetUserAuthFromMnode(pDnode, user, spi, encrypt, secret, ckey) == 0) { if (mmGetUserAuth(dndGetWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) {
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0; return 0;
} }
...@@ -313,7 +316,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -313,7 +316,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
SAuthReq authReq = {0}; SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN); tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void * pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq); tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
...@@ -341,7 +344,6 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -341,7 +344,6 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
static int32_t dndInitServer(SDnode *pDnode) { static int32_t dndInitServer(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt; STransMgmt *pMgmt = &pDnode->tmgmt;
dndInitMsgFp(pMgmt);
int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
if (numOfThreads < 1) { if (numOfThreads < 1) {
...@@ -351,7 +353,7 @@ static int32_t dndInitServer(SDnode *pDnode) { ...@@ -351,7 +353,7 @@ static int32_t dndInitServer(SDnode *pDnode) {
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = pDnode->cfg.serverPort; rpcInit.localPort = pDnode->cfg.serverPort;
rpcInit.label = "D-S"; rpcInit.label = "SRV";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dndProcessRequest; rpcInit.cfp = dndProcessRequest;
rpcInit.sessions = tsMaxShellConns; rpcInit.sessions = tsMaxShellConns;
...@@ -379,7 +381,40 @@ static void dndCleanupServer(SDnode *pDnode) { ...@@ -379,7 +381,40 @@ static void dndCleanupServer(SDnode *pDnode) {
} }
} }
static int32_t dndSetMsgHandle(SDnode *pDnode) {
STransMgmt *pMgmt = &pDnode->tmgmt;
for (ENodeType nodeType = 0; nodeType < NODE_MAX; ++nodeType) {
SMgmtWrapper *pWrapper = &pDnode->mgmts[nodeType];
GetMsgHandleFp getMsgHandleFp = pDnode->fps[nodeType].getMsgHandleFp;
if (getMsgHandleFp == NULL) continue;
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
SMsgHandle msgHandle = (*getMsgHandleFp)(pWrapper, msgIndex);
if (msgHandle.rpcMsgFp == NULL) continue;
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (pHandle->rpcMsgFp != NULL) {
dError("msg:%s, has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex],
pHandle->pWrapper->name, pWrapper->name);
return -1;
} else {
dDebug("msg:%s, will be processed by node:%s", tMsgInfo[msgIndex], pWrapper->name);
*pHandle = msgHandle;
}
}
}
return 0;
}
int32_t dndInitTrans(SDnode *pDnode) { int32_t dndInitTrans(SDnode *pDnode) {
dInfo("dnode-transport start to init");
if (dndSetMsgHandle(pDnode) != 0) {
return -1;
}
if (dndInitClient(pDnode) != 0) { if (dndInitClient(pDnode) != 0) {
return -1; return -1;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_MNODE_FILE_H_
#define _TD_DND_MNODE_FILE_H_
#include "mmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mmReadFile(SDnode *pDnode);
int32_t mmWriteFile(SDnode *pDnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_MNODE_FILE_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_MNODE_HANDLE_H_
#define _TD_DND_MNODE_HANDLE_H_
#include "mmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
void mmInitMsgHandles(SMgmtWrapper *pWrapper);
SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex);
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_MNODE_HANDLE_H_*/
\ No newline at end of file
...@@ -16,46 +16,26 @@ ...@@ -16,46 +16,26 @@
#ifndef _TD_DND_MNODE_MGMT_H_ #ifndef _TD_DND_MNODE_MGMT_H_
#define _TD_DND_MNODE_MGMT_H_ #define _TD_DND_MNODE_MGMT_H_
#include "dndInt.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "dndInt.h"
SMgmtFp mmGetMgmtFp();
// interface // interface
SMgmtFp mmGetMgmtFp();
int32_t mmInit(SDnode *pDnode); int32_t mmInit(SDnode *pDnode);
void mmCleanup(SDnode *pDnode); void mmCleanup(SDnode *pDnode);
int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg);
// mmFile int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmReadFile(SDnode *pDnode);
int32_t mmWriteFile(SDnode *pDnode);
// mmHandle
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo); SMonGrantInfo *pGrantInfo);
// mmMgmt
SMnode *mmAcquire(SDnode *pDnode);
void mmRelease(SDnode *pDnode, SMnode *pMnode);
int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption);
int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption);
int32_t mmDrop(SDnode *pDnode);
int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate);
// mmWorker
int32_t mmStartWorker(SDnode *pDnode);
void mmStopWorker(SDnode *pDnode);
void mmInitMsgFp(SMnodeMgmt *pMgmt);
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_MNODE_MGMT_H_
#define _TD_DND_MNODE_MGMT_H_
#include "mmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
SMnode *mmAcquire(SDnode *pDnode);
void mmRelease(SDnode *pDnode, SMnode *pMnode);
int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption);
int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption);
int32_t mmDrop(SDnode *pDnode);
int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_MNODE_MGMT_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_MNODE_WORKER_H_
#define _TD_DND_MNODE_WORKER_H_
#include "mmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mmStartWorker(SDnode *pDnode);
void mmStopWorker(SDnode *pDnode);
void mmInitMsgFp(SMnodeMgmt *pMgmt);
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_MNODE_WORKER_H_*/
\ No newline at end of file
...@@ -14,7 +14,8 @@ ...@@ -14,7 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmHandle.h"
#include "mmWorker.h"
#if 0 #if 0
#include "dndMgmt.h" #include "dndMgmt.h"
...@@ -122,7 +123,7 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro ...@@ -122,7 +123,7 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro
return code; return code;
} }
int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t mmGetUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnodeMgmt *pMgmt = &pDnode->mmgmt;
SMnode *pMnode = mmAcquire(pDnode); SMnode *pMnode = mmAcquire(pDnode);
...@@ -139,4 +140,94 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc ...@@ -139,4 +140,94 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc
return code; return code;
} }
#endif #endif
\ No newline at end of file
static void mmSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
pHandle->pWrapper = pWrapper;
pHandle->nodeMsgFp = nodeMsgFp;
pHandle->rpcMsgFp = dndProcessRpcMsg;
}
void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
// Requests handled by DNODE
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, mmProcessWriteMsg);
// Requests handled by MNODE
mmSetMsgHandle(pWrapper, TDMT_MND_CONNECT, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_USE_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_GRANT, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_AUTH, mmProcessReadMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, mmProcessReadMsg);
// Requests handled by VNODE
mmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, mmProcessWriteMsg);
mmSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, mmProcessWriteMsg);
}
SMsgHandle mmGetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgIndex) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return pMgmt->msgHandles[msgIndex];
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "mmHandle.h"
SMgmtFp mmGetMgmtFp() {
SMgmtFp mgmtFp = {0};
mgmtFp.getMsgHandleFp = mmGetMsgHandle;
}
int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
return 0;
}
...@@ -317,4 +317,5 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { ...@@ -317,4 +317,5 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
return 0; return 0;
} }
#endif #endif
\ No newline at end of file
...@@ -65,79 +65,7 @@ void mmStopWorker(SDnode *pDnode) { ...@@ -65,79 +65,7 @@ void mmStopWorker(SDnode *pDnode) {
} }
void mmInitMsgFp(SMnodeMgmt *pMgmt) { void mmInitMsgFp(SMnodeMgmt *pMgmt) {
// Requests handled by DNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_QNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_QNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_SNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_SNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_BNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_BNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_VNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_VNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = mmProcessWriteMsg;
// Requests handled by MNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONNECT)] = mmProcessReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_ACCT)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_ACCT)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_ACCT)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_USER)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_USER)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_USER)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = mmProcessReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CONFIG_DNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_MNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_MNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_QNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_QNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_SNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_SNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_BNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_BNODE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_USE_DB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_DB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYNC_DB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_COMPACT_DB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_FUNC)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_FUNC)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_STB)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TABLE_META)] = mmProcessReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = mmProcessReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = mmProcessReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = mmProcessReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = mmProcessReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = mmProcessReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_MQ_COMMIT_OFFSET)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = mmProcessReadMsg;
// Requests handled by VNODE
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_REB_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_STB_RSP)] = mmProcessWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg;
} }
static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) { static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册