diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 020f9f7ccdceef9c8950c96a8078c64c377e5d5f..b8a5f28ceb159ee67be149617b2b562ea90dcc30 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -24,6 +24,7 @@ extern "C" { typedef void* qTaskInfo_t; typedef void* DataSinkHandle; +struct SRpcMsg; struct SSubplan; /** @@ -208,6 +209,8 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); */ void** qDeregisterQInfo(void* pMgmt, void* pQInfo); +void qProcessFetchRsp(struct SRpcMsg* pMsg); + #ifdef __cplusplus } #endif diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 5d815d15e0ce3ae99696444beefc3ef598edc79d..5e3320ffdb0a1307123473157c767f16b9dc23d1 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -49,9 +49,10 @@ typedef struct { } SQWorkerStat; typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *); +typedef int32_t (*sendReqToDnodeFp)(void *, struct SEpSet *, struct SRpcMsg *); - -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp); +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, + putReqToQueryQFp fp1, sendReqToDnodeFp fp2); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); @@ -65,6 +66,8 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); + int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/impl/src/dndEnv.c b/source/dnode/mgmt/impl/src/dndEnv.c index 74fb5f1437d7acd32e33171c68cbf088fe471514..02dced53c2ecf62e24f954eeb6555fb60474fd02 100644 --- a/source/dnode/mgmt/impl/src/dndEnv.c +++ b/source/dnode/mgmt/impl/src/dndEnv.c @@ -289,6 +289,7 @@ int32_t dndInit(const SDnodeEnvCfg *pCfg) { .charset = pCfg->charset, .nthreads = pCfg->numOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, + .sendReqToDnodeFp = dndSendReqToDnode }; if (vnodeInit(&vnodeOpt) != 0) { diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 1da13bc3d93b4bc7e083f32477b5cdbf9e70624c..ede3bf5a791c12b5f0c7eacd45b172118d641849 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -121,6 +121,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH_RSP)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2accfd6279a8d31c075e1c1a47f927394b77ad22..b56c5b30fa4fc3ce8584bdb2273a134f177420c2 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -33,6 +33,7 @@ extern "C" { typedef struct SVnode SVnode; typedef struct SDnode SDnode; typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); +typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef struct STqCfg { // TODO @@ -64,6 +65,7 @@ typedef struct { const char *charset; uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) PutReqToVQueryQFp putReqToVQueryQFp; + SendReqToDnodeFp sendReqToDnodeFp; } SVnodeOpt; typedef struct STqReadHandle { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 1fa65b2a73377a33a1eb26b128851a09889a7732..f442697fb01423a265f95bd676379661cc838f4d 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -55,6 +55,7 @@ typedef struct SVnodeMgr { // For vnode Mgmt SDnode* pDnode; PutReqToVQueryQFp putReqToVQueryQFp; + SendReqToDnodeFp sendReqToDnodeFp; } SVnodeMgr; extern SVnodeMgr vnodeMgr; @@ -85,6 +86,7 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); +void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); // For Log extern int32_t vDebugFlag; diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index d7628441206ac9bd9a6945d9813811e402ed5b5e..477deed8c8944fc511c8aa3f78951d2131171f40 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -26,6 +26,7 @@ int vnodeInit(const SVnodeOpt *pOption) { vnodeMgr.stop = false; vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; + vnodeMgr.sendReqToDnodeFp = pOption->sendReqToDnodeFp; // Start commit handers if (pOption->nthreads > 0) { @@ -96,6 +97,10 @@ int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); } +void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) { + (*vnodeMgr.sendReqToDnodeFp)(pVnode->pDnode, epSet, pReq); +} + /* ------------------------ STATIC METHODS ------------------------ */ static void* loop(void* arg) { setThreadName("vnode-commit"); diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 48ff7252bef76706034a2b64920e9ac52f8f864c..c329b47835660c474280fa028569f1a74a863816 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -21,7 +21,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, - (putReqToQueryQFp)vnodePutReqToVQueryQ); + (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode); } int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { @@ -43,6 +43,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { case TDMT_VND_FETCH: return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_FETCH_RSP: + return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg); case TDMT_VND_RES_READY: return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg); case TDMT_VND_TASKS_STATUS: diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9a2bb75b34a4155cc415f5019bd7054f16cc8df4..edce69f02643eb9bfe84facb5bf0a08563f21371 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5119,7 +5119,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { tfree(pMsgBody); } -void processRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { +void qProcessFetchRsp(SRpcMsg* pMsg) { SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; assert(pMsg->ahandle != NULL); @@ -5289,6 +5289,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pOperator->exec = doLoadRemoteData; pOperator->pTaskInfo = pTaskInfo; +#if 0 { // todo refactor SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); @@ -5309,7 +5310,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* return NULL; // todo } } - +#endif return pOperator; } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 83c7a4208e3da2cbd8b2c58040afd1520bb01cc5..c54ef1cacc1834c6fb16aba26f89582b69ac389b 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -137,6 +137,7 @@ typedef struct SQWorkerMgmt { SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx void *nodeObj; putReqToQueryQFp putToQueueFp; + sendReqToDnodeFp sendReqFp; } SQWorkerMgmt; #define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index d09f749805ae2fa38a2b203dbf8c196fa0c4d03f..21fd4e6dccd80d0d62c60f0812afc1f4a0034097 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1311,12 +1311,13 @@ _return: QW_RET(code); } -int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) { - if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp) { +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, + putReqToQueryQFp fp1, sendReqToDnodeFp fp2) { + if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) { qError("invalid param to init qworker"); QW_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); if (NULL == mgmt) { qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt)); @@ -1359,7 +1360,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW mgmt->nodeType = nodeType; mgmt->nodeId = nodeId; mgmt->nodeObj = nodeObj; - mgmt->putToQueueFp = fp; + mgmt->putToQueueFp = fp1; + mgmt->sendReqFp = fp2; *qWorkerMgmt = mgmt; diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index d11bee6dce22454646d5080cb87314c8beb73685..92e1edc1393b969e9dd20304060056d0d575809a 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -421,6 +421,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } +int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + qProcessFetchRsp(pMsg); + return TSDB_CODE_SUCCESS; +} + int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index b9d84b25aa499199745f76c08020c515af30d834..4c81123bf23b1d1834bb97c12bada6ec57c985c0 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -1081,7 +1081,7 @@ TEST(rcTest, shortExecshortDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0; @@ -1162,7 +1162,7 @@ TEST(rcTest, longExecshortDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 1000000; @@ -1245,7 +1245,7 @@ TEST(rcTest, shortExeclongDelay) { qwtTestStop = false; qwtTestQuitThreadNum = 0; - code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); ASSERT_EQ(code, 0); qwtTestMaxExecTaskUsec = 0;