未验证 提交 59b0f99f 编写于 作者: L liuyao 提交者: GitHub

Merge pull request #15110 from taosdata/feature/TD-17357

feat(stream): add num of children
......@@ -40,6 +40,7 @@ typedef struct SReadHandle {
bool initMetaReader;
bool initTableReader;
bool initTqReader;
int32_t numOfVgroups;
} SReadHandle;
// in queue mode, data streams are seperated by msg
......
......@@ -262,6 +262,7 @@ typedef struct SStreamTask {
int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
int32_t numOfVgroups;
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......
......@@ -383,6 +383,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
// exec
pInnerTask->execType = TASK_EXEC__PIPE;
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pSourceDb);
pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups;
if (tsSchedStreamToSnode) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
if (pSnode == NULL) {
......
......@@ -591,7 +591,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
} else {
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = pTask->numOfVgroups,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
}
ASSERT(pTask->exec.executor);
}
......
......@@ -4424,7 +4424,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.calTrigger = pTableScanNode->triggerType,
.maxTs = INT64_MIN,
};
if (pHandle) {
if (pHandle->vnode) {
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
if (code) {
......@@ -4590,7 +4590,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
int32_t children = 1;
int32_t children = pHandle->numOfVgroups;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
......
......@@ -1539,7 +1539,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
goto _error;
}
if (pHandle) {
if (pHandle->vnode) {
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
if (pHandle->version > 0) {
......
......@@ -64,6 +64,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
......@@ -118,6 +119,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;
int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册