提交 95b8754d 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'fix/newCheckpoint' into enh/triggerCheckPoint2

...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
#define SINK_NODE_LEVEL (0) #define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode; extern bool tsDeployOnSnode;
static int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream); static int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream);
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory); SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
...@@ -270,13 +270,13 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas ...@@ -270,13 +270,13 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setEpToDownstreamTask(pTask, pSinkTask); setTaskUpstreamEpInfo(pTask, pSinkTask);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) { static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo)); SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) { if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -301,7 +301,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { ...@@ -301,7 +301,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
} }
int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { int32_t setTaskUpstreamEpInfo(const SStreamTask* pTask, SStreamTask* pDownstream) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask); SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
if (pEpInfo == NULL) { if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
...@@ -424,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui ...@@ -424,7 +424,7 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
return -1; return -1;
} }
return setEpToDownstreamTask(pTask, pDownstreamTask); return setTaskUpstreamEpInfo(pTask, pDownstreamTask);
} }
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
...@@ -592,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr ...@@ -592,6 +592,14 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setSinkTaskUpstreamInfo(SArray* pTasksList, const SStreamTask* pUpstreamTask) {
SArray* pSinkTaskList = taosArrayGetP(pTasksList, SINK_NODE_LEVEL);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setTaskUpstreamEpInfo(pUpstreamTask, pSinkTask);
}
}
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) { static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
...@@ -643,11 +651,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* ...@@ -643,11 +651,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
return code; return code;
} }
SArray* pSinkTaskList = taosArrayGetP(pStream->tasks, 0); setSinkTaskUpstreamInfo(pStream->tasks, pAggTask);
for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { setSinkTaskUpstreamInfo(pStream->pHTasksList, pHAggTask);
SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i);
setEpToDownstreamTask(pAggTask, pSinkTask);
}
// source level // source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey); return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);
......
...@@ -1673,7 +1673,8 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1673,7 +1673,8 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
return code; return code;
} }
tqDebug("vgId:%d s-task:%s received the checkpoint ready msg, handle it", vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId,
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
streamProcessCheckpointReadyMsg(pTask); streamProcessCheckpointReadyMsg(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
......
...@@ -2648,6 +2648,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera ...@@ -2648,6 +2648,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera
for (int32_t i = 0; i < mapSize; i++) { for (int32_t i = 0; i < mapSize; i++) {
SWinKey key = {0}; SWinKey key = {0};
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey));
buf = decodeSWinKey(buf, &key); buf = decodeSWinKey(buf, &key);
buf = decodeSRowBuffPos(buf, pPos); buf = decodeSRowBuffPos(buf, pPos);
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES);
......
...@@ -113,6 +113,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i ...@@ -113,6 +113,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
// TODO // TODO
} }
continue; continue;
} else if (output->info.type == STREAM_CHECKPOINT) {
continue; // checkpoint block not dispatch to downstream tasks
} }
SSDataBlock block = {0}; SSDataBlock block = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册