提交 c411b9ae 编写于 作者: L Liu Jicong

fix(stream): memory leak

上级 bb0ce395
...@@ -226,11 +226,36 @@ typedef struct { ...@@ -226,11 +226,36 @@ typedef struct {
int32_t nodeId; int32_t nodeId;
int32_t childId; int32_t childId;
int32_t taskId; int32_t taskId;
int64_t checkpointVer; // int64_t checkpointVer;
int64_t processedVer; // int64_t processedVer;
SEpSet epSet; SEpSet epSet;
} SStreamChildEpInfo; } SStreamChildEpInfo;
typedef struct {
int32_t nodeId;
int32_t childId;
int64_t stateSaveVer;
int64_t stateProcessedVer;
} SStreamCheckpointInfo;
typedef struct {
int64_t streamId;
int64_t checkTs;
int32_t checkpointId; // incremental
int32_t taskId;
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamMultiVgCheckpointInfo;
typedef struct {
int32_t taskId;
int32_t checkpointId; // incremental
} SStreamCheckpointKey;
typedef struct {
int32_t taskId;
SArray* checkpointVer;
} SStreamRecoveringState;
typedef struct SStreamTask { typedef struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
...@@ -256,6 +281,8 @@ typedef struct SStreamTask { ...@@ -256,6 +281,8 @@ typedef struct SStreamTask {
// children info // children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*> SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
// exec // exec
STaskExec exec; STaskExec exec;
...@@ -445,6 +472,7 @@ typedef struct { ...@@ -445,6 +472,7 @@ typedef struct {
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tFreeStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamSetupTrigger(SStreamTask* pTask);
...@@ -468,6 +496,7 @@ typedef struct SStreamMeta { ...@@ -468,6 +496,7 @@ typedef struct SStreamMeta {
TTB* pTaskDb; TTB* pTaskDb;
TTB* pStateDb; TTB* pStateDb;
SHashObj* pTasks; SHashObj* pTasks;
SHashObj* pRecoveringState;
void* ahandle; void* ahandle;
TXN txn; TXN txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
......
...@@ -859,8 +859,10 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { ...@@ -859,8 +859,10 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, msgBody, msgLen);
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR; code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
goto FAIL; goto FAIL;
} }
tDecoderClear(&decoder);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
......
...@@ -136,6 +136,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* ...@@ -136,6 +136,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
pRsp->pCont = buf; pRsp->pCont = buf;
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
tmsgSendRsp(pRsp); tmsgSendRsp(pRsp);
tFreeStreamDispatchReq(pReq);
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
} }
......
...@@ -62,6 +62,12 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { ...@@ -62,6 +62,12 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
return 0; return 0;
} }
void tFreeStreamDispatchReq(SStreamDispatchReq* pReq) {
taosArrayDestroyP(pReq->data, taosMemoryFree);
taosArrayDestroy(pReq->dataLen);
taosMemoryFree(pReq);
}
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) { int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "streamInc.h" #include "streamInc.h"
static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
void* exec = pTask->exec.executor; void* exec = pTask->exec.executor;
// set input // set input
...@@ -82,14 +82,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -82,14 +82,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
return 0; return 0;
} }
#if 0
static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) { static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) {
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
int32_t childId = pBlock->childId; int32_t childId = pBlock->childId;
int64_t ver = pBlock->sourceVer; int64_t ver = pBlock->sourceVer;
SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId); SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId);
pChildInfo->processedVer = ver; /*pChildInfo-> = ver;*/
return 0; return 0;
} }
#endif
int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
...@@ -198,6 +200,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -198,6 +200,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, data, pRes); streamTaskExecImpl(pTask, data, pRes);
qDebug("stream task %d exec end", pTask->taskId); qDebug("stream task %d exec end", pTask->taskId);
streamFreeQitem(data);
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) { if (qRes == NULL) {
......
...@@ -87,63 +87,95 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp ...@@ -87,63 +87,95 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
return 0; return 0;
} }
typedef struct { int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
int32_t vgId; if (tEncodeI32(pEncoder, pCheckpoint->nodeId) < 0) return -1;
int32_t childId;
int64_t ver;
} SStreamVgVerCheckpoint;
int32_t tEncodeSStreamVgVerCheckpoint(SEncoder* pEncoder, const SStreamVgVerCheckpoint* pCheckpoint) {
if (tEncodeI32(pEncoder, pCheckpoint->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1; if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->ver) < 0) return -1; if (tEncodeI64(pEncoder, pCheckpoint->stateProcessedVer) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeSStreamVgVerCheckpoint(SDecoder* pDecoder, SStreamVgVerCheckpoint* pCheckpoint) { int32_t tDecodeSStreamCheckpointInfo(SDecoder* pDecoder, SStreamCheckpointInfo* pCheckpoint) {
if (tDecodeI32(pDecoder, &pCheckpoint->vgId) < 0) return -1; if (tDecodeI32(pDecoder, &pCheckpoint->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->ver) < 0) return -1; if (tDecodeI64(pDecoder, &pCheckpoint->stateProcessedVer) < 0) return -1;
return 0; return 0;
} }
typedef struct { int32_t tEncodeSStreamMultiVgCheckpointInfo(SEncoder* pEncoder, const SStreamMultiVgCheckpointInfo* pCheckpoint) {
int64_t streamId;
int64_t checkTs;
int64_t checkpointId;
int32_t taskId;
SArray* checkpointVer; // SArray<SStreamVgCheckpointVer>
} SStreamAggVerCheckpoint;
int32_t tEncodeSStreamAggVerCheckpoint(SEncoder* pEncoder, const SStreamAggVerCheckpoint* pCheckpoint) {
if (tEncodeI64(pEncoder, pCheckpoint->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pCheckpoint->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->checkTs) < 0) return -1; if (tEncodeI64(pEncoder, pCheckpoint->checkTs) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pCheckpoint->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pCheckpoint->taskId) < 0) return -1;
int32_t sz = taosArrayGetSize(pCheckpoint->checkpointVer); int32_t sz = taosArrayGetSize(pCheckpoint->checkpointVer);
if (tEncodeI32(pEncoder, sz) < 0) return -1; if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SStreamVgVerCheckpoint* pOneVgCkpoint = taosArrayGet(pCheckpoint->checkpointVer, i); SStreamCheckpointInfo* pOneVgCkpoint = taosArrayGet(pCheckpoint->checkpointVer, i);
if (tEncodeSStreamVgVerCheckpoint(pEncoder, pOneVgCkpoint) < 0) return -1; if (tEncodeSStreamCheckpointInfo(pEncoder, pOneVgCkpoint) < 0) return -1;
} }
return 0; return 0;
} }
int32_t tDecodeSStreamAggVerCheckpoint(SDecoder* pDecoder, SStreamAggVerCheckpoint* pCheckpoint) { int32_t tDecodeSStreamMultiVgCheckpointInfo(SDecoder* pDecoder, SStreamMultiVgCheckpointInfo* pCheckpoint) {
if (tDecodeI64(pDecoder, &pCheckpoint->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pCheckpoint->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->checkTs) < 0) return -1; if (tDecodeI64(pDecoder, &pCheckpoint->checkTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->checkpointId) < 0) return -1; if (tDecodeI32(pDecoder, &pCheckpoint->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pCheckpoint->taskId) < 0) return -1;
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SStreamVgVerCheckpoint oneVgCheckpoint; SStreamCheckpointInfo oneVgCheckpoint;
if (tDecodeSStreamVgVerCheckpoint(pDecoder, &oneVgCheckpoint) < 0) return -1; if (tDecodeSStreamCheckpointInfo(pDecoder, &oneVgCheckpoint) < 0) return -1;
taosArrayPush(pCheckpoint->checkpointVer, &oneVgCheckpoint); taosArrayPush(pCheckpoint->checkpointVer, &oneVgCheckpoint);
} }
return 0; return 0;
} }
int32_t streamCheckSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
int32_t sz = taosArrayGetSize(pTask->checkpointInfo);
SStreamMultiVgCheckpointInfo checkpoint;
checkpoint.checkpointId = 0;
checkpoint.checkTs = taosGetTimestampMs();
checkpoint.streamId = pTask->streamId;
checkpoint.taskId = pTask->taskId;
checkpoint.checkpointVer = pTask->checkpointInfo;
int32_t len;
int32_t code;
tEncodeSize(tEncodeSStreamMultiVgCheckpointInfo, &checkpoint, len, code);
if (code < 0) {
return -1;
}
buf = taosMemoryCalloc(1, len);
if (buf == NULL) {
return -1;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, len);
tEncodeSStreamMultiVgCheckpointInfo(&encoder, &checkpoint);
tEncoderClear(&encoder);
SStreamCheckpointKey key = {
.taskId = pTask->taskId,
.checkpointId = checkpoint.checkpointId,
};
if (tdbTbUpsert(pMeta->pStateDb, &key, sizeof(SStreamCheckpointKey), buf, len, &pMeta->txn) < 0) {
ASSERT(0);
goto FAIL;
}
taosMemoryFree(buf);
return 0;
FAIL:
if (buf) taosMemoryFree(buf);
return -1;
}
int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
// load status // load status
...@@ -154,9 +186,39 @@ int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -154,9 +186,39 @@ int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
} }
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, pVal, vLen); tDecoderInit(&decoder, pVal, vLen);
SStreamAggVerCheckpoint aggCheckpoint; SStreamMultiVgCheckpointInfo aggCheckpoint;
tDecodeSStreamAggVerCheckpoint(&decoder, &aggCheckpoint); tDecodeSStreamMultiVgCheckpointInfo(&decoder, &aggCheckpoint);
/*pTask->*/ tDecoderClear(&decoder);
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
return 0;
}
int32_t streamCheckAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// save and copy state
// save state info
return 0;
}
int32_t streamRecoverAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
// try recover sink level
// after all sink level recovered, choose current state backend to recover
return 0;
}
int32_t streamCheckSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
// try recover agg level
//
return 0;
}
int32_t streamRecoverSourceLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE);
return 0; return 0;
} }
......
...@@ -34,7 +34,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) ...@@ -34,7 +34,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo)
if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1; /*if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1;*/
if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1;
return 0; return 0;
} }
...@@ -43,7 +43,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { ...@@ -43,7 +43,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1; /*if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1;*/
if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册