未验证 提交 a5af4547 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11071 from taosdata/feature/shm

remove rpcSendResponse in qnode
...@@ -41,14 +41,16 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy ...@@ -41,14 +41,16 @@ typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueTy
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq); typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp); typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
typedef void (*RegisterBrokenLinkArgFp)(SMgmtWrapper* pWrapper, SRpcMsg *pMsg);
typedef struct { typedef struct {
SMgmtWrapper* pWrapper; SMgmtWrapper* pWrapper;
PutToQueueFp queueFps[QUEUE_MAX]; PutToQueueFp queueFps[QUEUE_MAX];
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp; SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
} SMsgCb; } SMsgCb;
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
...@@ -56,6 +58,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); ...@@ -56,6 +58,7 @@ int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq); int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp); void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -49,11 +49,13 @@ typedef struct SRpcMsg { ...@@ -49,11 +49,13 @@ typedef struct SRpcMsg {
} SRpcMsg; } SRpcMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
SRpcMsg rpcMsg; uint32_t clientIp;
int32_t rspLen; uint16_t clientPort;
void * pRsp; SRpcMsg rpcMsg;
void * pNode; int32_t rspLen;
void *pRsp;
void *pNode;
} SNodeMsg; } SNodeMsg;
typedef struct SRpcInit { typedef struct SRpcInit {
......
...@@ -22,11 +22,14 @@ ...@@ -22,11 +22,14 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { PROC_REQ, PROC_RSP, PROC_REGISTER } ProcFuncType;
typedef struct SProcQueue SProcQueue; typedef struct SProcQueue SProcQueue;
typedef struct SProcObj SProcObj; typedef struct SProcObj SProcObj;
typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcMallocFp)(int32_t contLen);
typedef void *(*ProcFreeFp)(void *pCont); typedef void *(*ProcFreeFp)(void *pCont);
typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
ProcFuncType ftype);
typedef struct { typedef struct {
int32_t childQueueSize; int32_t childQueueSize;
...@@ -52,9 +55,10 @@ int32_t taosProcRun(SProcObj *pProc); ...@@ -52,9 +55,10 @@ int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc); void taosProcStop(SProcObj *pProc);
bool taosProcIsChild(SProcObj *pProc); bool taosProcIsChild(SProcObj *pProc);
int32_t taosProcChildId(SProcObj *pProc); int32_t taosProcChildId(SProcObj *pProc);
int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); ProcFuncType ftype);
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
ProcFuncType ftype);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -33,3 +33,7 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { ...@@ -33,3 +33,7 @@ int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
} }
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); } void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
}
\ No newline at end of file
...@@ -24,6 +24,7 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { ...@@ -24,6 +24,7 @@ static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb; pOption->msgCb = msgCb;
} }
......
...@@ -139,6 +139,7 @@ void dndSendMonitorReport(SDnode *pDnode); ...@@ -139,6 +139,7 @@ void dndSendMonitorReport(SDnode *pDnode);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp); void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
......
...@@ -56,6 +56,7 @@ void dndCleanupServer(SDnode *pDnode); ...@@ -56,6 +56,7 @@ void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode); int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -116,11 +116,12 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { ...@@ -116,11 +116,12 @@ static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
} }
} }
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
ProcFuncType ftype) {
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
pRpc->pCont = pCont; pRpc->pCont = pCont;
dTrace("msg:%p, get from child process queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), dTrace("msg:%p, get from child queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle,
pRpc->handle, pRpc->ahandle); pRpc->ahandle);
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pWrapper, pMsg); int32_t code = (*msgFp)(pWrapper, pMsg);
...@@ -138,13 +139,21 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t ...@@ -138,13 +139,21 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t
} }
} }
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, int32_t msgLen, void *pCont, int32_t contLen) { static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
pRpc->pCont = pCont; ProcFuncType ftype) {
dTrace("msg:%p, get from parent process queue, type:%s handle:%p app:%p", pRpc, TMSG_INFO(pRpc->msgType), pMsg->pCont = pCont;
pRpc->handle, pRpc->ahandle); dTrace("msg:%p, get from parent queue, type:%s handle:%p app:%p", pMsg, TMSG_INFO(pMsg->msgType), pMsg->handle,
pMsg->ahandle);
dndSendRsp(pWrapper, pRpc); switch (ftype) {
taosMemoryFree(pRpc); case PROC_REGISTER:
rpcRegisterBrokenLinkArg(pMsg);
break;
default:
dndSendRpcRsp(pWrapper, pMsg);
break;
}
taosMemoryFree(pMsg);
} }
static int32_t dndRunInMultiProcess(SDnode *pDnode) { static int32_t dndRunInMultiProcess(SDnode *pDnode) {
......
...@@ -42,6 +42,8 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) { ...@@ -42,6 +42,8 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
} }
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->clientIp = connInfo.clientIp;
pMsg->clientPort = connInfo.clientPort;
memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg)); memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
return 0; return 0;
} }
...@@ -66,7 +68,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { ...@@ -66,7 +68,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
} else if (pWrapper->procType == PROC_PARENT) { } else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle, dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
pRpc->ahandle, pMsg->user); pRpc->ahandle, pMsg->user);
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen); code = taosProcPutToChildQ(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, PROC_REQ);
} else { } else {
dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
pMsg->user); pMsg->user);
......
...@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { ...@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
} }
} }
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_APP_NOT_READY) { if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
if (pDnodeWrapper != NULL) { if (pDnodeWrapper != NULL) {
...@@ -366,7 +366,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { ...@@ -366,7 +366,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pWrapper->procType == PROC_CHILD) { if (pWrapper->procType == PROC_CHILD) {
int32_t code = -1; int32_t code = -1;
do { do {
code = taosProcPutToParentQueue(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen); code = taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP);
if (code != 0) { if (code != 0) {
taosMsleep(10); taosMsleep(10);
} }
...@@ -375,3 +375,17 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { ...@@ -375,3 +375,17 @@ void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
dndSendRpcRsp(pWrapper, pRsp); dndSendRpcRsp(pWrapper, pRsp);
} }
} }
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType == PROC_CHILD) {
int32_t code = -1;
do {
code = taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGISTER);
if (code != 0) {
taosMsleep(10);
}
} while (code != 0);
} else {
rpcRegisterBrokenLinkArg(pMsg);
}
}
\ No newline at end of file
...@@ -52,6 +52,7 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { ...@@ -52,6 +52,7 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb; pOption->msgCb = msgCb;
} }
......
...@@ -27,6 +27,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { ...@@ -27,6 +27,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb; pOption->msgCb = msgCb;
} }
......
...@@ -24,6 +24,7 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { ...@@ -24,6 +24,7 @@ static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
pOption->msgCb = msgCb; pOption->msgCb = msgCb;
} }
......
...@@ -137,6 +137,7 @@ static void *vmOpenVnodeFunc(void *param) { ...@@ -137,6 +137,7 @@ static void *vmOpenVnodeFunc(void *param) {
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {
......
...@@ -91,6 +91,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -91,6 +91,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
msgCb.sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg;
vnodeCfg.msgCb = msgCb; vnodeCfg.msgCb = msgCb;
vnodeCfg.pTfs = pMgmt->pTfs; vnodeCfg.pTfs = pMgmt->pTfs;
......
...@@ -44,7 +44,8 @@ typedef struct { ...@@ -44,7 +44,8 @@ typedef struct {
SQueryDesc *pQueries; SQueryDesc *pQueries;
} SConnObj; } SConnObj;
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime); static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
const char *app, int64_t startTime);
static void mndFreeConn(SConnObj *pConn); static void mndFreeConn(SConnObj *pConn);
static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId); static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId);
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
...@@ -94,7 +95,8 @@ void mndCleanupProfile(SMnode *pMnode) { ...@@ -94,7 +95,8 @@ void mndCleanupProfile(SMnode *pMnode) {
} }
} }
static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, const char *app, int64_t startTime) { static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, uint32_t ip, uint16_t port, int32_t pid,
const char *app, int64_t startTime) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt; SProfileMgmt *pMgmt = &pMnode->profileMgmt;
int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1);
...@@ -104,8 +106,8 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, ...@@ -104,8 +106,8 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid,
SConnObj connObj = {.id = connId, SConnObj connObj = {.id = connId,
.appStartTimeMs = startTime, .appStartTimeMs = startTime,
.pid = pid, .pid = pid,
.ip = pInfo->clientIp, .ip = ip,
.port = pInfo->clientPort, .port = port,
.killed = 0, .killed = 0,
.loginTimeMs = taosGetTimestampMs(), .loginTimeMs = taosGetTimestampMs(),
.lastAccessTimeMs = 0, .lastAccessTimeMs = 0,
...@@ -114,17 +116,17 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid, ...@@ -114,17 +116,17 @@ static SConnObj *mndCreateConn(SMnode *pMnode, SRpcConnInfo *pInfo, int32_t pid,
.pQueries = NULL}; .pQueries = NULL};
connObj.lastAccessTimeMs = connObj.loginTimeMs; connObj.lastAccessTimeMs = connObj.loginTimeMs;
tstrncpy(connObj.user, pInfo->user, TSDB_USER_LEN); tstrncpy(connObj.user, user, TSDB_USER_LEN);
tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN); tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);
int32_t keepTime = tsShellActivityTimer * 3; int32_t keepTime = tsShellActivityTimer * 3;
SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) { if (pConn == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("conn:%d, failed to put into cache since %s, user:%s", connId, pInfo->user, terrstr()); mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
return NULL; return NULL;
} else { } else {
mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, pInfo->user); mTrace("conn:%d, is created, data:%p user:%s", pConn->id, pConn, user);
return pConn; return pConn;
} }
} }
...@@ -184,20 +186,14 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -184,20 +186,14 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t code = -1; int32_t code = -1;
SConnectReq connReq = {0}; SConnectReq connReq = {0};
char ip[30] = {0};
if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) { if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto CONN_OVER; goto CONN_OVER;
} }
SRpcConnInfo info = {0}; taosIp2String(pReq->clientIp, ip);
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
goto CONN_OVER;
}
char ip[30];
taosIp2String(info.clientIp, ip);
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
...@@ -216,7 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -216,7 +212,8 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
} }
} }
pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime); pConn =
mndCreateConn(pMnode, pReq->user, pReq->clientIp, pReq->clientPort, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
goto CONN_OVER; goto CONN_OVER;
...@@ -241,7 +238,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) { ...@@ -241,7 +238,7 @@ static int32_t mndProcessConnectReq(SNodeMsg *pReq) {
pReq->rspLen = contLen; pReq->rspLen = contLen;
pReq->pRsp = pRsp; pReq->pRsp = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app); mDebug("user:%s, login from %s, conn:%d, app:%s", pReq->user, ip, pConn->id, connReq.app);
code = 0; code = 0;
......
...@@ -14,15 +14,15 @@ ...@@ -14,15 +14,15 @@
*/ */
#include "mndQuery.h" #include "mndQuery.h"
#include "mndMnode.h"
#include "executor.h" #include "executor.h"
#include "mndMnode.h"
#include "qworker.h" #include "qworker.h"
int32_t mndProcessQueryMsg(SNodeMsg *pReq) { int32_t mndProcessQueryMsg(SNodeMsg *pReq) {
mTrace("message in query queue is processing"); SMnode *pMnode = pReq->pNode;
SMnode *pMnode = pReq->pNode;
SReadHandle handle = {0}; SReadHandle handle = {0};
mTrace("msg:%p, in query queue is processing", pReq);
switch (pReq->rpcMsg.msgType) { switch (pReq->rpcMsg.msgType) {
case TDMT_VND_QUERY: case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg); return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg);
...@@ -35,9 +35,9 @@ int32_t mndProcessQueryMsg(SNodeMsg *pReq) { ...@@ -35,9 +35,9 @@ int32_t mndProcessQueryMsg(SNodeMsg *pReq) {
} }
int32_t mndProcessFetchMsg(SNodeMsg *pReq) { int32_t mndProcessFetchMsg(SNodeMsg *pReq) {
mTrace("message in fetch queue is processing");
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
mTrace("msg:%p, in fetch queue is processing", pReq);
switch (pReq->rpcMsg.msgType) { switch (pReq->rpcMsg.msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg); return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg);
...@@ -52,9 +52,9 @@ int32_t mndProcessFetchMsg(SNodeMsg *pReq) { ...@@ -52,9 +52,9 @@ int32_t mndProcessFetchMsg(SNodeMsg *pReq) {
} }
int32_t mndInitQuery(SMnode *pMnode) { int32_t mndInitQuery(SMnode *pMnode) {
int32_t code = qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb); if (qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
if (code) { mError("failed to init qworker in mnode since %s", terrstr());
return code; return -1;
} }
mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg); mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg);
...@@ -67,4 +67,3 @@ int32_t mndInitQuery(SMnode *pMnode) { ...@@ -67,4 +67,3 @@ int32_t mndInitQuery(SMnode *pMnode) {
} }
void mndCleanupQuery(SMnode *pMnode) { qWorkerDestroy((void **)&pMnode->pQuery); } void mndCleanupQuery(SMnode *pMnode) { qWorkerDestroy((void **)&pMnode->pQuery); }
...@@ -54,7 +54,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); ...@@ -54,7 +54,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
static void mndTransSendRpcRsp(STrans *pTrans); static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransReq(SNodeMsg *pReq); static int32_t mndProcessTransReq(SNodeMsg *pReq);
static int32_t mndProcessKillTransReq(SNodeMsg *pReq); static int32_t mndProcessKillTransReq(SNodeMsg *pReq);
...@@ -737,7 +737,7 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { ...@@ -737,7 +737,7 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
return 0; return 0;
} }
static void mndTransSendRpcRsp(STrans *pTrans) { static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
bool sendRsp = false; bool sendRsp = false;
if (pTrans->stage == TRN_STAGE_FINISHED) { if (pTrans->stage == TRN_STAGE_FINISHED) {
...@@ -771,7 +771,7 @@ static void mndTransSendRpcRsp(STrans *pTrans) { ...@@ -771,7 +771,7 @@ static void mndTransSendRpcRsp(STrans *pTrans) {
.ahandle = pTrans->rpcAHandle, .ahandle = pTrans->rpcAHandle,
.pCont = rpcCont, .pCont = rpcCont,
.contLen = pTrans->rpcRspLen}; .contLen = pTrans->rpcRspLen};
rpcSendResponse(&rspMsg); tmsgSendRsp(&pMnode->msgCb, &rspMsg);
pTrans->rpcHandle = NULL; pTrans->rpcHandle = NULL;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0; pTrans->rpcRspLen = 0;
...@@ -1158,7 +1158,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { ...@@ -1158,7 +1158,7 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
} }
} }
mndTransSendRpcRsp(pTrans); mndTransSendRpcRsp(pMnode, pTrans);
} }
static int32_t mndProcessTransReq(SNodeMsg *pReq) { static int32_t mndProcessTransReq(SNodeMsg *pReq) {
......
...@@ -30,21 +30,20 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); ...@@ -30,21 +30,20 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg);
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req); int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req);
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code); int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code);
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code); int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code);
void qwFreeFetchRsp(void *msg); void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -763,8 +763,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -763,8 +763,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
dropConnection = &ctx->connInfo; dropConnection = &ctx->connInfo;
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = NULL; dropConnection = NULL;
qwBuildAndSendDropRsp(&ctx->connInfo, code); qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
...@@ -802,9 +802,9 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu ...@@ -802,9 +802,9 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = NULL; dropConnection = NULL;
qwBuildAndSendDropRsp(&ctx->connInfo, code); qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
...@@ -830,12 +830,12 @@ _return: ...@@ -830,12 +830,12 @@ _return:
} }
if (dropConnection) { if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, code); qwBuildAndSendDropRsp(&mgmt->msgCb, dropConnection, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", dropConnection->handle, code, tstrerror(code));
} }
if (cancelConnection) { if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, code); qwBuildAndSendCancelRsp(&mgmt->msgCb, cancelConnection, code);
QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code));
} }
...@@ -886,9 +886,9 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp ...@@ -886,9 +886,9 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
qwBuildAndSendDropRsp(&ctx->connInfo, code); qwBuildAndSendDropRsp(&mgmt->msgCb, &ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
} }
...@@ -918,7 +918,7 @@ _return: ...@@ -918,7 +918,7 @@ _return:
} }
if (readyConnection) { if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, code); qwBuildAndSendReadyRsp(&mgmt->msgCb, readyConnection, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code)); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", readyConnection->handle, code, tstrerror(code));
} }
...@@ -970,7 +970,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { ...@@ -970,7 +970,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); QW_ERR_JRET(qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code));
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
queryRsped = true; queryRsped = true;
...@@ -986,9 +986,9 @@ _return: ...@@ -986,9 +986,9 @@ _return:
input.code = code; input.code = code;
code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL);
if (!queryRsped) { if (!queryRsped) {
qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); qwBuildAndSendQueryRsp(&mgmt->msgCb, &qwMsg->connInfo, code);
QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }
...@@ -1051,7 +1051,7 @@ _return: ...@@ -1051,7 +1051,7 @@ _return:
} }
if (needRsp) { if (needRsp) {
qwBuildAndSendReadyRsp(&qwMsg->connInfo, code); qwBuildAndSendReadyRsp(&mgmt->msgCb, &qwMsg->connInfo, code);
QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("ready msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }
...@@ -1095,7 +1095,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1095,7 +1095,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
qwMsg->connInfo = ctx->connInfo; qwMsg->connInfo = ctx->connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} else { } else {
atomic_store_8((int8_t*)&ctx->queryContinue, 1); atomic_store_8((int8_t*)&ctx->queryContinue, 1);
...@@ -1114,7 +1114,7 @@ _return: ...@@ -1114,7 +1114,7 @@ _return:
rsp = NULL; rsp = NULL;
qwMsg->connInfo = ctx->connInfo; qwMsg->connInfo = ctx->connInfo;
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code); qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, 0, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
} }
...@@ -1195,7 +1195,7 @@ _return: ...@@ -1195,7 +1195,7 @@ _return:
} }
if (code || rsp) { if (code || rsp) {
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code); qwBuildAndSendFetchRsp(&mgmt->msgCb, &qwMsg->connInfo, rsp, dataLen, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen); QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), dataLen);
} }
...@@ -1226,9 +1226,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1226,9 +1226,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING);
} else if (ctx->phase > 0) { } else if (ctx->phase > 0) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code); qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS())); QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
rsped = true; rsped = true;
} else { } else {
...@@ -1241,7 +1241,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -1241,7 +1241,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
} }
_return: _return:
if (code) { if (code) {
...@@ -1261,7 +1261,7 @@ _return: ...@@ -1261,7 +1261,7 @@ _return:
} }
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qwBuildAndSendDropRsp(&qwMsg->connInfo, code); qwBuildAndSendDropRsp(&mgmt->msgCb, &qwMsg->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
} }
...@@ -1297,7 +1297,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { ...@@ -1297,7 +1297,7 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return: _return:
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); qwBuildAndSendHbRsp(&mgmt->msgCb, &qwMsg->connInfo, &rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
...@@ -1351,8 +1351,9 @@ _return: ...@@ -1351,8 +1351,9 @@ _return:
QW_UNLOCK(QW_READ, &mgmt->schLock); QW_UNLOCK(QW_READ, &mgmt->schLock);
for (int32_t j = 0; j < i; ++j) { for (int32_t j = 0; j < i; ++j) {
qwBuildAndSendHbRsp(&rspList[j].connInfo, &rspList[j].rsp, code); qwBuildAndSendHbRsp(&mgmt->msgCb, &rspList[j].connInfo, &rspList[j].rsp, code);
QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code), (rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0)); QW_DLOG("hb rsp send, handle:%p, code:%x - %s, taskNum:%d", rspList[j].connInfo.handle, code, tstrerror(code),
(rspList[j].rsp.taskStatus ? (int32_t)taosArrayGetSize(rspList[j].rsp.taskStatus) : 0));
tFreeSSchedulerHbRsp(&rspList[j].rsp); tFreeSSchedulerHbRsp(&rspList[j].rsp);
} }
......
...@@ -46,7 +46,7 @@ void qwFreeFetchRsp(void *msg) { ...@@ -46,7 +46,7 @@ void qwFreeFetchRsp(void *msg) {
} }
} }
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendQueryRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) {
SQueryTableRsp rsp = {.code = code}; SQueryTableRsp rsp = {.code = code};
int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp);
...@@ -62,12 +62,12 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -62,12 +62,12 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
.code = code, .code = code,
}; };
rpcSendResponse(&rpcRsp); tmsgSendRsp(pMsgCb, &rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendReadyRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code; pRsp->code = code;
...@@ -80,12 +80,12 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -80,12 +80,12 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
.code = code, .code = code,
}; };
rpcSendResponse(&rpcRsp); tmsgSendRsp(pMsgCb, &rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) { int32_t qwBuildAndSendHbRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus); int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
void *pRsp = rpcMallocCont(contLen); void *pRsp = rpcMallocCont(contLen);
tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus); tSerializeSSchedulerHbRsp(pRsp, contLen, pStatus);
...@@ -99,12 +99,12 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_ ...@@ -99,12 +99,12 @@ int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_
.code = code, .code = code,
}; };
rpcSendResponse(&rpcRsp); tmsgSendRsp(pMsgCb, &rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) { int32_t qwBuildAndSendFetchRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
if (NULL == pRsp) { if (NULL == pRsp) {
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp)); memset(pRsp, 0, sizeof(SRetrieveTableRsp));
...@@ -120,12 +120,12 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3 ...@@ -120,12 +120,12 @@ int32_t qwBuildAndSendFetchRsp(SQWConnInfo *pConn, SRetrieveTableRsp *pRsp, int3
.code = code, .code = code,
}; };
rpcSendResponse(&rpcRsp); tmsgSendRsp(pMsgCb, &rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendCancelRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code; pRsp->code = code;
...@@ -138,11 +138,11 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -138,11 +138,11 @@ int32_t qwBuildAndSendCancelRsp(SQWConnInfo *pConn, int32_t code) {
.code = code, .code = code,
}; };
rpcSendResponse(&rpcRsp); tmsgSendRsp(pMsgCb, &rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { int32_t qwBuildAndSendDropRsp(const SMsgCb *pMsgCb, SQWConnInfo *pConn, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
pRsp->code = code; pRsp->code = code;
...@@ -155,11 +155,11 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) { ...@@ -155,11 +155,11 @@ int32_t qwBuildAndSendDropRsp(SQWConnInfo *pConn, int32_t code) {
.code = code, .code = code,
}; };
rpcSendResponse(&rpcRsp); tmsgSendRsp(pMsgCb, &rpcRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { int32_t qwBuildAndSendShowRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, int32_t code) {
int32_t numOfCols = 6; int32_t numOfCols = 6;
SVShowTablesRsp showRsp = {0}; SVShowTablesRsp showRsp = {0};
...@@ -210,11 +210,11 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) { ...@@ -210,11 +210,11 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
.code = code, .code = code,
}; };
rpcSendResponse(&rpcMsg); tmsgSendRsp(pMsgCb, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) { int32_t qwBuildAndSendShowFetchRsp(const SMsgCb *pMsgCb, SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp)); SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
int32_t handle = htonl(pFetchReq->id); int32_t handle = htonl(pFetchReq->id);
...@@ -227,7 +227,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe ...@@ -227,7 +227,7 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
.code = 0, .code = 0,
}; };
rpcSendResponse(&rpcMsg); tmsgSendRsp(pMsgCb, &rpcMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -287,7 +287,7 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -287,7 +287,7 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL, .code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
}; };
rpcRegisterBrokenLinkArg(&pMsg); tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -498,7 +498,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -498,7 +498,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
_return: _return:
QW_ERR_RET(qwBuildAndSendCancelRsp(&qwMsg.connInfo, code)); QW_ERR_RET(qwBuildAndSendCancelRsp(&mgmt->msgCb, &qwMsg.connInfo, code));
QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code)); QW_SCH_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", qwMsg.connInfo.handle, code, tstrerror(code));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -579,15 +579,16 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -579,15 +579,16 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SQWorkerMgmt *pMgmt = qWorkerMgmt;
int32_t code = 0; int32_t code = 0;
SVShowTablesReq *pReq = pMsg->pCont; SVShowTablesReq *pReq = pMsg->pCont;
QW_RET(qwBuildAndSendShowRsp(pMsg, code)); QW_RET(qwBuildAndSendShowRsp(&pMgmt->msgCb, pMsg, code));
} }
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...@@ -595,8 +596,8 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) ...@@ -595,8 +596,8 @@ int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg)
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SQWorkerMgmt *pMgmt = qWorkerMgmt;
SVShowTablesFetchReq *pFetchReq = pMsg->pCont; SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
QW_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq)); QW_RET(qwBuildAndSendShowFetchRsp(&pMgmt->msgCb, pMsg, pFetchReq));
} }
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
#include <sys/wait.h> #include <sys/wait.h>
#define SHM_DEFAULT_SIZE (20 * 1024 * 1024) #define SHM_DEFAULT_SIZE (20 * 1024 * 1024)
#define CEIL8(n) (ceil((float)(n) / 8) * 8)
typedef void *(*ProcThreadFp)(void *param); typedef void *(*ProcThreadFp)(void *param);
typedef struct SProcQueue { typedef struct SProcQueue {
...@@ -58,6 +57,11 @@ typedef struct SProcObj { ...@@ -58,6 +57,11 @@ typedef struct SProcObj {
bool stopFlag; bool stopFlag;
} SProcObj; } SProcObj;
static inline int32_t CEIL8(int32_t v) {
const int32_t c = ceil((float)(v) / 8) * 8;
return c < 8 ? 8 : c;
}
static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) { static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
TdThreadMutex *pMutex = NULL; TdThreadMutex *pMutex = NULL;
TdThreadMutexAttr mattr = {0}; TdThreadMutexAttr mattr = {0};
...@@ -203,7 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { ...@@ -203,7 +207,8 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
} }
} }
static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHeadLen, char *pBody, int32_t rawBodyLen) { static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int16_t rawHeadLen, char *pBody, int32_t rawBodyLen,
ProcFuncType funcType) {
const int32_t headLen = CEIL8(rawHeadLen); const int32_t headLen = CEIL8(rawHeadLen);
const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8; const int32_t fullLen = headLen + bodyLen + 8;
...@@ -221,10 +226,12 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea ...@@ -221,10 +226,12 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
} }
if (pQueue->tail < pQueue->total) { if (pQueue->tail < pQueue->total) {
*(int32_t *)(pQueue->pBuffer + pQueue->head) = headLen; *(int16_t *)(pQueue->pBuffer + pQueue->head) = headLen;
*(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType;
*(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen; *(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen;
} else { } else {
*(int32_t *)(pQueue->pBuffer) = headLen; *(int16_t *)(pQueue->pBuffer) = headLen;
*(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType;
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen; *(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
} }
...@@ -264,13 +271,13 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea ...@@ -264,13 +271,13 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
taosThreadMutexUnlock(pQueue->mutex); taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem); tsem_post(&pQueue->sem);
uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items, uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items,
headLen, pHead, bodyLen, pBody); headLen, pHead, bodyLen, pBody, funcType);
return 0; return 0;
} }
static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHeadLen, void **ppBody, static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody,
int32_t *pBodyLen) { int32_t *pBodyLen, ProcFuncType *pFuncType) {
tsem_wait(&pQueue->sem); tsem_wait(&pQueue->sem);
taosThreadMutexLock(pQueue->mutex); taosThreadMutexLock(pQueue->mutex);
...@@ -281,13 +288,16 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea ...@@ -281,13 +288,16 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
return 0; return 0;
} }
int32_t headLen = 0; int16_t headLen = 0;
int8_t ftype = 0;
int32_t bodyLen = 0; int32_t bodyLen = 0;
if (pQueue->head < pQueue->total) { if (pQueue->head < pQueue->total) {
headLen = *(int32_t *)(pQueue->pBuffer + pQueue->head); headLen = *(int16_t *)(pQueue->pBuffer + pQueue->head);
ftype = *(int8_t *)(pQueue->pBuffer + pQueue->head + 2);
bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4); bodyLen = *(int32_t *)(pQueue->pBuffer + pQueue->head + 4);
} else { } else {
headLen = *(int32_t *)(pQueue->pBuffer); headLen = *(int16_t *)(pQueue->pBuffer);
ftype = *(int8_t *)(pQueue->pBuffer + 2);
bodyLen = *(int32_t *)(pQueue->pBuffer + 4); bodyLen = *(int32_t *)(pQueue->pBuffer + 4);
} }
...@@ -341,9 +351,10 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea ...@@ -341,9 +351,10 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int32_t *pHea
*ppBody = pBody; *ppBody = pBody;
*pHeadLen = headLen; *pHeadLen = headLen;
*pBodyLen = bodyLen; *pBodyLen = bodyLen;
*pFuncType = (ProcFuncType)ftype;
uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p", pQueue->name, pQueue, pQueue->items, uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items,
headLen, pHead, bodyLen, pBody); headLen, pHead, bodyLen, pBody, ftype);
return 1; return 1;
} }
...@@ -396,12 +407,14 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { ...@@ -396,12 +407,14 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
ProcConsumeFp consumeFp = pQueue->consumeFp; ProcConsumeFp consumeFp = pQueue->consumeFp;
void *pParent = pQueue->pParent; void *pParent = pQueue->pParent;
void *pHead, *pBody; void *pHead, *pBody;
int32_t headLen, bodyLen; int16_t headLen;
ProcFuncType ftype;
int32_t bodyLen;
uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue); uDebug("proc:%s, start to get msg from queue:%p", pQueue->name, pQueue);
while (1) { while (1) {
int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen); int32_t numOfMsgs = taosProcQueuePop(pQueue, &pHead, &headLen, &pBody, &bodyLen, &ftype);
if (numOfMsgs == 0) { if (numOfMsgs == 0) {
uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue); uDebug("proc:%s, get no msg from queue:%p and exit the proc thread", pQueue->name, pQueue);
break; break;
...@@ -410,7 +423,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { ...@@ -410,7 +423,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
taosMsleep(1); taosMsleep(1);
continue; continue;
} else { } else {
(*consumeFp)(pParent, pHead, headLen, pBody, bodyLen); (*consumeFp)(pParent, pHead, headLen, pBody, bodyLen, ftype);
} }
} }
} }
...@@ -458,10 +471,12 @@ void taosProcCleanup(SProcObj *pProc) { ...@@ -458,10 +471,12 @@ void taosProcCleanup(SProcObj *pProc) {
} }
} }
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) { int32_t taosProcPutToChildQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen); ProcFuncType funcType) {
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType);
} }
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen) { int32_t taosProcPutToParentQ(SProcObj *pProc, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen,
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen); ProcFuncType funcType) {
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType);
} }
...@@ -41,7 +41,7 @@ print =============== insert data, mode1: one row one table in sql ...@@ -41,7 +41,7 @@ print =============== insert data, mode1: one row one table in sql
print =============== insert data, mode1: mulit rows one table in sql print =============== insert data, mode1: mulit rows one table in sql
print =============== insert data, mode1: one rows mulit table in sql print =============== insert data, mode1: one rows mulit table in sql
print =============== insert data, mode1: mulit rows mulit table in sql print =============== insert data, mode1: mulit rows mulit table in sql
sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) sql insert into c1 values(now-1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) sql insert into c1 values(now+0s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+1s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40) (now+2s, true, -1, -2, -3, -4, -6.0, -7.0, 'child tbl 1', 'child tbl 1', '2022-02-25 18:00:00.000', 10, 20, 30, 40)
print =============== query data print =============== query data
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册