提交 6d0b51bf 编写于 作者: S Shengliang Guan

refactor: adjust SRpcMsg handle to info

上级 b765f21e
......@@ -38,24 +38,19 @@ typedef struct {
typedef struct {
// rpc info
struct {
void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
};
void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
// app info
struct {
void *ahandle; // app handle set by client
void *proc; // proc handle
void *wrapper; // wrapper handle
void *node; // node mgmt handle
};
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
void *node; // node mgmt handle
// resp info
struct {
void *rsp;
int32_t rspLen;
};
void *rsp;
int32_t rspLen;
} SRpcHandleInfo;
typedef struct SRpcMsg {
......
......@@ -39,10 +39,10 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
return (*fp)(pMsgCb->pWrapper, epSet, pReq);
}
void tmsgSendRsp(SRpcMsg* pRsp) {
void tmsgSendRsp(SRpcMsg* pMsg) {
// cannot be empty, but not checked for faster detect
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
return (*fp)(pRsp);
return (*fp)(pMsg);
}
void tmsgSendRedirectRsp(SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
......
......@@ -143,7 +143,7 @@ int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SServerStatusRsp statusRsp = {0};
dmGetServerRunStatus(pMgmt, &statusRsp);
SRpcMsg rspMsg = {.info.handle = pMsg->info.handle, .info.ahandle = pMsg->info.ahandle, .info.refId = pMsg->info.refId};
SRpcMsg rspMsg = {.info = pMsg->info};
int32_t rspLen = tSerializeSServerStatusRsp(NULL, 0, &statusRsp);
if (rspLen < 0) {
rspMsg.code = TSDB_CODE_OUT_OF_MEMORY;
......
......@@ -150,10 +150,8 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
if (isRequest) {
if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {
.info.handle = pMsg->info.handle,
.info.ahandle = pMsg->info.ahandle,
.code = code,
.info.refId = pMsg->info.refId,
.info = pMsg->info,
.pCont = pMsg->info.rsp,
.contLen = pMsg->info.rspLen,
};
......
......@@ -57,6 +57,12 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
dTrace("msg:%s is received, handle:%p cont:%p len:%d code:0x%04x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType),
pRpc->info.handle, pRpc->pCont, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId);
pRpc->info.noResp = 0;
pRpc->info.persistHandle = 0;
pRpc->info.wrapper = NULL;
pRpc->info.node = NULL;
pRpc->info.rsp = NULL;
pRpc->info.rspLen = 0;
if (pRpc->msgType == TDMT_DND_NET_TEST) {
dmProcessNetTestReq(pDnode, pRpc);
......@@ -72,12 +78,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
dmProcessServerStartupStatus(pDnode, pRpc);
} else {
SRpcMsg rspMsg = {
.info.handle = pRpc->info.handle,
.code = TSDB_CODE_APP_NOT_READY,
.info.ahandle = pRpc->info.ahandle,
.info.refId = pRpc->info.refId,
};
SRpcMsg rspMsg = {.info = pRpc->info, .code = TSDB_CODE_APP_NOT_READY};
rpcSendResponse(&rspMsg);
}
return;
......@@ -116,6 +117,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
goto _OVER;
} else {
needRelease = true;
pRpc->info.wrapper = pWrapper;
}
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
......@@ -251,12 +253,12 @@ static inline int32_t dmSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SR
return 0;
}
static inline void dmSendRsp(const SRpcMsg *pRsp) {
SMgmtWrapper *pWrapper = pRsp->info.wrapper;
static inline void dmSendRsp(const SRpcMsg *pMsg) {
SMgmtWrapper *pWrapper = pMsg->info.wrapper;
if (InChildProc(pWrapper->proc.ptype)) {
dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, DND_FUNC_RSP);
dmPutToProcPQueue(&pWrapper->proc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, DND_FUNC_RSP);
} else {
dmSendRpcRsp(pWrapper->pDnode, pRsp);
dmSendRpcRsp(pWrapper->pDnode, pMsg);
}
}
......
......@@ -34,19 +34,19 @@ int32_t mndProcessQueryMsg(SRpcMsg *pReq) {
}
}
int32_t mndProcessFetchMsg(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
mTrace("msg:%p, in fetch queue is processing", pReq);
int32_t mndProcessFetchMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
mTrace("msg:%p, in fetch queue is processing", pMsg);
switch (pReq->msgType) {
switch (pMsg->msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pReq);
return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg);
case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pReq);
return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, pMsg);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pReq);
return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, pMsg);
default:
mError("unknown msg type:%d in fetch queue", pReq->msgType);
mError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
......
......@@ -127,9 +127,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
SRpcMsg saveRpcMsg;
int32_t ret = syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &saveRpcMsg);
if (ret == 1 && cbMeta.state == TAOS_SYNC_STATE_LEADER) {
applyMsg.info.handle = saveRpcMsg.info.handle;
applyMsg.info.ahandle = saveRpcMsg.info.ahandle;
applyMsg.info.refId = saveRpcMsg.info.refId;
applyMsg.info = saveRpcMsg.info;
} else {
applyMsg.info.handle = NULL;
applyMsg.info.ahandle = NULL;
......
......@@ -535,12 +535,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
SExplainExecInfo *execInfo = NULL;
int32_t resNum = 0;
QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));
SRpcHandleInfo connInfo = {0};
connInfo.handle = ctx->ctrlConnInfo.handle;
connInfo.refId = ctx->ctrlConnInfo.refId;
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
QW_ERR_RET(qwBuildAndSendExplainRsp(&ctx->ctrlConnInfo, execInfo, resNum));
}
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
......@@ -865,8 +860,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
}
#else
connInfo.handle = ctx->ctrlConnInfo.handle;
connInfo.refId = ctx->ctrlConnInfo.refId;
connInfo = ctx->ctrlConnInfo;
readyConnection = &connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
......@@ -943,9 +937,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
atomic_store_8(&ctx->taskType, taskType);
atomic_store_8(&ctx->explain, explain);
atomic_store_ptr(&ctx->ctrlConnInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->ctrlConnInfo.ahandle, qwMsg->connInfo.ahandle);
atomic_store_64(&ctx->ctrlConnInfo.refId, qwMsg->connInfo.refId);
ctx->ctrlConnInfo = qwMsg->connInfo;
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
......@@ -1010,8 +1002,7 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (ctx->phase == QW_PHASE_PRE_QUERY) {
ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
ctx->ctrlConnInfo = qwMsg->connInfo;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
needRsp = false;
QW_TASK_DLOG_E("ready msg will not rsp now");
......@@ -1244,8 +1235,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (!rsped) {
ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
ctx->ctrlConnInfo = qwMsg->connInfo;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
......
......@@ -52,10 +52,10 @@ int32_t qwBuildAndSendQueryRsp(SRpcHandleInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_RSP,
.info = pConn,
.pCont = msg,
.contLen = contLen,
.code = code,
.info = *pConn,
};
tmsgSendRsp(&rpcRsp);
......@@ -69,12 +69,12 @@ int32_t qwBuildAndSendReadyRsp(SRpcHandleInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_RES_READY_RSP,
.info = pConn,
.info.ahandle = NULL,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
.info = *pConn,
};
rpcRsp.info.ahandle = NULL,
tmsgSendRsp(&rpcRsp);
......@@ -90,10 +90,10 @@ int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execIn
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_EXPLAIN_RSP,
.info = pConn,
.pCont = pRsp,
.contLen = contLen,
.code = 0,
.info = *pConn,
};
tmsgSendRsp(&rpcRsp);
......@@ -108,10 +108,10 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_QUERY_HEARTBEAT_RSP,
.info = pConn,
.pCont = pRsp,
.contLen = contLen,
.pCont = pRsp,
.code = code,
.info = *pConn,
};
tmsgSendRsp(&rpcRsp);
......@@ -128,10 +128,10 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_FETCH_RSP,
.info = pConn,
.pCont = pRsp,
.contLen = sizeof(*pRsp) + dataLength,
.code = code,
.info = *pConn,
};
tmsgSendRsp(&rpcRsp);
......@@ -145,10 +145,10 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_CANCEL_TASK_RSP,
.info = pConn,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
.info = *pConn,
};
tmsgSendRsp(&rpcRsp);
......@@ -161,10 +161,10 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) {
SRpcMsg rpcRsp = {
.msgType = TDMT_VND_DROP_TASK_RSP,
.info = pConn,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
.info = *pConn,
};
tmsgSendRsp(&rpcRsp);
......@@ -254,13 +254,11 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->taskId = tId;
SRpcMsg pNewMsg = {
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.info.refId = pConn->refId,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
.info = *pConn,
};
int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
......@@ -289,13 +287,11 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
req->refId = htobe64(rId);
SRpcMsg pMsg = {
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.msgType = TDMT_VND_DROP_TASK,
.pCont = req,
.contLen = sizeof(STaskDropReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
.info = *pConn,
};
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
......@@ -325,13 +321,11 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
}
SRpcMsg pMsg = {
.info.handle = pConn->handle,
.info.ahandle = pConn->ahandle,
.info.refId = pConn->refId,
.msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg,
.contLen = msgSize,
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
.info = *pConn,
};
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
......@@ -365,12 +359,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen};
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
char *sql = strndup(msg->msg, msg->sqlLen);
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
char *sql = strndup(msg->msg, msg->sqlLen);
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
taosMemoryFreeClear(sql);
......@@ -400,10 +390,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("processCQuery start, node:%p, handle:%p", node, pMsg->info.handle);
......@@ -435,10 +422,7 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("processReady start, node:%p, handle:%p", node, pMsg->info.handle);
......@@ -498,10 +482,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = 0;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
......@@ -540,10 +521,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0};
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
// QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
......@@ -579,10 +557,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
uint64_t tId = msg->taskId;
int64_t rId = msg->refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_TASK_DLOG("receive drop task due to network broken, error:%s", tstrerror(pMsg->code));
......@@ -618,11 +593,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->info.handle;
qwMsg.connInfo.ahandle = pMsg->info.ahandle;
qwMsg.connInfo.refId = pMsg->info.refId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册