提交 2e6263b4 编写于 作者: H Haojun Liao

fix(stream): fix memory leak.

上级 a1e554fb
......@@ -211,12 +211,13 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
const char* id = pTask->id.idStr;
int64_t maxVer = pTask->dataRange.range.maxVer;
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
if (!pTask->status.appendTranstateBlock) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
", not scan wal anymore, add transfer-state block into inputQ",
id, ver, pTask->dataRange.range.maxVer);
id, ver, maxVer);
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
......@@ -224,7 +225,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
/*int32_t code = */streamSchedExec(pTask);
} else {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
id, ver, pTask->dataRange.range.maxVer);
id, ver, maxVer);
}
}
}
......
......@@ -757,10 +757,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
streamFreeQitem(pTask->msgInfo.pData);
return TSDB_CODE_SUCCESS;
}
......
......@@ -534,16 +534,21 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
} else {
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else { // level == TASK_LEVEL__SINK
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else { // non-dispatch task, do task state transfer directly
qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
streamFreeQitem((SStreamQueueItem*)pBlock);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (level != TASK_LEVEL__SINK) {
qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
} else {
qDebug("s-task:%d sink task does not transfer state", id);
}
}
......
......@@ -400,7 +400,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
}
pTask->status.appendTranstateBlock = true;
qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册