diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 227ad1adefb1e3da5cd2475c3116bd76ca4d0e0d..265e6dbc13aba2d8baf8880da3ef40c76ea32c0a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -23,8 +23,8 @@ static int32_t tqInitialize(STQ* pTq); static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; } -static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;} -static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;} +static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } +static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } int32_t tqInit() { int8_t old; @@ -78,7 +78,7 @@ static void destroyTqHandle(void* data) { taosMemoryFreeClear(pData->execHandle.execTb.qmsg); nodesDestroyNode(pData->execHandle.execTb.node); } - if(pData->msg != NULL) { + if (pData->msg != NULL) { rpcFreeCont(pData->msg->pCont); taosMemoryFree(pData->msg); pData->msg = NULL; @@ -240,14 +240,15 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever); + tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, + ever); char buf1[80] = {0}; char buf2[80] = {0}; tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); + tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, + dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -263,8 +264,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, - vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId, + pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); return 0; } @@ -336,8 +337,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); if (pHandle == NULL) { - tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, - pOffset->subKey); + tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -353,7 +353,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) } taosRUnLockLatch(&pTq->lock); - //3. check the offset info + // 3. check the offset info STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); if (pSavedOffset != NULL) { if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { @@ -381,7 +381,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version); } else { - tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version); + tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); } if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { @@ -423,7 +423,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SMqPollReq req = {0}; - int code = 0; + int code = 0; if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); terrno = TSDB_CODE_INVALID_MSG; @@ -449,7 +449,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // 2. check re-balance status if (pHandle->consumerId != consumerId) { - tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + tqError("ERROR tmq poll: consumer:0x%" PRIx64 + " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; taosWUnLockLatch(&pTq->lock); @@ -457,22 +458,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } bool exec = tqIsHandleExec(pHandle); - if(!exec) { + if (!exec) { tqSetHandleExec(pHandle); -// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, + req.subKey, pHandle); taosWUnLockLatch(&pTq->lock); break; } taosWUnLockLatch(&pTq->lock); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 + "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", + consumerId, vgId, req.subKey, pHandle); taosMsleep(10); } // 3. update the epoch value if (pHandle->epoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, reqEpoch); + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, pHandle->epoch, + reqEpoch); pHandle->epoch = reqEpoch; } @@ -484,7 +489,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); tqSetHandleIdle(pHandle); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, + req.subKey, pHandle); return code; } @@ -548,7 +554,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { if (reqOffset.type == TMQ_OFFSET__LOG) { int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); - if (currentVer == -1) { // not start to read data from wal yet, return req offset directly + if (currentVer == -1) { // not start to read data from wal yet, return req offset directly dataRsp.rspOffset.version = reqOffset.version; } else { dataRsp.rspOffset.version = currentVer; // return current consume offset value @@ -572,7 +578,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; - int32_t vgId = TD_VID(pTq->pVnode); + int32_t vgId = TD_VID(pTq->pVnode); tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; @@ -581,7 +587,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { while (tqIsHandleExec(pHandle)) { - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); + tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, + pHandle->subKey, pHandle); taosMsleep(10); } @@ -641,9 +648,9 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t } int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - int ret = 0; + int ret = 0; SMqRebVgReq req = {0}; - SDecoder dc = {0}; + SDecoder dc = {0}; tDecoderInit(&dc, msg, msgLen); @@ -698,7 +705,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->snapshotVer = ver; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg);; + pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg); pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId); @@ -720,7 +727,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->execHandle.execTb.suid = req.suid; pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg); - if(strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { + if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) { if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) { tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(), pVnode->config.vgId, req.subKey, pHandle->consumerId); @@ -734,45 +741,47 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SArray* tbUidList = NULL; ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task); - if(ret != TDB_CODE_SUCCESS) { - tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, pHandle->consumerId); + if (ret != TDB_CODE_SUCCESS) { + tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey, + pHandle->consumerId); taosArrayDestroy(tbUidList); goto end; } - tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); + tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64, + pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid); pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, - pHandle->consumerId); + tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; } else { taosWLockLatch(&pTq->lock); if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, req.newConsumerId); + tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs, should not reach here", req.vgId, + req.newConsumerId); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); } -// atomic_add_fetch_32(&pHandle->epoch, 1); + // atomic_add_fetch_32(&pHandle->epoch, 1); // kill executing task -// if(tqIsHandleExec(pHandle)) { -// qTaskInfo_t pTaskInfo = pHandle->execHandle.task; -// if (pTaskInfo != NULL) { -// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); -// } - -// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { -// qStreamCloseTsdbReader(pTaskInfo); -// } -// } + // if(tqIsHandleExec(pHandle)) { + // qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + // if (pTaskInfo != NULL) { + // qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + // } + + // if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + // qStreamCloseTsdbReader(pTaskInfo); + // } + // } // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); taosWUnLockLatch(&pTq->lock); @@ -780,14 +789,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } end: - taosWUnLockLatch(&pTq->lock); tDecoderClear(&dc); return ret; } -void freePtr(void *ptr) { - taosMemoryFree(*(void**)ptr); -} +void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); @@ -810,7 +816,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->chkInfo.currentVer = ver; // expand executor - pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL; + pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -876,8 +882,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, - pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, + pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); // next valid version will add one pTask->chkInfo.version += 1; @@ -990,7 +996,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask)); + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, + (int32_t)sizeof(SStreamTask)); return -1; } @@ -1105,7 +1112,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t // do recovery step 2 int64_t st = taosGetTimestampMs(); - tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st); + tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st); code = streamSourceRecoverScanStep2(pTask, sversion); if (code < 0) { @@ -1113,7 +1120,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion); + qDebug("s-task:%s set the start wal offset to be:%" PRId64, pTask->id.idStr, sversion); walReaderSeekVer(pTask->exec.pWalReader, sversion); pTask->chkInfo.currentVer = sversion; @@ -1137,7 +1144,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - double el = (taosGetTimestampMs() - st)/ 1000.0; + double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); // dispatch recover finish req to all related downstream task @@ -1253,8 +1260,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { - tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, - pTask->id.idStr, pTask->chkInfo.version); + tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.version); streamProcessRunReq(pTask); } else { if (streamTaskShouldPause(&pTask->status)) { @@ -1273,9 +1280,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamDispatchReq req = {0}; @@ -1321,7 +1328,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 146552b0580e8ffb7db102eea23459915d70a762..6d17672513674d5c7d97ec6a2a84502f405dbd76 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +// clang-format off + #include "executorInt.h" #include "filter.h" #include "function.h" @@ -2444,6 +2446,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys int32_t len = 0; pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len); streamScanOperatorDecode(buff, len, pInfo); + taosMemoryFree(buff); } setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, @@ -3458,3 +3461,5 @@ static void destoryTableCountScanOperator(void* param) { taosArrayDestroy(pTableCountScanInfo->stbUidList); taosMemoryFreeClear(param); } + +// clang-format on diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f531f655659bd5fc70b822b03e0cf7234dbb4ce2..54832b042a718a3d855e0cbee874d6cb18713fab 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -16,12 +16,12 @@ #include "tstreamFileState.h" #include "query.h" +#include "storageapi.h" #include "streamBackendRocksdb.h" #include "taos.h" #include "tcommon.h" #include "thash.h" #include "tsimplehash.h" -#include "storageapi.h" #define FLUSH_RATIO 0.5 #define FLUSH_NUM 4 @@ -420,6 +420,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { return TSDB_CODE_FAILED; } sscanf(val, "%" PRId64 "", &maxCheckPointId); + taosMemoryFree(val); } for (int64_t i = maxCheckPointId; i > 0; i--) { char buf[128] = {0}; @@ -435,9 +436,11 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { if (ts < mark) { // statekey winkey.ts < mark forceRemoveCheckpoint(pFileState, i); + taosMemoryFreeClear(val); break; } else { } + taosMemoryFree(val); } return code; }