提交 53b4c06d 编写于 作者: S Shengliang Guan

add msg queue

上级 ab3378e0
......@@ -32,7 +32,7 @@ extern "C" {
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SVnode SVnode;
typedef struct SDnode SDnode;
typedef void (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct SVnodeCfg {
int32_t vgId;
......
......@@ -36,6 +36,8 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq);
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pReq);
#ifdef __cplusplus
}
#endif
......
......@@ -802,7 +802,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t
}
}
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) {
static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, bool sendRsp) {
int32_t code = 0;
if (pQueue == NULL) {
......@@ -819,13 +819,15 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg)
}
}
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS && sendRsp) {
if (pRpcMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pRpcMsg->pCont);
}
return code;
}
static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
......@@ -848,7 +850,7 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) {
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) {
dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg);
(void)dndWriteRpcMsgToVnodeQueue(pVnode->pWriteQ, pMsg, true);
dndReleaseVnode(pDnode, pVnode);
}
}
......@@ -856,7 +858,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) {
dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg);
(void)dndWriteRpcMsgToVnodeQueue(pVnode->pSyncQ, pMsg, true);
dndReleaseVnode(pDnode, pVnode);
}
}
......@@ -864,7 +866,7 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) {
dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg);
(void)dndWriteRpcMsgToVnodeQueue(pVnode->pQueryQ, pMsg, true);
dndReleaseVnode(pDnode, pVnode);
}
}
......@@ -872,11 +874,23 @@ void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
SVnodeObj *pVnode = dndAcquireVnodeFromMsg(pDnode, pMsg);
if (pVnode != NULL) {
dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg);
(void)dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, true);
dndReleaseVnode(pDnode, pVnode);
}
}
int32_t dndPutReqToVQueryQ(SDnode *pDnode, SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
// pHead->vgId = htonl(pHead->vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId);
if (pVnode == NULL) return -1;
int32_t code = dndWriteRpcMsgToVnodeQueue(pVnode->pFetchQ, pMsg, false);
dndReleaseVnode(pDnode, pVnode);
return code;
}
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) {
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) return -1;
......
......@@ -153,8 +153,6 @@ static void dndCleanupEnv(SDnode *pDnode) {
taosStopCacheRefreshWorker();
}
static void dndPutMsgToVQueryQ(SDnode *pDnode, SRpcMsg *pRpcMsg) { dndProcessVnodeQueryMsg(pDnode, pRpcMsg, NULL); }
SDnode *dndInit(SDnodeOpt *pOption) {
taosIgnSIGPIPE();
taosBlockSIGPIPE();
......@@ -204,7 +202,7 @@ SDnode *dndInit(SDnodeOpt *pOption) {
.locale = pDnode->opt.locale,
.charset = pDnode->opt.charset,
.nthreads = pDnode->opt.numOfCommitThreads,
.putReqToVQueryQFp = dndPutMsgToVQueryQ,
.putReqToVQueryQFp = dndPutReqToVQueryQ,
};
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode env");
......
......@@ -82,7 +82,7 @@ struct SVnode {
int vnodeScheduleTask(SVnodeTask* task);
void vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq);
int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq);
#ifdef __cplusplus
}
......
......@@ -90,10 +90,12 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
return 0;
}
void vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
assert(vnodeMgr.putReqToVQueryQFp);
assert(pVnode->pDnode);
(*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq);
int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
if (pVnode == NULL || pVnode->pDnode == NULL || vnodeMgr.putReqToVQueryQFp == NULL) {
terrno = TSDB_CODE_VND_APP_ERROR;
return -1;
}
return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq);
}
/* ------------------------ STATIC METHODS ------------------------ */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册