提交 7f16081e 编写于 作者: 5 54liuyao

feat(stream): stream task and meta

上级 05bf2bc4
...@@ -137,6 +137,8 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) ...@@ -137,6 +137,8 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamDispatch(pTask); streamDispatch(pTask);
} }
} else {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
} }
} }
......
...@@ -262,12 +262,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { ...@@ -262,12 +262,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) {
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur);
return -1; return -1;
} }
if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
tdbTbcClose(pCur);
return -1; return -1;
} }
} }
......
...@@ -119,7 +119,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -119,7 +119,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
for (int32_t i = 0; i < epSz; i++) { for (int32_t i = 0; i < epSz; i++) {
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
if (pInfo == NULL) return -1; if (pInfo == NULL) return -1;
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) return -1; if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
taosMemoryFreeClear(pInfo);
return -1;
}
taosArrayPush(pTask->childEpInfo, &pInfo); taosArrayPush(pTask->childEpInfo, &pInfo);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册