提交 7729ce2e 编写于 作者: S Shengliang Guan

refactor: adjust SRpcMsg

上级 fc94b71c
...@@ -41,7 +41,7 @@ typedef int32_t (*PutToQueueFp)(void *pMgmt, SRpcMsg* pReq); ...@@ -41,7 +41,7 @@ typedef int32_t (*PutToQueueFp)(void *pMgmt, SRpcMsg* pReq);
typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(void *pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, const SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp); typedef void (*SendRspFp)(const SRpcMsg* pRsp);
typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, SRpcMsg* pRsp); typedef void (*SendMnodeRecvFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq, SRpcMsg* pRsp);
typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet); typedef void (*SendRedirectRspFp)(SMgmtWrapper* pWrapper, const SRpcMsg* pRsp, const SEpSet* pNewEpSet);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg); typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg* pMsg);
......
...@@ -19,49 +19,30 @@ ...@@ -19,49 +19,30 @@
static SMsgCb tsDefaultMsgCb; static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
// if (tsDefaultMsgCb.pWrapper == NULL) {
tsDefaultMsgCb = *pMsgCb;
//}
}
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
// cannot be empty, but not checked for faster detect
PutToQueueFp fp = pMsgCb->queueFps[qtype]; PutToQueueFp fp = pMsgCb->queueFps[qtype];
if (fp != NULL) { return (*fp)(pMsgCb->pMgmt, pReq);
return (*fp)(pMsgCb->pMgmt, pReq);
} else {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
} }
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) { int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
// cannot be empty, but not checked for faster detect
GetQueueSizeFp fp = pMsgCb->qsizeFp; GetQueueSizeFp fp = pMsgCb->qsizeFp;
if (fp != NULL) { return (*fp)(pMsgCb->pMgmt, vgId, qtype);
return (*fp)(pMsgCb->pMgmt, vgId, qtype);
} else {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
} }
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
// cannot be empty, but not checked for faster detect
SendReqFp fp = pMsgCb->sendReqFp; SendReqFp fp = pMsgCb->sendReqFp;
if (fp != NULL) { return (*fp)(pMsgCb->pWrapper, epSet, pReq);
return (*fp)(pMsgCb->pWrapper, epSet, pReq);
} else {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
} }
void tmsgSendRsp(SRpcMsg* pRsp) { void tmsgSendRsp(SRpcMsg* pRsp) {
// cannot be empty, but not checked for faster detect
SendRspFp fp = tsDefaultMsgCb.sendRspFp; SendRspFp fp = tsDefaultMsgCb.sendRspFp;
if (fp != NULL) { return (*fp)(pRsp);
return (*fp)(tsDefaultMsgCb.pWrapper, pRsp);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
} }
void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) { void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
......
...@@ -17,13 +17,8 @@ ...@@ -17,13 +17,8 @@
#include "bmInt.h" #include "bmInt.h"
static void bmSendErrorRsp(SRpcMsg *pMsg, int32_t code) { static void bmSendErrorRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rsp = {.code = code, .info = pMsg->info};
.info.handle = pMsg->info.handle, tmsgSendRsp(&rsp);
.info.ahandle = pMsg->info.ahandle,
.code = code,
.info.refId = pMsg->info.refId,
};
tmsgSendRsp(&rpcRsp);
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
...@@ -41,12 +36,12 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) { ...@@ -41,12 +36,12 @@ static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
} }
static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void bmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.info.handle = pMsg->info.handle, SRpcMsg rsp = {
.info.ahandle = pMsg->info.ahandle, .code = code,
.info.refId = pMsg->info.refId, .info = pMsg->info,
.code = code, .pCont = pMsg->info.rsp,
.pCont = pMsg->info.rsp, .contLen = pMsg->info.rspLen,
.contLen = pMsg->info.rspLen,}; };
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
......
...@@ -18,10 +18,8 @@ ...@@ -18,10 +18,8 @@
static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void qmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
}; };
......
...@@ -18,10 +18,8 @@ ...@@ -18,10 +18,8 @@
static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void smSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
}; };
......
...@@ -21,10 +21,8 @@ ...@@ -21,10 +21,8 @@
static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.code = code, .code = code,
.info = pMsg->info,
.pCont = pMsg->info.rsp, .pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen, .contLen = pMsg->info.rspLen,
}; };
...@@ -124,9 +122,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -124,9 +122,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); pMsg = *(SRpcMsg **)taosArrayGet(pArray, i);
pRpc = pMsg; pRpc = pMsg;
rsp.info.ahandle = pRpc->info.ahandle; rsp.info = pRpc->info;
rsp.info.handle = pRpc->info.handle;
rsp.info.refId = pRpc->info.refId;
rsp.pCont = NULL; rsp.pCont = NULL;
rsp.contLen = 0; rsp.contLen = 0;
...@@ -193,9 +189,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -193,9 +189,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
// if leader, send response // if leader, send response
if (pMsg->info.handle != NULL && pMsg->info.ahandle != NULL) { if (pMsg->info.handle != NULL && pMsg->info.ahandle != NULL) {
rsp.info.ahandle = pMsg->info.ahandle; rsp.info = pMsg->info;
rsp.info.handle = pMsg->info.handle;
rsp.info.refId = pMsg->info.refId;
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
......
...@@ -172,12 +172,7 @@ _OVER: ...@@ -172,12 +172,7 @@ _OVER:
code = TSDB_CODE_NODE_REDIRECT; code = TSDB_CODE_NODE_REDIRECT;
} }
} }
SRpcMsg rspMsg = { SRpcMsg rspMsg = {.code = code, .info = pRpc->info};
.info.handle = pRpc->info.handle,
.code = code,
.info.ahandle = pRpc->info.ahandle,
.info.refId = pRpc->info.refId,
};
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
} }
...@@ -282,7 +277,8 @@ static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SR ...@@ -282,7 +277,8 @@ static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SR
return 0; return 0;
} }
static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { static inline void dmSendRsp(const SRpcMsg *pRsp) {
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
if (InChildProc(pWrapper->proc.ptype)) { if (InChildProc(pWrapper->proc.ptype)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP); dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
} else { } else {
......
...@@ -144,31 +144,29 @@ typedef enum { ...@@ -144,31 +144,29 @@ typedef enum {
} ECsmUpdateType; } ECsmUpdateType;
typedef struct { typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
ETrnType type; ETrnType type;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
void* rpcHandle; SRpcHandleInfo rpcInfo;
void* rpcAHandle; void* rpcRsp;
int64_t rpcRefId; int32_t rpcRspLen;
void* rpcRsp; SArray* redoLogs;
int32_t rpcRspLen; SArray* undoLogs;
SArray* redoLogs; SArray* commitLogs;
SArray* undoLogs; SArray* redoActions;
SArray* commitLogs; SArray* undoActions;
SArray* redoActions; int64_t createdTime;
SArray* undoActions; int64_t lastExecTime;
int64_t createdTime; int64_t dbUid;
int64_t lastExecTime; char dbname[TSDB_DB_FNAME_LEN];
int64_t dbUid; char lastError[TSDB_TRANS_ERROR_LEN];
char dbname[TSDB_DB_FNAME_LEN]; int32_t startFunc;
char lastError[TSDB_TRANS_ERROR_LEN]; int32_t stopFunc;
int32_t startFunc; int32_t paramLen;
int32_t stopFunc; void* param;
int32_t paramLen;
void* param;
} STrans; } STrans;
typedef struct { typedef struct {
......
...@@ -563,9 +563,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S ...@@ -563,9 +563,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans->policy = policy; pTrans->policy = policy;
pTrans->type = type; pTrans->type = type;
pTrans->createdTime = taosGetTimestampMs(); pTrans->createdTime = taosGetTimestampMs();
pTrans->rpcHandle = pReq->info.handle; pTrans->rpcInfo = pReq->info;
pTrans->rpcAHandle = pReq->info.ahandle;
pTrans->rpcRefId = pReq->info.refId;
pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
...@@ -783,9 +781,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -783,9 +781,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1; return -1;
} }
pNew->rpcHandle = pTrans->rpcHandle; pNew->rpcInfo = pTrans->rpcInfo;
pNew->rpcAHandle = pTrans->rpcAHandle;
pNew->rpcRefId = pTrans->rpcRefId;
pNew->rpcRsp = pTrans->rpcRsp; pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen; pNew->rpcRspLen = pTrans->rpcRspLen;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
...@@ -839,7 +835,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -839,7 +835,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} }
} }
if (sendRsp && pTrans->rpcHandle != NULL) { if (sendRsp && pTrans->rpcInfo.handle != NULL) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen); void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) { if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen); memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
...@@ -847,17 +843,15 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -847,17 +843,15 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
taosMemoryFree(pTrans->rpcRsp); taosMemoryFree(pTrans->rpcRsp);
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage, mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage,
pTrans->rpcAHandle); pTrans->rpcInfo.ahandle);
SRpcMsg rspMsg = { SRpcMsg rspMsg = {
.info.handle = pTrans->rpcHandle, .info = pTrans->rpcInfo,
.info.ahandle = pTrans->rpcAHandle,
.info.refId = pTrans->rpcRefId,
.code = code, .code = code,
.pCont = rpcCont, .pCont = rpcCont,
.contLen = pTrans->rpcRspLen, .contLen = pTrans->rpcRspLen,
}; };
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
pTrans->rpcHandle = NULL; pTrans->rpcInfo.handle = NULL;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0; pTrans->rpcRspLen = 0;
} }
......
...@@ -107,9 +107,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -107,9 +107,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp); tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
_exit: _exit:
rpcMsg.info.handle = pMsg->info.handle; rpcMsg.info = pMsg->info;
rpcMsg.info.ahandle = pMsg->info.ahandle;
rpcMsg.info.refId = pMsg->info.refId;
rpcMsg.pCont = pRsp; rpcMsg.pCont = pRsp;
rpcMsg.contLen = rspLen; rpcMsg.contLen = rspLen;
rpcMsg.code = code; rpcMsg.code = code;
......
...@@ -27,6 +27,8 @@ extern "C" { ...@@ -27,6 +27,8 @@ extern "C" {
#include "tref.h" #include "tref.h"
#include "plannodes.h" #include "plannodes.h"
#include "trpc.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000
#define QW_DEFAULT_SCH_TASK_NUMBER 10000 #define QW_DEFAULT_SCH_TASK_NUMBER 10000
...@@ -74,18 +76,12 @@ typedef struct SQWDebug { ...@@ -74,18 +76,12 @@ typedef struct SQWDebug {
bool dumpEnable; bool dumpEnable;
} SQWDebug; } SQWDebug;
typedef struct SQWConnInfo {
void * handle;
void * ahandle;
int64_t refId;
} SQWConnInfo;
typedef struct SQWMsg { typedef struct SQWMsg {
void * node; void *node;
int32_t code; int32_t code;
char * msg; char *msg;
int32_t msgLen; int32_t msgLen;
SQWConnInfo connInfo; SRpcHandleInfo connInfo;
} SQWMsg; } SQWMsg;
typedef struct SQWHbParam { typedef struct SQWHbParam {
...@@ -96,7 +92,7 @@ typedef struct SQWHbParam { ...@@ -96,7 +92,7 @@ typedef struct SQWHbParam {
typedef struct SQWHbInfo { typedef struct SQWHbInfo {
SSchedulerHbRsp rsp; SSchedulerHbRsp rsp;
SQWConnInfo connInfo; SRpcHandleInfo connInfo;
} SQWHbInfo; } SQWHbInfo;
typedef struct SQWPhaseInput { typedef struct SQWPhaseInput {
...@@ -127,8 +123,8 @@ typedef struct SQWTaskCtx { ...@@ -127,8 +123,8 @@ typedef struct SQWTaskCtx {
bool queryInQueue; bool queryInQueue;
int32_t rspCode; int32_t rspCode;
SQWConnInfo ctrlConnInfo; SRpcHandleInfo ctrlConnInfo;
SQWConnInfo dataConnInfo; SRpcHandleInfo dataConnInfo;
int8_t events[QW_EVENT_MAX]; int8_t events[QW_EVENT_MAX];
...@@ -140,10 +136,10 @@ typedef struct SQWTaskCtx { ...@@ -140,10 +136,10 @@ typedef struct SQWTaskCtx {
typedef struct SQWSchStatus { typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second int32_t lastAccessTs; // timestamp in second
SRWLatch hbConnLock; SRWLatch hbConnLock;
SQWConnInfo hbConnInfo; SRpcHandleInfo hbConnInfo;
SQueryNodeEpId hbEpId; SQueryNodeEpId hbEpId;
SRWLatch tasksLock; SRWLatch tasksLock;
SHashObj * tasksHash; // key:queryId+taskId, value: SQWTaskStatus SHashObj *tasksHash; // key:queryId+taskId, value: SQWTaskStatus
} SQWSchStatus; } SQWSchStatus;
// Qnode/Vnode level task management // Qnode/Vnode level task management
......
...@@ -30,21 +30,21 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); ...@@ -30,21 +30,21 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
int32_t code); int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code);
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwGetSchTasksStatus(SQWorker *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn); int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -536,7 +536,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -536,7 +536,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t resNum = 0; int32_t resNum = 0;
QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo)); QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));
SQWConnInfo connInfo = {0}; SRpcHandleInfo connInfo = {0};
connInfo.handle = ctx->ctrlConnInfo.handle; connInfo.handle = ctx->ctrlConnInfo.handle;
connInfo.refId = ctx->ctrlConnInfo.refId; connInfo.refId = ctx->ctrlConnInfo.refId;
...@@ -723,8 +723,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void ...@@ -723,8 +723,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx * ctx = NULL; SQWTaskCtx * ctx = NULL;
SQWConnInfo *dropConnection = NULL; SRpcHandleInfo *dropConnection = NULL;
SQWConnInfo *cancelConnection = NULL; SRpcHandleInfo *cancelConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
...@@ -842,10 +842,10 @@ _return: ...@@ -842,10 +842,10 @@ _return:
} }
int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0; int32_t code = 0;
SQWTaskCtx * ctx = NULL; SQWTaskCtx *ctx = NULL;
SQWConnInfo connInfo = {0}; SRpcHandleInfo connInfo = {0};
SQWConnInfo *readyConnection = NULL; SRpcHandleInfo *readyConnection = NULL;
QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase));
......
...@@ -43,7 +43,7 @@ void qwFreeFetchRsp(void *msg) { ...@@ -43,7 +43,7 @@ void qwFreeFetchRsp(void *msg) {
} }
} }
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
SQueryTableRsp rsp = {.code = code}; SQueryTableRsp rsp = {.code = code};
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
...@@ -52,9 +52,7 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -52,9 +52,7 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP, .msgType = TDMT_VND_QUERY_RSP,
.info.handle = pConn->handle, .info = pConn,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = msg, .pCont = msg,
.contLen = contLen, .contLen = contLen,
.code = code, .code = code,
...@@ -65,14 +63,13 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -65,14 +63,13 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP, .msgType = TDMT_VND_RES_READY_RSP,
.info.handle = pConn->handle, .info = pConn,
.info.refId = pConn->refId,
.info.ahandle = NULL, .info.ahandle = NULL,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
...@@ -84,7 +81,7 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -84,7 +81,7 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) { int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo}; SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
...@@ -93,9 +90,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, ...@@ -93,9 +90,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_EXPLAIN_RSP, .msgType = TDMT_VND_EXPLAIN_RSP,
.info.handle = pConn->handle, .info = pConn,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = contLen, .contLen = contLen,
.code = 0, .code = 0,
...@@ -106,16 +101,14 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, ...@@ -106,16 +101,14 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP, .msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.info.handle = pConn->handle, .info = pConn,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = contLen, .contLen = contLen,
.code = code, .code = code,
...@@ -126,7 +119,7 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_ ...@@ -126,7 +119,7 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) { if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp));
...@@ -135,9 +128,7 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 ...@@ -135,9 +128,7 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP, .msgType = TDMT_VND_FETCH_RSP,
.info.handle = pConn->handle, .info = pConn,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength, .contLen = sizeof(*pRsp) + dataLength,
.code = code, .code = code,
...@@ -148,15 +139,13 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 ...@@ -148,15 +139,13 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP, .msgType = TDMT_VND_CANCEL_TASK_RSP,
.info.handle = pConn->handle, .info = pConn,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = code, .code = code,
...@@ -166,15 +155,13 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -166,15 +155,13 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
pRsp->code = code; pRsp->code = code;
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP, .msgType = TDMT_VND_DROP_TASK_RSP,
.info.handle = pConn->handle, .info = pConn,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = code, .code = code,
...@@ -228,9 +215,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -228,9 +215,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
tSerializeSShowRsp(pBuf, bufLen, &showRsp); tSerializeSShowRsp(pBuf, bufLen, &showRsp);
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.info.handle = pMsg->info.handle, .info = pMsg->info,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.pCont = pBuf, .pCont = pBuf,
.contLen = bufLen, .contLen = bufLen,
.code = code, .code = code,
...@@ -246,9 +231,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe ...@@ -246,9 +231,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe
pRsp->numOfRows = 0; pRsp->numOfRows = 0;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.info.handle = pMsg->info.handle, .info = pMsg->info,
.info.ahandle = pMsg->info.ahandle,
.info.refId = pMsg->info.refId,
.pCont = pRsp, .pCont = pRsp,
.contLen = sizeof(*pRsp), .contLen = sizeof(*pRsp),
.code = 0, .code = 0,
...@@ -258,7 +241,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe ...@@ -258,7 +241,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq *pFetchRe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) { if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
...@@ -292,7 +275,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -292,7 +275,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) { if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
...@@ -320,7 +303,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -320,7 +303,7 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SQWConnInfo *pConn) { int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
SSchedulerHbReq req = {0}; SSchedulerHbReq req = {0};
req.header.vgId = mgmt->nodeId; req.header.vgId = mgmt->nodeId;
req.sId = sId; req.sId = sId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册