From 95d81604c75878fce5776e9cef1dcd43abc2f9b6 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 11 Apr 2023 10:33:10 +0800 Subject: [PATCH] feat:encode&&decode stream scan op --- include/libs/executor/executor.h | 4 +++ include/libs/stream/streamState.h | 1 + source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/executor.c | 6 ++++ source/libs/executor/src/scanoperator.c | 29 +++++++++++++++++++ source/libs/executor/src/timewindowoperator.c | 7 +++-- source/libs/stream/src/streamState.c | 2 ++ 7 files changed, 48 insertions(+), 4 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index c3d2010351..e3ac462e9b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -80,6 +80,10 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers); qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); + +// todo refactor +void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId); + /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 62b555d437..86e4144687 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -59,6 +59,7 @@ typedef struct { SStreamFileState* pFileState; int32_t number; SSHashObj* parNameMap; + int64_t checkPointId; } SStreamState; SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0a0df58d4d..cbfd78429e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -143,7 +143,8 @@ typedef struct { int64_t fillHistoryVer1; int64_t fillHistoryVer2; int64_t dataVersion; - SStreamState* pState; + SStreamState* pState; + int64_t checkPointId; } SStreamTaskInfo; typedef struct { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ac78ddc23c..55f87162e6 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -176,6 +176,12 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { return code; } +void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) { + SExecTaskInfo* pTaskInfo = tinfo; + *dataVer = pTaskInfo->streamInfo.dataVersion; + *ckId = pTaskInfo->streamInfo.checkPointId; +} + int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 35448a9e07..0a688bd67c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1742,6 +1742,26 @@ static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBl } } +int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) { + int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo); + *pBuff = taosMemoryCalloc(1, len); + updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo); + return len; +} + +// other properties are recovered from the execution plan +void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) { + if (!pBuff) { + return; + } + + SUpdateInfo *pUpInfo = updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0); + int32_t code = updateInfoDeserialize(pBuff, len, pUpInfo); + if (code == TSDB_CODE_SUCCESS) { + pInfo->pUpdateInfo = pUpInfo; + } +} + static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2000,6 +2020,10 @@ FETCH_NEXT_BLOCK: updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); doClearBufferedBlocks(pInfo); qDebug("stream scan return empty, consume block %d", totBlockNum); + void* buff = NULL; + int32_t len = streamScanOperatorEncode(pInfo, &buff); + //todo(liuyao) save buff + taosMemoryFreeClear(buff); return NULL; } @@ -2419,6 +2443,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->igExpired = pTableScanNode->igExpired; pInfo->twAggSup.maxTs = INT64_MIN; + //todo(liuyao) get buff from rocks db; + void* buff = NULL; + int32_t len = 0; + streamScanOperatorDeocde(buff, len, pInfo); + setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fcdc70788d..e086328741 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2409,8 +2409,9 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } -static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version) { +static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) { pTaskInfo->streamInfo.dataVersion = version; + pTaskInfo->streamInfo.checkPointId = ckId; } static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, @@ -2604,7 +2605,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); qDebug("===stream===clear semi operator"); } else { deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, @@ -4870,7 +4871,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } return NULL; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index a68e16c0e6..537ebb31d7 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -203,6 +203,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int } pState->pTdbState->pOwner = pTask; + pState->checkPointId = 0; return pState; @@ -266,6 +267,7 @@ int32_t streamStateCommit(SStreamState* pState) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } + pState->checkPointId++; return 0; #endif } -- GitLab