提交 a273559e 编写于 作者: M Minghao Li

add send redirect

上级 b96e434a
...@@ -42,6 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy ...@@ -42,6 +42,7 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp);
typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type); typedef void (*ReleaseHandleFp)(SMgmtWrapper* pWrapper, void* handle, int8_t type);
typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc); typedef void (*ReportStartup)(SMgmtWrapper* pWrapper, const char* name, const char* desc);
...@@ -52,6 +53,7 @@ typedef struct { ...@@ -52,6 +53,7 @@ typedef struct {
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
SendRedirectRspFp sendRedirectRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp; ReleaseHandleFp releaseHandleFp;
ReportStartup reportStartupFp; ReportStartup reportStartupFp;
...@@ -62,6 +64,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); ...@@ -62,6 +64,7 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
void tmsgSendRsp(const SRpcMsg* pRsp); void tmsgSendRsp(const SRpcMsg* pRsp);
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(void* handle, int8_t type); void tmsgReleaseHandle(void* handle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc); void tmsgReportStartup(const char* name, const char* desc);
......
...@@ -34,6 +34,10 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { ...@@ -34,6 +34,10 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); } void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); }
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
return (*tsDefaultMsgCb.sendRedirectRspFp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
}
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
} }
......
...@@ -16,8 +16,8 @@ ...@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmImp.h" #include "dmImp.h"
#define INTERNAL_USER "_dnd" #define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key" #define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd" #define INTERNAL_SECRET "_pwd"
static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { static void dmGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) {
...@@ -128,10 +128,10 @@ _OVER: ...@@ -128,10 +128,10 @@ _OVER:
} }
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeTrans * pTrans = &pDnode->trans; SDnodeTrans *pTrans = &pDnode->trans;
tmsg_t msgType = pMsg->msgType; tmsg_t msgType = pMsg->msgType;
bool isReq = msgType & 1u; bool isReq = msgType & 1u;
SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = pHandle->pNdWrapper; SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
if (msgType == TDMT_DND_SERVER_STATUS) { if (msgType == TDMT_DND_SERVER_STATUS) {
...@@ -309,6 +309,15 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { ...@@ -309,6 +309,15 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
} }
} }
static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) {
ASSERT(pRsp->code == TSDB_CODE_NODE_REDIRECT);
if (pWrapper->procType != DND_PROC_CHILD) {
rpcSendRedirectRsp(pRsp->handle, pNewEpSet);
} else {
taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP);
}
}
static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { static inline void dmRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType != DND_PROC_CHILD) { if (pWrapper->procType != DND_PROC_CHILD) {
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
...@@ -464,7 +473,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s ...@@ -464,7 +473,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq authReq = {0}; SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN); tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq); int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void * pReq = rpcMallocCont(contLen); void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq); tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
...@@ -538,6 +547,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { ...@@ -538,6 +547,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = { SMsgCb msgCb = {
.sendReqFp = dmSendReq, .sendReqFp = dmSendReq,
.sendRspFp = dmSendRsp, .sendRspFp = dmSendRsp,
.sendRedirectRspFp = dmSendRedirectRsp,
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg, .registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
.releaseHandleFp = dmReleaseHandle, .releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartupByWrapper, .reportStartupFp = dmReportStartupByWrapper,
......
...@@ -136,7 +136,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -136,7 +136,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
// sync integration response // sync integration response
for (int i = 0; i < taosArrayGetSize(pArray); i++) { for (int i = 0; i < taosArrayGetSize(pArray); i++) {
SNodeMsg *pMsg; SNodeMsg *pMsg;
SRpcMsg * pRpc; SRpcMsg *pRpc;
pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
pRpc = &pMsg->rpcMsg; pRpc = &pMsg->rpcMsg;
...@@ -151,6 +151,8 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -151,6 +151,8 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) {
rsp.code = TSDB_CODE_SYN_NOT_LEADER; rsp.code = TSDB_CODE_SYN_NOT_LEADER;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
// SEpSet epSet; TODO
} else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) { } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR; rsp.code = TSDB_CODE_SYN_INTERNAL_ERROR;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
...@@ -175,7 +177,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -175,7 +177,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg * pMsg = NULL; SNodeMsg *pMsg = NULL;
SRpcMsg rsp; SRpcMsg rsp;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
...@@ -218,7 +220,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -218,7 +220,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg * pMsg = NULL; SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
...@@ -231,7 +233,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -231,7 +233,7 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle; SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg * pMsg = NULL; SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
...@@ -248,7 +250,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -248,7 +250,7 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
} }
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg * pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
pHead->contLen = ntohl(pHead->contLen); pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId); pHead->vgId = ntohl(pHead->vgId);
...@@ -317,7 +319,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -317,7 +319,7 @@ int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt * pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->mgmtWorker; SSingleWorker *pWorker = &pMgmt->mgmtWorker;
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
...@@ -325,7 +327,7 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -325,7 +327,7 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt * pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
...@@ -335,7 +337,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -335,7 +337,7 @@ int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHead * pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) return -1; if (pVnode == NULL) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册