提交 c37fbd1a 编写于 作者: S Shengliang

for fetch rpc client

上级 e777ee72
...@@ -24,6 +24,7 @@ extern "C" { ...@@ -24,6 +24,7 @@ extern "C" {
typedef void* qTaskInfo_t; typedef void* qTaskInfo_t;
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan; struct SSubplan;
/** /**
...@@ -208,6 +209,8 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); ...@@ -208,6 +209,8 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
*/ */
void** qDeregisterQInfo(void* pMgmt, void* pQInfo); void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
void qProcessFetchRsp(struct SRpcMsg* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -49,9 +49,10 @@ typedef struct { ...@@ -49,9 +49,10 @@ typedef struct {
} SQWorkerStat; } SQWorkerStat;
typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *); typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *);
typedef int32_t (*sendReqToDnodeFp)(void *, struct SEpSet *, struct SRpcMsg *);
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp); putReqToQueryQFp fp1, sendReqToDnodeFp fp2);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
...@@ -65,6 +66,8 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); ...@@ -65,6 +66,8 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
......
...@@ -289,6 +289,7 @@ int32_t dndInit(const SDnodeEnvCfg *pCfg) { ...@@ -289,6 +289,7 @@ int32_t dndInit(const SDnodeEnvCfg *pCfg) {
.charset = pCfg->charset, .charset = pCfg->charset,
.nthreads = pCfg->numOfCommitThreads, .nthreads = pCfg->numOfCommitThreads,
.putReqToVQueryQFp = dndPutReqToVQueryQ, .putReqToVQueryQFp = dndPutReqToVQueryQ,
.sendReqToDnodeFp = dndSendReqToDnode
}; };
if (vnodeInit(&vnodeOpt) != 0) { if (vnodeInit(&vnodeOpt) != 0) {
......
...@@ -121,6 +121,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { ...@@ -121,6 +121,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH_RSP)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg;
......
...@@ -33,6 +33,7 @@ extern "C" { ...@@ -33,6 +33,7 @@ extern "C" {
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef struct STqCfg { typedef struct STqCfg {
// TODO // TODO
...@@ -64,6 +65,7 @@ typedef struct { ...@@ -64,6 +65,7 @@ typedef struct {
const char *charset; const char *charset;
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutReqToVQueryQFp putReqToVQueryQFp; PutReqToVQueryQFp putReqToVQueryQFp;
SendReqToDnodeFp sendReqToDnodeFp;
} SVnodeOpt; } SVnodeOpt;
typedef struct STqReadHandle { typedef struct STqReadHandle {
......
...@@ -55,6 +55,7 @@ typedef struct SVnodeMgr { ...@@ -55,6 +55,7 @@ typedef struct SVnodeMgr {
// For vnode Mgmt // For vnode Mgmt
SDnode* pDnode; SDnode* pDnode;
PutReqToVQueryQFp putReqToVQueryQFp; PutReqToVQueryQFp putReqToVQueryQFp;
SendReqToDnodeFp sendReqToDnodeFp;
} SVnodeMgr; } SVnodeMgr;
extern SVnodeMgr vnodeMgr; extern SVnodeMgr vnodeMgr;
...@@ -85,6 +86,7 @@ struct SVnode { ...@@ -85,6 +86,7 @@ struct SVnode {
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
// For Log // For Log
extern int32_t vDebugFlag; extern int32_t vDebugFlag;
......
...@@ -26,6 +26,7 @@ int vnodeInit(const SVnodeOpt *pOption) { ...@@ -26,6 +26,7 @@ int vnodeInit(const SVnodeOpt *pOption) {
vnodeMgr.stop = false; vnodeMgr.stop = false;
vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp;
vnodeMgr.sendReqToDnodeFp = pOption->sendReqToDnodeFp;
// Start commit handers // Start commit handers
if (pOption->nthreads > 0) { if (pOption->nthreads > 0) {
...@@ -96,6 +97,10 @@ int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) { ...@@ -96,6 +97,10 @@ int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq);
} }
void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
(*vnodeMgr.sendReqToDnodeFp)(pVnode->pDnode, epSet, pReq);
}
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) { static void* loop(void* arg) {
setThreadName("vnode-commit"); setThreadName("vnode-commit");
......
...@@ -21,7 +21,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); ...@@ -21,7 +21,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode,
(putReqToQueryQFp)vnodePutReqToVQueryQ); (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode);
} }
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
...@@ -43,6 +43,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -43,6 +43,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_RES_READY: case TDMT_VND_RES_READY:
return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_TASKS_STATUS: case TDMT_VND_TASKS_STATUS:
......
...@@ -5119,7 +5119,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { ...@@ -5119,7 +5119,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree(pMsgBody); tfree(pMsgBody);
} }
void processRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void qProcessFetchRsp(SRpcMsg* pMsg) {
SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
assert(pMsg->ahandle != NULL); assert(pMsg->ahandle != NULL);
...@@ -5289,6 +5289,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5289,6 +5289,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
pOperator->exec = doLoadRemoteData; pOperator->exec = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
#if 0
{ // todo refactor { // todo refactor
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
...@@ -5309,7 +5310,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* ...@@ -5309,7 +5310,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
return NULL; // todo return NULL; // todo
} }
} }
#endif
return pOperator; return pOperator;
} }
......
...@@ -137,6 +137,7 @@ typedef struct SQWorkerMgmt { ...@@ -137,6 +137,7 @@ typedef struct SQWorkerMgmt {
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
void *nodeObj; void *nodeObj;
putReqToQueryQFp putToQueueFp; putReqToQueryQFp putToQueueFp;
sendReqToDnodeFp sendReqFp;
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId #define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId
......
...@@ -1311,12 +1311,13 @@ _return: ...@@ -1311,12 +1311,13 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp) { putReqToQueryQFp fp1, sendReqToDnodeFp fp2) {
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) {
qError("invalid param to init qworker"); qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) { if (NULL == mgmt) {
qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt)); qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
...@@ -1359,7 +1360,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW ...@@ -1359,7 +1360,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
mgmt->nodeType = nodeType; mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId; mgmt->nodeId = nodeId;
mgmt->nodeObj = nodeObj; mgmt->nodeObj = nodeObj;
mgmt->putToQueueFp = fp; mgmt->putToQueueFp = fp1;
mgmt->sendReqFp = fp2;
*qWorkerMgmt = mgmt; *qWorkerMgmt = mgmt;
......
...@@ -421,6 +421,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -421,6 +421,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qProcessFetchRsp(pMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
......
...@@ -1081,7 +1081,7 @@ TEST(rcTest, shortExecshortDelay) { ...@@ -1081,7 +1081,7 @@ TEST(rcTest, shortExecshortDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0; qwtTestMaxExecTaskUsec = 0;
...@@ -1162,7 +1162,7 @@ TEST(rcTest, longExecshortDelay) { ...@@ -1162,7 +1162,7 @@ TEST(rcTest, longExecshortDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 1000000; qwtTestMaxExecTaskUsec = 1000000;
...@@ -1245,7 +1245,7 @@ TEST(rcTest, shortExeclongDelay) { ...@@ -1245,7 +1245,7 @@ TEST(rcTest, shortExeclongDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0; qwtTestMaxExecTaskUsec = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册