diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index ae84299c1c02d6cc7c1f1b30ac07bbf621dc294f..b61faf957f4a219d90a91d699bc97905fb359a37 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -88,6 +88,8 @@ int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal); int32_t streamStateDel(SStreamState* pState, const SWinKey* key); int32_t streamStateClear(SStreamState* pState); void streamStateSetNumber(SStreamState* pState, int32_t number); +int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); +int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index bc19572394418f67e35f23c4f721a889972f8171..49623dc61979587990071b17ac0539d8ec5dcc62 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -36,6 +36,7 @@ int32_t scanDebug = 0; #define MULTI_READER_MAX_TABLE_NUM 5000 #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) +#define STREAM_SCAN_OP_NAME "StreamScanOperator" typedef struct STableMergeScanExecInfo { SFileBlockLoadRecorder blockRecorder; @@ -1771,7 +1772,7 @@ int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) { // other properties are recovered from the execution plan void streamScanOperatorDeocde(void* pBuff, int32_t len, SStreamScanInfo* pInfo) { - if (!pBuff) { + if (!pBuff || len == 0) { return; } @@ -2054,10 +2055,12 @@ FETCH_NEXT_BLOCK: updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); doClearBufferedBlocks(pInfo); qDebug("stream scan return empty, consume block %d", totBlockNum); - // void* buff = NULL; - // int32_t len = streamScanOperatorEncode(pInfo, &buff); - // todo(liuyao) save buff - // taosMemoryFreeClear(buff); + void* buff = NULL; + int32_t len = streamScanOperatorEncode(pInfo, &buff); + if (len > 0) { + streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), buff, len); + } + taosMemoryFreeClear(buff); return NULL; } @@ -2484,12 +2487,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->twAggSup.maxTs = INT64_MIN; pInfo->pState = NULL; - // todo(liuyao) get buff from rocks db; void* buff = NULL; int32_t len = 0; + streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len); streamScanOperatorDeocde(buff, len, pInfo); - setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, + setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 2a0bfe4856c3ea04e487d18d1239f3c96314d9cc..f6f4a75cd97f62c50d89ffef61477f3fe0836e69 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2846,6 +2846,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; + pScanInfo->pState = pAggSup->pState; if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fee65d6c58e95097f3d6e7614252da97f67934a5..9e49ae9e9784290f98a70c447d76d1136f78c312 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -395,6 +395,32 @@ int32_t streamStateClear(SStreamState* pState) { void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; } +int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen) { +#ifdef USE_ROCKSDB + int32_t code = 0; + void* batch = streamStateCreateBatch(); + code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen); + if (code != 0) { + return code; + } + code = streamStatePutBatch_rocksdb(pState, batch); + streamStateDestroyBatch(batch); + return code; +#else + return 0; +#endif +} + +int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen) { +#ifdef USE_ROCKSDB + int32_t code = 0; + code = streamDefaultGet_rocksdb(pState, pKey, pVal, pLen); + return code; +#else + return 0; +#endif +} + int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB return streamStateGet(pState, key, pVal, pVLen); @@ -1066,7 +1092,7 @@ void streamStateDestroy(SStreamState* pState) { #ifdef USE_ROCKSDB streamFileStateDestroy(pState->pFileState); streamStateDestroy_rocksdb(pState); - taosMemoryFreeClear(pState->parNameMap); + tSimpleHashCleanup(pState->parNameMap); // do nothong #endif taosMemoryFreeClear(pState->pTdbState); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 70a1c543f648104064c311084b5d634517be4c2e..fff666ec9f3f90bd60d921a011fbd8925c52696f 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -274,7 +274,10 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) { } int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) { - ASSERT(pInfo); + if(!pInfo) { + return 0; + } + SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; diff --git a/tests/script/tsim/stream/basic2.sim b/tests/script/tsim/stream/basic2.sim index 20e8c953912e07dd469d6eaf042cb28bdb45d436..8d0df2697be8a2ffceea69701e78c20fae4efff1 100644 --- a/tests/script/tsim/stream/basic2.sim +++ b/tests/script/tsim/stream/basic2.sim @@ -48,23 +48,34 @@ sleep 100 #=================================================================== print =============== query data from child table +$loop_count = 0 + +loop0: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb print rows: $rows print $data00 $data01 $data02 $data03 if $rows != 1 then - return -1 + goto loop0 endi if $data01 != 234 then - return -1 + goto loop0 endi if $data02 != 234 then - return -1 + goto loop0 endi if $data03 != 234 then - return -1 + goto loop0 endi #=================================================================== @@ -77,36 +88,47 @@ sleep 100 #=================================================================== print =============== query data from child table +$loop_count = 0 + +loop1: + +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + sql select `_wstart`,`min(k)`,`max(k)`,sum_alias from outstb print rows: $rows print $data00 $data01 $data02 $data03 print $data10 $data11 $data12 $data13 if $rows != 2 then - return -1 + goto loop1 endi if $data01 != 234 then - return -1 + goto loop1 endi if $data02 != 234 then - return -1 + goto loop1 endi if $data03 != 234 then - return -1 + goto loop1 endi if $data11 != -111 then - return -1 + goto loop1 endi if $data12 != -111 then - return -1 + goto loop1 endi if $data13 != -111 then - return -1 + goto loop1 endi system sh/exec.sh -n dnode1 -s stop -x SIGINT