未验证 提交 379cdc05 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15889 from taosdata/feature/stream

fix(stream): memory error
......@@ -42,8 +42,8 @@ enum {
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__PREPARE_RECOVER,
TASK_STATUS__RECOVERING,
TASK_STATUS__RECOVER_DOWNSTREAM,
TASK_STATUS__RECOVER_SELF,
};
enum {
......@@ -232,8 +232,8 @@ typedef struct {
} SStreamChildEpInfo;
typedef struct {
int32_t nodeId;
int32_t childId;
int32_t srcNodeId;
int32_t srcChildId;
int64_t stateSaveVer;
int64_t stateProcessedVer;
} SStreamCheckpointInfo;
......@@ -394,9 +394,6 @@ typedef struct {
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
#if 0
int64_t sourceVer;
#endif
int32_t blockNum;
SArray* dataLen; // SArray<int32_t>
SArray* data; // SArray<SRetrieveTableRsp*>
......@@ -426,6 +423,7 @@ typedef struct {
int32_t rspToTaskId;
} SStreamRetrieveRsp;
#if 0
typedef struct {
int64_t streamId;
int32_t taskId;
......@@ -462,13 +460,25 @@ int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
#endif
typedef struct {
int64_t streamId;
} SPStreamTaskRecoverReq;
int32_t downstreamTaskId;
int32_t taskId;
} SStreamRecoverDownstreamReq;
typedef struct {
int8_t reserved;
} SPStreamTaskRecoverRsp;
int64_t streamId;
int32_t downstreamTaskId;
int32_t taskId;
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp;
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, const SStreamRecoverDownstreamRsp* pRsp);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
......@@ -479,8 +489,6 @@ int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
......
......@@ -504,6 +504,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
#if 0
static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq));
if (pReq == NULL) {
......@@ -540,7 +541,6 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
#if 0
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (pStream->isDistributed) {
int32_t lv = taosArrayGetSize(pStream->tasks);
......
......@@ -777,6 +777,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
}
}
#if 0
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
......@@ -789,25 +790,26 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->rspTaskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
streamProcessRecoverRsp(pTask, pRsp);
return 0;
} else {
return -1;
}
}
#endif
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->rspTaskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
streamProcessRecoverRsp(pTask, pRsp);
streamProcessDispatchRsp(pTask, pRsp);
return 0;
} else {
return -1;
......@@ -873,6 +875,8 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp, false);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
return;
}
......@@ -883,4 +887,6 @@ FAIL:
.info = pMsg->info,
};
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
......@@ -334,14 +334,14 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_STREAM_TASK_DISPATCH:
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_RECOVER:
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
/*case TDMT_STREAM_TASK_RECOVER:*/
/*return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);*/
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP:
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
/*case TDMT_STREAM_TASK_RECOVER_RSP:*/
/*return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);*/
case TDMT_STREAM_RETRIEVE_RSP:
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
default:
......
......@@ -229,6 +229,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
return 0;
}
#if 0
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
......@@ -267,6 +268,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
return 0;
}
#endif
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
......
......@@ -200,14 +200,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId);
streamFreeQitem(data);
if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) {
// TODO log failed ver
streamQueueProcessFail(pTask->inputQueue);
taosArrayDestroy(pRes);
streamFreeQitem(data);
return -1;
}
qRes->type = STREAM_INPUT__DATA_BLOCK;
......@@ -224,10 +223,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
/*streamQueueProcessFail(pTask->inputQueue);*/
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
taosFreeQitem(qRes);
streamFreeQitem(data);
return -1;
}
/*streamQueueProcessSuccess(pTask->inputQueue);*/
} else {
taosArrayDestroy(pRes);
}
streamFreeQitem(data);
}
return 0;
}
......
......@@ -15,6 +15,7 @@
#include "streamInc.h"
#if 0
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
......@@ -86,17 +87,18 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
tEndDecode(pDecoder);
return 0;
}
#endif
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
if (tEncodeI32(pEncoder, pCheckpoint->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->srcNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->srcChildId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->stateProcessedVer) < 0) return -1;
return 0;
}
int32_t tDecodeSStreamCheckpointInfo(SDecoder* pDecoder, SStreamCheckpointInfo* pCheckpoint) {
if (tDecodeI32(pDecoder, &pCheckpoint->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->srcNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->srcChildId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->stateProcessedVer) < 0) return -1;
return 0;
}
......@@ -221,11 +223,17 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0;
}
int32_t streamFetchSinkStatus(SStreamTask* pTask) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
int32_t streamFetchDownstreamStatus(SStreamTask* pTask) {
// set self status to recover_phase1
// build fetch status msg
// send fetch msg
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
} else {
ASSERT(0);
}
return 0;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册