提交 93e3a2f3 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj

...@@ -56,13 +56,14 @@ typedef struct { ...@@ -56,13 +56,14 @@ typedef struct {
ReleaseHandleFp releaseHandleFp; ReleaseHandleFp releaseHandleFp;
} SMsgCb; } SMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb);
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp); void tmsgSendRsp(const SRpcMsg* pRsp);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type); void tmsgReleaseHandle(void* handle, int8_t type);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { PROC_REQ, PROC_RSP, PROC_REGISTER } ProcFuncType; typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType;
typedef struct SProcQueue SProcQueue; typedef struct SProcQueue SProcQueue;
typedef struct SProcObj SProcObj; typedef struct SProcObj SProcObj;
......
...@@ -16,6 +16,10 @@ ...@@ -16,6 +16,10 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tmsgcb.h" #include "tmsgcb.h"
static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) { int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq); return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
} }
...@@ -32,12 +36,12 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { ...@@ -32,12 +36,12 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
} }
void tmsgSendRsp(const SMsgCb* pMsgCb, const SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); }
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg); (*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
} }
void tmsgReleaseHandle(const SMsgCb* pMsgCb, void* handle, int8_t type) { void tmsgReleaseHandle(void* handle, int8_t type) {
(*pMsgCb->releaseHandleFp)(pMsgCb->pWrapper, handle, type); (*tsDefaultMsgCb.releaseHandleFp)(tsDefaultMsgCb.pWrapper, handle, type);
} }
\ No newline at end of file
...@@ -33,6 +33,7 @@ ...@@ -33,6 +33,7 @@
#include "tthread.h" #include "tthread.h"
#include "ttime.h" #include "ttime.h"
#include "tworker.h" #include "tworker.h"
#include "tmsgcb.h"
#include "dnode.h" #include "dnode.h"
#include "monitor.h" #include "monitor.h"
...@@ -140,6 +141,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); ...@@ -140,6 +141,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
......
...@@ -77,6 +77,8 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { ...@@ -77,6 +77,8 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -120,8 +122,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t ...@@ -120,8 +122,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
ProcFuncType ftype) { ProcFuncType ftype) {
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = pCont; pRpc->pCont = pCont;
dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);
pRpc->ahandle);
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pWrapper, pMsg); int32_t code = (*msgFp)(pWrapper, pMsg);
...@@ -142,13 +143,19 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t ...@@ -142,13 +143,19 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
ProcFuncType ftype) { ProcFuncType ftype) {
pMsg->pCont = pCont; pMsg->pCont = pCont;
dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), pMsg->handle, dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle);
pMsg->ahandle);
switch (ftype) { switch (ftype) {
case PROC_REGISTER: case PROC_REG:
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
break; break;
case PROC_RELEASE:
rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code);
rpcFreeCont(pCont);
break;
case PROC_REQ:
// todo send to dnode
dndSendReqToMnode(pWrapper, pMsg);
default: default:
dndSendRpcRsp(pWrapper, pMsg); dndSendRpcRsp(pWrapper, pMsg);
break; break;
...@@ -164,6 +171,9 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -164,6 +171,9 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
pWrapper->required = dndRequireNode(pWrapper); pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
if (taosMkDir(pWrapper->path) != 0) { if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
...@@ -211,15 +221,23 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -211,15 +221,23 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dndClearNodesExecpt(pDnode, n); dndClearNodesExecpt(pDnode, n);
dInfo("node:%s, will be initialized in child process", pWrapper->name); dInfo("node:%s, will be initialized in child process", pWrapper->name);
dndOpenNode(pWrapper); if (dndOpenNode(pWrapper) != 0) {
dInfo("node:%s, failed to init in child process since %s", pWrapper->name, terrstr());
return -1;
}
if (taosProcRun(pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
break;
} else { } else {
dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc)); dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc));
pWrapper->procType = PROC_PARENT; pWrapper->procType = PROC_PARENT;
} if (taosProcRun(pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
if (taosProcRun(pProc) != 0) { return -1;
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); }
return -1;
} }
} }
......
...@@ -320,8 +320,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p ...@@ -320,8 +320,7 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p
} }
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType != PROC_CHILD) {
} else {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { if (dndGetStatus(pDnode) != DND_STAT_RUNNING) {
terrno = TSDB_CODE_DND_OFFLINE; terrno = TSDB_CODE_DND_OFFLINE;
...@@ -329,23 +328,24 @@ int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg ...@@ -329,23 +328,24 @@ int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg
return -1; return -1;
} }
return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); return dndSendRpcReq(&pDnode->trans, pEpSet, pReq);
} else {
while (taosProcPutToParentQ(pWrapper->pProc, pReq, sizeof(SRpcMsg), pReq->pCont, pReq->contLen, PROC_REQ) != 0) {
taosMsleep(1);
}
} }
} }
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
if (pWrapper->procType == PROC_CHILD) { SDnode *pDnode = pWrapper->pDnode;
} else { STransMgmt *pTrans = &pDnode->trans;
SDnode *pDnode = pWrapper->pDnode; SEpSet epSet = {0};
STransMgmt *pTrans = &pDnode->trans;
SEpSet epSet = {0};
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); SMgmtWrapper *pWrapper2 = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper != NULL) { if (pWrapper2 != NULL) {
dmGetMnodeEpSet(pWrapper->pMgmt, &epSet); dmGetMnodeEpSet(pWrapper2->pMgmt, &epSet);
dndReleaseWrapper(pWrapper); dndReleaseWrapper(pWrapper2);
}
return dndSendRpcReq(pTrans, &epSet, pReq);
} }
return dndSendRpcReq(pTrans, &epSet, pReq);
} }
void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
...@@ -363,29 +363,44 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { ...@@ -363,29 +363,44 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
} }
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType != PROC_CHILD) {
int32_t code = -1;
do {
code = taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
} else {
dndSendRpcRsp(pWrapper, pRsp); dndSendRpcRsp(pWrapper, pRsp);
} else {
while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) {
taosMsleep(1);
}
} }
} }
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType != PROC_CHILD) {
int32_t code = -1;
do {
code = taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGISTER);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
} else {
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
} else {
while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) {
taosMsleep(1);
}
} }
}
static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) {
if (pWrapper->procType != PROC_CHILD) {
rpcReleaseHandle(handle, type);
} else {
SRpcMsg msg = {.handle = handle, .code = type};
while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) {
taosMsleep(1);
}
}
}
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
SMsgCb msgCb = {
.pWrapper = pWrapper,
.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg,
.releaseHandleFp = dndReleaseHandle,
.sendMnodeReqFp = dndSendReqToMnode,
.sendReqFp = dndSendReqToDnode,
.sendRspFp = dndSendRsp,
};
return msgCb;
} }
\ No newline at end of file
...@@ -771,7 +771,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -771,7 +771,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
.ahandle = pTrans->rpcAHandle, .ahandle = pTrans->rpcAHandle,
.pCont = rpcCont, .pCont = rpcCont,
.contLen = pTrans->rpcRspLen}; .contLen = pTrans->rpcRspLen};
tmsgSendRsp(&pMnode->msgCb, &rspMsg); tmsgSendRsp(&rspMsg);
pTrans->rpcHandle = NULL; pTrans->rpcHandle = NULL;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0; pTrans->rpcRspLen = 0;
...@@ -898,7 +898,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -898,7 +898,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
} else { } else {
mDebug("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
return -1; return -1;
} }
} }
...@@ -938,7 +938,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -938,7 +938,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return errCode; return errCode;
} }
} else { } else {
mDebug("trans:%d, %d of %d actions executed, code:0x%04x", pTrans->id, numOfReceived, numOfActions, errCode & 0XFFFF); mDebug("trans:%d, %d of %d actions executing", pTrans->id, numOfReceived, numOfActions);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
} }
......
...@@ -275,8 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -275,8 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pMsg->pCont = NULL; pMsg->pCont = NULL;
pMsg->contLen = 0; pMsg->contLen = 0;
pMsg->code = -1; pMsg->code = -1;
ASSERT(0); tmsgSendRsp(pMsg);
rpcSendResponse(pMsg);
return 0; return 0;
} }
...@@ -341,7 +340,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -341,7 +340,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
rpcSendResponse(pMsg); tmsgSendRsp(pMsg);
return 0; return 0;
} }
} else { } else {
...@@ -369,7 +368,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -369,7 +368,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
rpcSendResponse(pMsg); tmsgSendRsp(pMsg);
/*}*/ /*}*/
return 0; return 0;
......
...@@ -205,8 +205,7 @@ _exit: ...@@ -205,8 +205,7 @@ _exit:
rpcMsg.contLen = rspLen; rpcMsg.contLen = rspLen;
rpcMsg.code = code; rpcMsg.code = code;
rpcSendResponse(&rpcMsg); tmsgSendRsp(&rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -276,8 +275,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -276,8 +275,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
.code = 0, .code = 0,
}; };
rpcSendResponse(&rpcMsg); tmsgSendRsp(&rpcMsg);
taosArrayDestroyEx(pArray, freeItemHelper); taosArrayDestroyEx(pArray, freeItemHelper);
return 0; return 0;
} }
...@@ -30,18 +30,18 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); ...@@ -30,18 +30,18 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
int32_t code); int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -432,7 +432,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -432,7 +432,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
rpcReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER);
ctx->connInfo.handle = NULL; ctx->connInfo.handle = NULL;
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
...@@ -764,7 +764,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -764,7 +764,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = NULL; dropConnection = NULL;
qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
...@@ -802,7 +802,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -802,7 +802,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = NULL; dropConnection = NULL;
qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
...@@ -830,12 +830,12 @@ _return: ...@@ -830,12 +830,12 @@ _return:
} }
if (dropConnection) { if (dropConnection) {
qwBuildAndSendDropRsp(&mgmt->msgCb, dropConnection, code); qwBuildAndSendDropRsp(dropConnection, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
} }
if (cancelConnection) { if (cancelConnection) {
qwBuildAndSendCancelRsp(&mgmt->msgCb, cancelConnection, code); qwBuildAndSendCancelRsp(cancelConnection, code);
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
} }
...@@ -886,7 +886,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -886,7 +886,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code); qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
...@@ -918,7 +918,7 @@ _return: ...@@ -918,7 +918,7 @@ _return:
} }
if (readyConnection) { if (readyConnection) {
qwBuildAndSendReadyRsp(&mgmt->msgCb, readyConnection, code); qwBuildAndSendReadyRsp(readyConnection, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
} }
...@@ -970,7 +970,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { ...@@ -970,7 +970,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
QW_ERR_JRET(qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code)); QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
queryRsped = true; queryRsped = true;
...@@ -988,7 +988,7 @@ _return: ...@@ -988,7 +988,7 @@ _return:
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (!queryRsped) { if (!queryRsped) {
qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code); qwBuildAndSendQueryRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }
...@@ -1051,7 +1051,7 @@ _return: ...@@ -1051,7 +1051,7 @@ _return:
} }
if (needRsp) { if (needRsp) {
qwBuildAndSendReadyRsp(&mgmt->msgCb, &qwMsg->connInfo, code); qwBuildAndSendReadyRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }
...@@ -1095,7 +1095,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1095,7 +1095,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->connInfo; qwMsg->connInfo = ctx->connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else { } else {
atomic_store_8((int8_t*)&ctx->queryContinue, 1); atomic_store_8((int8_t*)&ctx->queryContinue, 1);
...@@ -1114,7 +1114,7 @@ _return: ...@@ -1114,7 +1114,7 @@ _return:
rsp = NULL; rsp = NULL;
qwMsg->connInfo = ctx->connInfo; qwMsg->connInfo = ctx->connInfo;
qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, 0, code); qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
} }
...@@ -1195,7 +1195,7 @@ _return: ...@@ -1195,7 +1195,7 @@ _return:
} }
if (code || rsp) { if (code || rsp) {
qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} }
...@@ -1226,7 +1226,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1226,7 +1226,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {
qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code); qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
...@@ -1261,7 +1261,7 @@ _return: ...@@ -1261,7 +1261,7 @@ _return:
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code); qwBuildAndSendDropRsp(&qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }
...@@ -1282,7 +1282,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { ...@@ -1282,7 +1282,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_LOCK(QW_WRITE, &sch->hbConnLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) { if (sch->hbConnInfo.handle) {
rpcReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
} }
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
...@@ -1297,7 +1297,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { ...@@ -1297,7 +1297,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return: _return:
qwBuildAndSendHbRsp(&mgmt->msgCb, &qwMsg->connInfo, &rsp, code); qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
...@@ -1351,7 +1351,7 @@ _return: ...@@ -1351,7 +1351,7 @@ _return:
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
for (int32_t j = 0; j < i; ++j) { for (int32_t j = 0; j < i; ++j) {
qwBuildAndSendHbRsp(&mgmt->msgCb, &rspList[j].connInfo, &rspList[j].rsp, code); qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
tFreeSSchedulerHbRsp(&rspList[j].rsp); tFreeSSchedulerHbRsp(&rspList[j].rsp);
......
...@@ -46,7 +46,7 @@ void qwFreeFetchRsp(void *msg) { ...@@ -46,7 +46,7 @@ void qwFreeFetchRsp(void *msg) {
} }
} }
int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SQueryTableRsp rsp = {.code = code}; SQueryTableRsp rsp = {.code = code};
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
...@@ -62,12 +62,12 @@ int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t ...@@ -62,12 +62,12 @@ int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t
.code = code, .code = code,
}; };
tmsgSendRsp(pMsgCb, &rpcRsp); tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code; pRsp->code = code;
...@@ -80,12 +80,12 @@ int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t ...@@ -80,12 +80,12 @@ int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t
.code = code, .code = code,
}; };
tmsgSendRsp(pMsgCb, &rpcRsp); tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
...@@ -99,12 +99,12 @@ int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SScheduler ...@@ -99,12 +99,12 @@ int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SScheduler
.code = code, .code = code,
}; };
tmsgSendRsp(pMsgCb, &rpcRsp); tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) { if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp));
...@@ -120,12 +120,12 @@ int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrie ...@@ -120,12 +120,12 @@ int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrie
.code = code, .code = code,
}; };
tmsgSendRsp(pMsgCb, &rpcRsp); tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code; pRsp->code = code;
...@@ -138,11 +138,11 @@ int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_ ...@@ -138,11 +138,11 @@ int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_
.code = code, .code = code,
}; };
tmsgSendRsp(pMsgCb, &rpcRsp); tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
pRsp->code = code; pRsp->code = code;
...@@ -155,11 +155,11 @@ int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t ...@@ -155,11 +155,11 @@ int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t
.code = code, .code = code,
}; };
tmsgSendRsp(pMsgCb, &rpcRsp); tmsgSendRsp(&rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code) { int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
int32_t numOfCols = 6; int32_t numOfCols = 6;
SVShowTablesRsp showRsp = {0}; SVShowTablesRsp showRsp = {0};
...@@ -210,11 +210,11 @@ int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code) ...@@ -210,11 +210,11 @@ int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code)
.code = code, .code = code,
}; };
tmsgSendRsp(pMsgCb, &rpcMsg); tmsgSendRsp(&rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
int32_t handle = htonl(pFetchReq->id); int32_t handle = htonl(pFetchReq->id);
...@@ -227,7 +227,7 @@ int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTa ...@@ -227,7 +227,7 @@ int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTa
.code = 0, .code = 0,
}; };
tmsgSendRsp(pMsgCb, &rpcMsg); tmsgSendRsp(&rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -498,7 +498,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -498,7 +498,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
_return: _return:
QW_ERR_RET(qwBuildAndSendCancelRsp(&mgmt->msgCb, &qwMsg.connInfo, code)); QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code));
QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code)); QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -584,11 +584,9 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -584,11 +584,9 @@ int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SQWorkerMgmt *pMgmt = qWorkerMgmt;
int32_t code = 0; int32_t code = 0;
SVShowTablesReq *pReq = pMsg->pCont; SVShowTablesReq *pReq = pMsg->pCont;
QW_RET(qwBuildAndSendShowRsp(&pMgmt->msgCb, pMsg, code)); QW_RET(qwBuildAndSendShowRsp(pMsg, code));
} }
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...@@ -596,8 +594,6 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) ...@@ -596,8 +594,6 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg)
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SQWorkerMgmt *pMgmt = qWorkerMgmt;
SVShowTablesFetchReq *pFetchReq = pMsg->pCont; SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
QW_RET(qwBuildAndSendShowFetchRsp(&pMgmt->msgCb, pMsg, pFetchReq)); QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
} }
...@@ -208,16 +208,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { ...@@ -208,16 +208,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
} }
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
int32_t rawBodyLen, ProcFuncType funcType) { int32_t rawBodyLen, ProcFuncType ftype) {
const int32_t headLen = CEIL8(rawHeadLen); const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
if (headLen <= 0 || bodyLen <= 0) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
taosThreadMutexLock(pQueue->mutex); taosThreadMutexLock(pQueue->mutex);
if (fullLen > pQueue->avail) { if (fullLen > pQueue->avail) {
taosThreadMutexUnlock(pQueue->mutex); taosThreadMutexUnlock(pQueue->mutex);
...@@ -225,13 +220,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t ...@@ -225,13 +220,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
return -1; return -1;
} }
const int32_t pos = pQueue->tail;
if (pQueue->tail < pQueue->total) { if (pQueue->tail < pQueue->total) {
*(int16_t *)(pQueue->pBuffer + pQueue->head) = headLen; *(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen;
*(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; *(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
*(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen; *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen;
} else { } else {
*(int16_t *)(pQueue->pBuffer) = headLen; *(int16_t *)(pQueue->pBuffer) = headLen;
*(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; *(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen; *(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
} }
...@@ -250,19 +246,19 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t ...@@ -250,19 +246,19 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen); memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen);
pQueue->tail = headLen + bodyLen; pQueue->tail = headLen + bodyLen;
} else if (remain < 8 + headLen) { } else if (remain < 8 + headLen) {
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, remain - 8); memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, remain - 8);
memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8)); memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
pQueue->tail = headLen - (remain - 8) + bodyLen; pQueue->tail = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + bodyLen) { } else if (remain < 8 + headLen + bodyLen) {
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen); memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + pQueue->head + 8 + headLen, pBody, remain - 8 - headLen); memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, remain - 8 - headLen);
memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
pQueue->tail = bodyLen - (remain - 8 - headLen); pQueue->tail = bodyLen - (remain - 8 - headLen);
} else { } else {
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen); memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
memcpy(pQueue->pBuffer + pQueue->head + headLen + 8, pBody, rawBodyLen); memcpy(pQueue->pBuffer + pQueue->tail + headLen + 8, pBody, rawBodyLen);
pQueue->tail = pQueue->head + headLen + bodyLen + 8; pQueue->tail = pQueue->tail + headLen + bodyLen + 8;
} }
} }
...@@ -271,8 +267,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t ...@@ -271,8 +267,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
taosThreadMutexUnlock(pQueue->mutex); taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
headLen, pHead, bodyLen, pBody, funcType); pQueue->items, headLen, pHead, bodyLen, pBody);
return 0; return 0;
} }
...@@ -312,6 +308,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea ...@@ -312,6 +308,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
return -1; return -1;
} }
const int32_t pos = pQueue->head;
if (pQueue->head < pQueue->tail) { if (pQueue->head < pQueue->tail) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen);
...@@ -331,7 +328,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea ...@@ -331,7 +328,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
memcpy(pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8)); memcpy(pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen); memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
pQueue->head = headLen - (remain - 8) + bodyLen; pQueue->head = headLen - (remain - 8) + bodyLen;
} else if (remain < 8 + bodyLen) { } else if (remain < 8 + headLen + bodyLen) {
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
memcpy(pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen)); memcpy(pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
...@@ -353,8 +350,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea ...@@ -353,8 +350,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
*pBodyLen = bodyLen; *pBodyLen = bodyLen;
*pFuncType = (ProcFuncType)ftype; *pFuncType = (ProcFuncType)ftype;
uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
headLen, pHead, bodyLen, pBody, ftype); pQueue->items, headLen, pHead, bodyLen, pBody);
return 1; return 1;
} }
...@@ -472,11 +469,11 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -472,11 +469,11 @@ void taosProcCleanup(SProcObj *pProc) {
} }
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType funcType) { ProcFuncType ftype) {
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype);
} }
int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
ProcFuncType funcType) { ProcFuncType ftype) {
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype);
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册