提交 2c694a50 编写于 作者: S Shengliang Guan

refactor: remove rpc client in executor and scanoperator

上级 00704f9d
......@@ -57,6 +57,7 @@ typedef struct {
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
ReleaseHandleFp releaseHandleFp;
ReportStartup reportStartupFp;
void* clientRpc;
} SMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
......
......@@ -26,8 +26,10 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper);
void dmCloseNode(SMgmtWrapper *pWrapper);
// dmTransport.c
int32_t dmInitTrans(SDnode *pDnode);
void dmCleanupTrans(SDnode *pDnode);
int32_t dmInitServer(SDnode *pDnode);
void dmCleanupServer(SDnode *pDnode);
int32_t dmInitClient(SDnode *pDnode);
void dmCleanupClient(SDnode *pDnode);
SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper);
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper);
int32_t dmInitMsgHandle(SDnode *pDnode);
......
......@@ -213,10 +213,12 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
}
pWrapper->procType = DND_PROC_CHILD;
if (dmInitClient(pDnode) != 0) {
return -1;
}
SMsgCb msgCb = pDnode->data.msgCb;
msgCb.pWrapper = pWrapper;
tmsgSetDefaultMsgCb(&msgCb);
pDnode->data.msgCb = dmGetMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
if (dmOpenNode(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
......@@ -234,6 +236,15 @@ static int32_t dmOpenNodes(SDnode *pDnode) {
pWrapper->procType = DND_PROC_SINGLE;
}
if (n == DNODE) {
if (dmInitClient(pDnode) != 0) {
return -1;
}
pDnode->data.msgCb = dmGetMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
}
if (dmOpenNode(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
......@@ -281,21 +292,21 @@ static void dmProcessProcHandle(void *handle) {
}
static void dmWatchNodes(SDnode *pDnode) {
if (pDnode->ptype != DND_PROC_PARENT) return;
if (pDnode->ntype == NODE_END) return;
taosThreadMutexLock(&pDnode->mutex);
if (pDnode->ptype == DND_PROC_PARENT) {
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->procType != DND_PROC_PARENT) continue;
if (pDnode->ntype == NODE_END) continue;
for (EDndNodeType n = DNODE + 1; n < NODE_END; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->procType != DND_PROC_PARENT) continue;
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
if (pWrapper->procObj) {
taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle);
}
dmNewNodeProc(pWrapper, n);
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
if (pWrapper->procObj) {
taosProcCloseHandles(pWrapper->procObj, dmProcessProcHandle);
}
dmNewNodeProc(pWrapper, n);
}
}
taosThreadMutexUnlock(&pDnode->mutex);
......
......@@ -381,7 +381,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
return -1;
}
if (dmInitTrans(pDnode) != 0) {
if (dmInitServer(pDnode) != 0) {
dError("failed to init transport since %s", terrstr());
return -1;
}
......@@ -412,7 +412,8 @@ static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
}
taosWUnLockLatch(&pDnode->data.latch);
dmCleanupTrans(pDnode);
dmCleanupClient(pDnode);
dmCleanupServer(pDnode);
dInfo("dnode-mgmt is cleaned up");
}
......
......@@ -124,9 +124,6 @@ SDnode *dmCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
pDnode->data.msgCb = dmGetMsgcb(&pDnode->wrappers[DNODE]);
tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
dInfo("dnode is created, data:%p", pDnode);
code = 0;
......
......@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE
#include "dmImp.h"
#include "qworker.h"
#define INTERNAL_USER "_dnd"
#define INTERNAL_CKEY "_key"
#define INTERNAL_SECRET "_pwd"
......@@ -130,22 +132,27 @@ _OVER:
}
static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SDnodeTrans * pTrans = &pDnode->trans;
SDnodeTrans *pTrans = &pDnode->trans;
tmsg_t msgType = pMsg->msgType;
bool isReq = msgType & 1u;
SMsgHandle * pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)];
SMgmtWrapper *pWrapper = pHandle->pNdWrapper;
if (msgType == TDMT_DND_SERVER_STATUS) {
dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
dmProcessServerStatusReq(pDnode, pMsg);
return;
}
if (msgType == TDMT_DND_NET_TEST) {
dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
dmProcessNetTestReq(pDnode, pMsg);
return;
switch (msgType) {
case TDMT_DND_SERVER_STATUS:
dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
dmProcessServerStatusReq(pDnode, pMsg);
return;
case TDMT_DND_NET_TEST:
dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle);
dmProcessNetTestReq(pDnode, pMsg);
return;
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
case TDMT_VND_FETCH_RSP:
dTrace("retrieve rsp is received");
qWorkerProcessFetchRsp(NULL, NULL, pMsg);
pMsg->pCont = NULL; // already freed in qworker
return;
}
if (pDnode->status != DND_STAT_RUNNING) {
......@@ -233,16 +240,6 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
return 0;
}
static inline int32_t dmSendRpcReq(SDnode *pDnode, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pDnode->trans.clientRpc == NULL) {
terrno = TSDB_CODE_NODE_OFFLINE;
return -1;
}
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
return 0;
}
static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) {
SEpSet epSet = {0};
dmGetMnodeEpSet(pDnode, &epSet);
......@@ -288,28 +285,20 @@ void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) {
}
static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pWrapper->pDnode->status != DND_STAT_RUNNING) {
SDnode *pDnode = pWrapper->pDnode;
if (pDnode->status != DND_STAT_RUNNING) {
terrno = TSDB_CODE_NODE_OFFLINE;
dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle);
return -1;
}
if (pWrapper->procType != DND_PROC_CHILD) {
return dmSendRpcReq(pWrapper->pDnode, pEpSet, pReq);
} else {
char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet));
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(pHead, pReq, sizeof(SRpcMsg));
memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
taosProcPutToParentQ(pWrapper->procObj, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen,
PROC_FUNC_REQ);
taosMemoryFree(pHead);
return 0;
if (pDnode->trans.clientRpc == NULL) {
terrno = TSDB_CODE_NODE_OFFLINE;
return -1;
}
rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL);
return 0;
}
static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
......@@ -396,9 +385,10 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
pMsg->pCont = pCont;
if (ftype == PROC_FUNC_REQ) {
ASSERT(1);
dTrace("msg:%p, get from parent queue, send req:%s handle:%p code:0x%04x, app:%p", pMsg, TMSG_INFO(pMsg->msgType),
pMsg->handle, code, pMsg->ahandle);
dmSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
dmSendReq(pWrapper, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg);
} else if (ftype == PROC_FUNC_RSP) {
dTrace("msg:%p, get from parent queue, rsp handle:%p code:0x%04x, app:%p", pMsg, pMsg->handle, code, pMsg->ahandle);
pMsg->refId = taosProcRemoveHandle(pWrapper->procObj, pMsg->handle);
......@@ -421,23 +411,25 @@ static void dmConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
}
SProcCfg dmGenProcCfg(SMgmtWrapper *pWrapper) {
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dmConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)dmConsumeParentQueue,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->procShm,
.parent = pWrapper,
.name = pWrapper->name};
SProcCfg cfg = {
.childConsumeFp = (ProcConsumeFp)dmConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)dmConsumeParentQueue,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->procShm,
.parent = pWrapper,
.name = pWrapper->name,
};
return cfg;
}
bool rpcRfp(int32_t code) {
static bool rpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
......@@ -445,7 +437,7 @@ bool rpcRfp(int32_t code) {
}
}
static int32_t dmInitClient(SDnode *pDnode) {
int32_t dmInitClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
......@@ -471,11 +463,15 @@ static int32_t dmInitClient(SDnode *pDnode) {
return -1;
}
pDnode->data.msgCb = dmGetMsgcb(&pDnode->wrappers[DNODE]);
tmsgSetDefaultMsgCb(&pDnode->data.msgCb);
dDebug("dnode rpc client is initialized");
return 0;
}
static void dmCleanupClient(SDnode *pDnode) {
void dmCleanupClient(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->clientRpc) {
rpcClose(pTrans->clientRpc);
......@@ -517,7 +513,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void * pReq = rpcMallocCont(contLen);
void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
......@@ -543,7 +539,7 @@ static inline int32_t dmRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *s
return rpcRsp.code;
}
static int32_t dmInitServer(SDnode *pDnode) {
int32_t dmInitServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
SRpcInit rpcInit = {0};
......@@ -569,7 +565,7 @@ static int32_t dmInitServer(SDnode *pDnode) {
return 0;
}
static void dmCleanupServer(SDnode *pDnode) {
void dmCleanupServer(SDnode *pDnode) {
SDnodeTrans *pTrans = &pDnode->trans;
if (pTrans->serverRpc) {
rpcClose(pTrans->serverRpc);
......@@ -578,17 +574,6 @@ static void dmCleanupServer(SDnode *pDnode) {
}
}
int32_t dmInitTrans(SDnode *pDnode) {
if (dmInitServer(pDnode) != 0) return -1;
if (dmInitClient(pDnode) != 0) return -1;
return 0;
}
void dmCleanupTrans(SDnode *pDnode) {
dmCleanupServer(pDnode);
dmCleanupClient(pDnode);
}
SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = {
.sendReqFp = dmSendReq,
......@@ -598,6 +583,7 @@ SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) {
.releaseHandleFp = dmReleaseHandle,
.reportStartupFp = dmReportStartupByWrapper,
.pWrapper = pWrapper,
.clientRpc = pWrapper->pDnode->trans.clientRpc,
};
return msgCb;
}
......@@ -171,16 +171,17 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
}
}
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pRpc) {
void dmProcessNetTestReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("net test req is received");
SRpcMsg rsp = {.handle = pRpc->handle, .refId = pRpc->refId, .ahandle = pRpc->ahandle, .code = 0};
rsp.pCont = rpcMallocCont(pRpc->contLen);
SRpcMsg rsp = {.handle = pReq->handle, .refId = pReq->refId, .ahandle = pReq->ahandle, .code = 0};
rsp.pCont = rpcMallocCont(pReq->contLen);
if (rsp.pCont == NULL) {
rsp.code = TSDB_CODE_OUT_OF_MEMORY;
} else {
rsp.contLen = pRpc->contLen;
rsp.contLen = pReq->contLen;
}
rpcSendResponse(&rsp);
rpcFreeCont(pReq->pCont);
}
void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
......@@ -208,6 +209,7 @@ void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
_OVER:
rpcSendResponse(&rspMsg);
rpcFreeCont(pReq->pCont);
}
void dmGetMonitorSysInfo(SMonSysInfo *pInfo) {
......
......@@ -320,7 +320,6 @@ typedef struct SExchangeInfo {
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
SMsgCb* pMsgCb;
SSDataBlock* pResult;
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
......@@ -661,8 +660,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(SMsgCb *pMsgCb, const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo,
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo);
......
......@@ -3259,7 +3259,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createExchangeOperatorInfo(SMsgCb* pMsgCb, const SNodeList* pSources, SSDataBlock* pBlock,
SOperatorInfo* createExchangeOperatorInfo(void *pTransporter, const SNodeList* pSources, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
......@@ -3303,7 +3303,7 @@ SOperatorInfo* createExchangeOperatorInfo(SMsgCb* pMsgCb, const SNodeList* pSour
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
pInfo->pMsgCb = pMsgCb;
pInfo->pTransporter = pTransporter;
return pOperator;
......@@ -4770,7 +4770,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
return createExchangeOperatorInfo(pHandle->pMsgCb, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
......
......@@ -1057,7 +1057,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pMsgSendInfo->fp = loadSysTableCallback;
int64_t transporterId = 0;
int32_t code = asyncSendMsgToServer(pInfo->readHandle.pMsgCb, NULL, &pInfo->epSet, &transporterId, pMsgSendInfo);
int32_t code =
asyncSendMsgToServer(pInfo->readHandle.pMsgCb->clientRpc, &pInfo->epSet, &transporterId, pMsgSendInfo);
tsem_wait(&pInfo->ready);
if (pTaskInfo->code) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册