提交 ae5155ed 编写于 作者: H Haojun Liao

fix(tmq): do some internal refactor.

上级 4265fcb6
...@@ -67,9 +67,10 @@ typedef enum { ...@@ -67,9 +67,10 @@ typedef enum {
* Create the exec task for stream mode * Create the exec task for stream mode
* @param pMsg * @param pMsg
* @param SReadHandle * @param SReadHandle
* @param vgId
* @return * @return
*/ */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId);
/** /**
* Create the exec task for queue mode * Create the exec task for queue mode
...@@ -77,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); ...@@ -77,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @param SReadHandle * @param SReadHandle
* @return * @return
*/ */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema); qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
/** /**
......
...@@ -75,9 +75,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -75,9 +75,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb; pTask->pMsgCb = &pSnode->msgCb;
pTask->startVer = ver; pTask->startVer = ver;
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
...@@ -90,11 +88,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -90,11 +88,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState, .pStateBackend = pTask->pState,
}; };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0);
ASSERT(pTask->exec.executor); ASSERT(pTask->exec.executor);
streamSetupTrigger(pTask); streamSetupTrigger(pTask);
return 0; return 0;
} }
......
...@@ -282,7 +282,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -282,7 +282,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
.initTqReader = 1, .initTqReader = 1,
.pStateBackend = pStreamState, .pStateBackend = pStreamState,
}; };
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
if (!pRSmaInfo->taskInfo[idx]) { if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -864,7 +864,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t ...@@ -864,7 +864,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
if (!dstTaskInfo) { if (!dstTaskInfo) {
code = TSDB_CODE_RSMA_QTASKINFO_CREATE; code = TSDB_CODE_RSMA_QTASKINFO_CREATE;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
......
...@@ -906,7 +906,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -906,7 +906,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, SVnode* pVnode = pTq->pVnode;
int32_t vgId = TD_VID(pVnode);
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId); req.oldConsumerId, req.newConsumerId);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
...@@ -935,7 +938,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -935,7 +938,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->fetchMeta = req.withMeta; pHandle->fetchMeta = req.withMeta;
// TODO version should be assigned and refed during preprocess // TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
if (pRef == NULL) { if (pRef == NULL) {
taosMemoryFree(req.qmsg); taosMemoryFree(req.qmsg);
return -1; return -1;
...@@ -945,8 +948,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -945,8 +948,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->pRef = pRef; pHandle->pRef = pRef;
SReadHandle handle = { SReadHandle handle = {
.meta = pTq->pVnode->pMeta, .meta = pVnode->pMeta,
.vnode = pTq->pVnode, .vnode = pVnode,
.initTableReader = true, .initTableReader = true,
.initTqReader = true, .initTqReader = true,
.version = ver, .version = ver,
...@@ -959,38 +962,38 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -959,38 +962,38 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.qmsg = NULL; req.qmsg = NULL;
pHandle->execHandle.task = pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, NULL); qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.task, &scanner); qExtractStreamScanner(pHandle->execHandle.task, &scanner);
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
pHandle->execHandle.execDb.pFilterOutTbUid = pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext)); (SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.suid = req.suid;
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); vnodeGetCtbIdList(pVnode, req.suid, tbUidList);
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, req.suid); tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pVnode->config.vgId, req.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
} }
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.pExecReader = tqOpenReader(pVnode);
tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext)); (SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
} }
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
...@@ -1043,6 +1046,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -1043,6 +1046,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
#endif #endif
int32_t vgId = TD_VID(pTq->pVnode);
pTask->refCnt = 1; pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
...@@ -1055,9 +1059,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -1055,9 +1059,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->startVer = ver; pTask->startVer = ver;
// expand executor // expand executor
...@@ -1077,7 +1079,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -1077,7 +1079,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.initTqReader = 1, .initTqReader = 1,
.pStateBackend = pTask->pState, .pStateBackend = pTask->pState,
}; };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
if (pTask->exec.executor == NULL) { if (pTask->exec.executor == NULL) {
return -1; return -1;
} }
...@@ -1092,7 +1095,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -1092,7 +1095,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
.pStateBackend = pTask->pState, .pStateBackend = pTask->pState,
}; };
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId);
if (pTask->exec.executor == NULL) { if (pTask->exec.executor == NULL) {
return -1; return -1;
} }
...@@ -1122,9 +1126,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -1122,9 +1126,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
streamSetupTrigger(pTask); streamSetupTrigger(pTask);
tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId, pTask->taskLevel);
tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", TD_VID(pTq->pVnode), pTask->taskId,
pTask->selfChildId, pTask->taskLevel);
return 0; return 0;
} }
......
...@@ -274,6 +274,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -274,6 +274,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
return -1; return -1;
} }
int32_t vgId = TD_VID(pTq->pVnode);
void* pKey = NULL; void* pKey = NULL;
int kLen = 0; int kLen = 0;
void* pVal = NULL; void* pVal = NULL;
...@@ -304,7 +305,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -304,7 +305,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task = handle.execHandle.task =
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, NULL); qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL);
if (handle.execHandle.task == NULL) { if (handle.execHandle.task == NULL) {
tqError("cannot create exec task for %s", handle.subKey); tqError("cannot create exec task for %s", handle.subKey);
return -1; return -1;
...@@ -324,7 +325,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -324,7 +325,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
(SSnapContext**)(&reader.sContext)); (SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
...@@ -341,7 +342,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -341,7 +342,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
handle.fetchMeta, (SSnapContext**)(&reader.sContext)); handle.fetchMeta, (SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, NULL, NULL); handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
} }
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
......
...@@ -115,6 +115,7 @@ typedef struct STaskIdInfo { ...@@ -115,6 +115,7 @@ typedef struct STaskIdInfo {
uint64_t subplanId; uint64_t subplanId;
uint64_t templateId; uint64_t templateId;
char* str; char* str;
int32_t vgId;
} STaskIdInfo; } STaskIdInfo;
enum { enum {
...@@ -834,8 +835,8 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); ...@@ -834,8 +835,8 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
char* sql, EOPTR_EXEC_MODEL model); int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
......
...@@ -218,7 +218,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, ...@@ -218,7 +218,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
return code; return code;
} }
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) {
if (msg == NULL) { if (msg == NULL) {
// create raw scan // create raw scan
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
...@@ -231,7 +231,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -231,7 +231,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
pTaskInfo->cost.created = taosGetTimestampUs(); pTaskInfo->cost.created = taosGetTimestampUs();
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE; pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
pTaskInfo->pRoot = createRawScanOperatorInfo(readers, pTaskInfo); pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
if (NULL == pTaskInfo->pRoot) { if (NULL == pTaskInfo->pRoot) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pTaskInfo); taosMemoryFree(pTaskInfo);
...@@ -248,7 +248,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -248,7 +248,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE); code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_QUEUE);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan); nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo); qDestroyTask(pTaskInfo);
...@@ -274,13 +274,11 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -274,13 +274,11 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
return pTaskInfo; return pTaskInfo;
} }
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) {
if (msg == NULL) { if (msg == NULL) {
return NULL; return NULL;
} }
/*qDebugL("stream task string %s", (const char*)msg);*/
struct SSubplan* pPlan = NULL; struct SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan); int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -289,7 +287,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) { ...@@ -289,7 +287,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, 0, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan); nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo); qDestroyTask(pTaskInfo);
...@@ -468,11 +466,11 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, ...@@ -468,11 +466,11 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
taosThreadOnce(&initPoolOnce, initRefPool); taosThreadOnce(&initPoolOnce, initRefPool);
qDebug("start to create subplan task, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model); int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to createExecTaskInfoImpl, code: %s", tstrerror(code)); qError("failed to createExecTaskInfo, code: %s", tstrerror(code));
goto _error; goto _error;
} }
......
...@@ -1975,7 +1975,7 @@ static char* buildTaskId(uint64_t taskId, uint64_t queryId) { ...@@ -1975,7 +1975,7 @@ static char* buildTaskId(uint64_t taskId, uint64_t queryId) {
return p; return p;
} }
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) { static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (pTaskInfo == NULL) { if (pTaskInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1990,6 +1990,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -1990,6 +1990,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
pTaskInfo->id.vgId = vgId;
pTaskInfo->id.queryId = queryId; pTaskInfo->id.queryId = queryId;
pTaskInfo->id.str = buildTaskId(taskId, queryId); pTaskInfo->id.str = buildTaskId(taskId, queryId);
return pTaskInfo; return pTaskInfo;
...@@ -2178,7 +2179,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -2178,7 +2179,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
#ifndef NDEBUG #ifndef NDEBUG
int32_t sz = tableListGetSize(pTableListInfo); int32_t sz = tableListGetSize(pTableListInfo);
qDebug("create stream task, total:%d", sz); qDebug("vgId:%d create stream task, total qualified tables:%d, %s", pTaskInfo->id.vgId, sz, idstr);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
...@@ -2439,17 +2440,14 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT ...@@ -2439,17 +2440,14 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
char* sql, EOPTR_EXEC_MODEL model) { int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId; *pTaskInfo = doCreateExecTaskInfo(pPlan->id.queryId, taskId, vgId, model, pPlan->dbFName);
*pTaskInfo = createExecTaskInfo(queryId, taskId, model, pPlan->dbFName);
if (*pTaskInfo == NULL) { if (*pTaskInfo == NULL) {
goto _complete; goto _complete;
} }
if (pHandle) { if (pHandle) {
/*(*pTaskInfo)->streamInfo.fillHistoryVer1 = pHandle->fillHistoryVer1;*/
if (pHandle->pStateBackend) { if (pHandle->pStateBackend) {
(*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册