提交 728112ed 编写于 作者: H Haojun Liao

fix(stream): set correct upstream epinfo.

上级 1c1bf63e
...@@ -309,7 +309,6 @@ struct SStreamTask { ...@@ -309,7 +309,6 @@ struct SStreamTask {
SHistDataRange dataRange; SHistDataRange dataRange;
SStreamId historyTaskId; SStreamId historyTaskId;
SStreamId streamTaskId; SStreamId streamTaskId;
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId; int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo> SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
int64_t initTs; int64_t initTs;
......
...@@ -937,10 +937,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -937,10 +937,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
streamSetupScheduleTrigger(pTask); streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo; SCheckpointInfo* pChkInfo = &pTask->chkInfo;
taosThreadMutexInit(&pTask->lock, NULL); // checkpoint ver is the kept version, handled data should be the next version.
if (pTask->chkInfo.checkpointId != 0) { if (pTask->chkInfo.checkpointId != 0) {
// checkpoint ver is the kept version, handled data should be the next version.
pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer);
......
...@@ -395,7 +395,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { ...@@ -395,7 +395,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
// agg // agg
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) { int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList);
qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", qDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream,
streamGetTaskStatusStr(pTask->status.taskStatus)); streamGetTaskStatusStr(pTask->status.taskStatus));
...@@ -426,7 +426,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory ...@@ -426,7 +426,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList); int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamInfoList);
qDebug( qDebug(
"s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send " "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send "
"rsp to all upstream tasks", "rsp to all upstream tasks",
......
...@@ -270,7 +270,6 @@ void tFreeStreamTask(SStreamTask* pTask) { ...@@ -270,7 +270,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
walCloseReader(pTask->exec.pWalReader); walCloseReader(pTask->exec.pWalReader);
} }
taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree);
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
taosMemoryFree(pTask->tbSink.pTSchema); taosMemoryFree(pTask->tbSink.pTSchema);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册