提交 e10e8c8f 编写于 作者: S Shengliang Guan

shm

上级 003864f9
......@@ -2269,6 +2269,13 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
#pragma pack(pop)
struct SRpcMsg;
struct SEpSet;
typedef int32_t (*PutToQueueFp)(void *pMgmt, struct SRpcMsg *pReq);
typedef int32_t (*SendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendMnodeReqFp)(void *pMgmt, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendRspFp)(void *pMgmt, struct SRpcMsg *rpcMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -50,7 +50,7 @@ typedef struct {
int32_t numOfDisks;
} SDnodeOpt;
typedef enum { DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent;
typedef enum { DND_EVENT_START, DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent;
/**
* @brief Initialize and start the dnode.
......
......@@ -53,10 +53,6 @@ typedef struct {
void *pNode;
} SNodeMsg;
typedef int32_t (*PutToQueueFp)(void *pMgmt, struct SRpcMsg *pReq);
typedef int32_t (*SendReqFp)(void *pMgmt, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendMnodeReqFp)(void *pMgmt, struct SRpcMsg *rpcMsg);
typedef int32_t (*SendRspFp)(void *pMgmt, struct SRpcMsg *rpcMsg);
typedef struct SRpcInit {
uint16_t localPort; // local port
char * label; // for debug purpose
......
......@@ -68,8 +68,8 @@ static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->numOfDisks = pOption->numOfDisks;
pDnode->rebootTime = taosGetTimestampMs();
if (pDnode->dataDir == NULL || pDnode->dataDir == NULL || pDnode->dataDir == NULL || pDnode->dataDir == NULL ||
pDnode->dataDir == NULL) {
if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL ||
pDnode->secondEp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......
......@@ -123,11 +123,6 @@ void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg);
......
......@@ -46,6 +46,7 @@ typedef struct SVnodesMgmt {
SMnode *pMnode;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
SDnodeWorker mgmtWorker;
} SVnodesMgmt;
typedef struct {
......@@ -96,11 +97,7 @@ void vmReleaseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl);
void vmCloseVnode(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -24,11 +24,11 @@ extern "C" {
void vmInitMsgHandles(SMgmtWrapper *pWrapper);
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq);
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
#ifdef __cplusplus
}
......
......@@ -35,6 +35,8 @@ int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
......
......@@ -55,7 +55,8 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S
pCfg->vgVersion = pCreate->vgVersion;
}
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SCreateVnodeReq createReq = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -113,7 +114,8 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
return 0;
}
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SAlterVnodeReq alterReq = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -154,7 +156,8 @@ int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
return code;
}
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -184,7 +187,8 @@ int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
return 0;
}
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SSyncVnodeReq syncReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq);
......@@ -207,7 +211,8 @@ int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
return 0;
}
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SRpcMsg *pReq) {
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SRpcMsg *pReq = &pMsg->rpcMsg;
SCompactVnodeReq compatcReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq);
......@@ -264,9 +269,9 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessCreateVnodeReq);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessAlterVnodeReq);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessDropVnodeReq);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessSyncVnodeReq);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessCompactVnodeReq);
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg);
}
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "vmWorker.h"
#include "vmMsg.h"
static void vmProcessQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); }
......@@ -200,6 +201,46 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pQueryQ = NULL;
}
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed", pMsg);
switch (msgType) {
case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_ALTER_VNODE:
code = vmProcessAlterVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_DROP_VNODE:
code = vmProcessDropVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = vmProcessSyncVnodeReq(pMgmt, pMsg);
break;
case TDMT_DND_COMPACT_VNODE:
code = vmProcessCompactVnodeReq(pMgmt, pMsg);
break;
default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
code = -1;
dError("RPC %p, dnode msg:%s not processed", pMsg->rpcMsg.handle, TMSG_INFO(msgType));
break;
}
if (msgType & 1u) {
if (code != 0) code = terrno;
SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->rpcMsg.pCont);
pMsg->rpcMsg.pCont = NULL;
taosFreeQitem(pMsg);
dTrace("msg:%p, is freed", pMsg);
}
int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
int32_t maxFetchThreads = 4;
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
......@@ -230,14 +271,26 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
pWPool->max = maxSyncThreads;
if (tWWorkerInit(pWPool) != 0) return -1;
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr());
return -1;
}
dDebug("vnode workers is initialized");
return 0;
}
void vmStopWorker(SVnodesMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->mgmtWorker);
tFWorkerCleanup(&pMgmt->fetchPool);
tQWorkerCleanup(&pMgmt->queryPool);
tWWorkerCleanup(&pMgmt->writePool);
tWWorkerCleanup(&pMgmt->syncPool);
dDebug("vnode workers is closed");
}
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg, 0);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册