提交 daafe240 编写于 作者: H Haojun Liao

fix(stream): fix memory leak.

上级 f9be16be
......@@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey);
#ifdef __cplusplus
}
......
......@@ -250,12 +250,11 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
}
// todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000; // taosGetTimestampMs();
// pTask->dataRange.window.ekey = firstWindowSkey - 1;//taosGetTimestampMs();
STimeWindow* pWindow = &pTask->dataRange.window;
mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey,
pTask->dataRange.window.ekey);
pWindow->skey = INT64_MIN;
pWindow->ekey = firstWindowSkey - 1;
mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pWindow->skey, pWindow->ekey);
// sink or dispatch
if (hasExtraSink) {
......@@ -334,16 +333,16 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
}
static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream,
bool hasExtraSink, int64_t lastTs) {
bool hasExtraSink, int64_t nextWindowSkey) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = addNewTaskList(pStream->tasks);
SSdb* pSdb = pMnode->pSdb;
SArray* pHTaskList = NULL;
if (pStream->conf.fillHistory) {
pHTaskList = addNewTaskList(pStream->pHTasksList);
}
SSdb* pSdb = pMnode->pSdb;
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
if (LIST_LENGTH(inner->pNodeList) != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
......@@ -372,7 +371,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
// new stream task
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, 0,
hasExtraSink, lastTs);
hasExtraSink, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return -1;
......@@ -381,8 +380,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
pStream->conf.fillHistory, hasExtraSink, lastTs);
setHTasksId(pTaskList, pHTaskList);
1, hasExtraSink, nextWindowSkey);
}
sdbRelease(pSdb, pVgroup);
......@@ -391,11 +389,15 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
}
}
if (pStream->conf.fillHistory) {
setHTasksId(pTaskList, pHTaskList);
}
return TSDB_CODE_SUCCESS;
}
static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask,
SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup) {
SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup, int64_t nextWindowSkey) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -403,11 +405,12 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
}
// todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000; // taosGetTimestampMs();
STimeWindow* pWindow = &pTask->dataRange.window;
pWindow->skey = INT64_MIN;
pWindow->ekey = nextWindowSkey - 1;
mDebug("s-task:0x%x level:%d set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->info.taskLevel,
pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
pWindow->skey, pWindow->ekey);
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pDownstreamTask);
......@@ -507,7 +510,7 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
}
static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPlan, SStreamObj* pStream,
SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask) {
SStreamTask* pDownstreamTask, SStreamTask* pHDownstreamTask, int64_t nextWindowSkey) {
SArray* pSourceTaskList = addNewTaskList(pStream->tasks);
SArray* pHSourceTaskList = NULL;
......@@ -536,7 +539,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
continue;
}
int32_t code = doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup);
int32_t code =
doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup, nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
terrno = code;
......@@ -544,9 +548,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
}
if (pStream->conf.fillHistory) {
code = doAddSourceTask(pHSourceTaskList, pStream->conf.fillHistory, pStream->hTaskUid, pHDownstreamTask, pMnode,
plan, pVgroup);
code = doAddSourceTask(pHSourceTaskList, 1, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup,
nextWindowSkey);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return code;
......@@ -581,7 +584,7 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
return TSDB_CODE_SUCCESS;
}
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t lastTs) {
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t nextWindowSkey) {
SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
......@@ -612,7 +615,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
// check for fill history
if (pStream->conf.fillHistory) {
SArray* pHSinkTaskList = NULL;
code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pStream->conf.fillHistory);
code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -633,22 +636,22 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
}
// source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask);
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask, nextWindowSkey);
} else if (numOfPlanLevel == 1) {
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, lastTs);
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, nextWindowSkey);
}
return 0;
}
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey) {
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
int32_t code = doScheduleStream(pStream, pMnode, pPlan, 0);
int32_t code = doScheduleStream(pStream, pMnode, pPlan, nextWindowSkey);
qDestroyQueryPlan(pPlan);
return code;
......
......@@ -633,7 +633,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......
......@@ -784,7 +784,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
// schedule stream task for stream obj
if (mndScheduleStream(pMnode, &streamObj) < 0) {
if (mndScheduleStream(pMnode, &streamObj, createStreamReq.lastTs) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
......
......@@ -582,13 +582,21 @@ int32_t streamTryExec(SStreamTask* pTask) {
}
int32_t streamTaskReleaseState(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
int32_t code = qStreamOperatorReleaseState(exec);
return code;
void* pExecutor = pTask->exec.pExecutor;
if (pExecutor != NULL) {
int32_t code = qStreamOperatorReleaseState(pExecutor);
return code;
} else {
return TSDB_CODE_SUCCESS;
}
}
int32_t streamTaskReloadState(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
int32_t code = qStreamOperatorReloadState(exec);
return code;
void* pExecutor = pTask->exec.pExecutor;
if (pExecutor != NULL) {
int32_t code = qStreamOperatorReloadState(pExecutor);
return code;
} else {
return TSDB_CODE_SUCCESS;
}
}
......@@ -461,9 +461,11 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &pInfo->taskId, sizeof(int32_t));
if (ppTask) {
ASSERT((*ppTask)->status.timerActive == 1);
if (streamTaskShouldStop(&(*ppTask)->status)) {
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr,
streamGetTaskStatusStr((*ppTask)->status.taskStatus));
const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus);
qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus);
(*ppTask)->status.timerActive = 0;
taosWUnLockLatch(&pMeta->lock);
return;
......@@ -494,24 +496,27 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
} else {
qError("s-task:0x%x failed to load task", pInfo->taskId);
}
taosMemoryFree(pInfo);
}
// todo fix the bug: 2. race condition
// an fill history task needs to be started.
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId;
// Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &hTaskId, sizeof(hTaskId));
if (pHTask == NULL) {
qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
pMeta->vgId, pTask->historyTaskId.taskId);
pMeta->vgId, hTaskId);
if (pTask->timer == NULL) {
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
pInfo->taskId = pTask->id.taskId;
pInfo->pMeta = pTask->pMeta;
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
pInfo->taskId = pTask->id.taskId;
pInfo->pMeta = pTask->pMeta;
if (pTask->timer == NULL) {
pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer);
if (pTask->timer == NULL) {
// todo failed to create timer
......@@ -519,6 +524,10 @@ int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) {
pTask->status.timerActive = 1; // timer is active
qDebug("s-task:%s set time active flag", pTask->id.idStr);
}
} else { // timer exists
pTask->status.timerActive = 1;
qDebug("s-task:%s set time active flag", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->timer);
}
// try again in 500ms
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册