提交 4c4be3c5 编写于 作者: L Liu Jicong

put createStreamExecTaskInfo into right place

上级 7361019e
......@@ -32,7 +32,9 @@ struct SSubplan;
* @param pStreamBlockReadHandle
* @return
*/
qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle);
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle);
void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input);
/**
* Create the exec task object according to task json
......
......@@ -82,27 +82,12 @@ typedef struct STqSubscribeReq {
int64_t topic[];
} STqSubscribeReq;
typedef struct STqSubscribeRsp {
STqMsgHead head;
int64_t vgId;
char ep[TSDB_EP_LEN]; // TSDB_EP_LEN
} STqSubscribeRsp;
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
typedef struct STqTopicVhandle {
int64_t topicId;
// executor for filter
void* filterExec;
// callback for mnode
// trigger when vnode list associated topic change
void* (*mCallback)(void*, void*);
} STqTopicVhandle;
#define TQ_BUFFER_SIZE 8
typedef struct STqExec {
......
......@@ -633,12 +633,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
// read until find TDMT_VND_SUBMIT
}
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
void* task = pHandle->buffer.output[pos].task;
/*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/
qStreamExecTaskSetInput(task, pCont);
SSDataBlock* pDataBlock;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
}
// TODO: launch query and get output data
void* outputData;
pHandle->buffer.output[pos].dst = outputData;
pHandle->buffer.output[pos].dst = pDataBlock;
if (pHandle->buffer.firstOffset == -1
|| pReq->offset < pHandle->buffer.firstOffset) {
pHandle->buffer.firstOffset = pReq->offset;
......@@ -674,22 +678,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
strcpy(pTopic->sql, pReq->sql);
strcpy(pTopic->logicalPlan, pReq->logicalPlan);
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
SArray *pArray;
//TODO: deserialize to SQueryDag
SQueryDag *pDag;
// convert to task
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pTopic->buffer.output[i].status = 0;
pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL);
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL);
}
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
// write mq meta
......
......@@ -13,10 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planner.h"
#include "executor.h"
#include "planner.h"
void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {}
qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) {
if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
return NULL;
}
......@@ -27,8 +29,8 @@ qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadH
pMsg->taskId = be64toh(pMsg->taskId);
pMsg->contentLen = ntohl(pMsg->contentLen);
struct SSubplan *plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan);
struct SSubplan* plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册