diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2bc67e439f5da6d9873730acaf2a108eeb4fce8b..51d31ca91fd19691bad03f09b4dbc852affe174e 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -82,7 +82,7 @@ typedef struct STuplePos { int32_t pageId; int32_t offset; }; - STupleKey streamTupleKey; + SWinKey streamTupleKey; }; } STuplePos; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 9b80ce27860a639a8a406ee886004f586940afec..0de091bc4e39633fcbaa2ef9f35ee869407a5bff 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -77,9 +77,8 @@ typedef struct { int64_t number; } SStreamStateCur; -int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); -int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key); +int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index d50f0e0a312017cd76ee2b9aee52c74b0f7c5f65..7124e2d2517c10de4b837f1dcfbc686988e9760c 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -38,8 +38,8 @@ typedef SList SStreamSnapshot; typedef TSKEY (*GetTsFun)(void*); -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, - TSKEY delMark); +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, + GetTsFun fp, void* pFile, TSKEY delMark); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState); @@ -56,6 +56,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); +int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); #ifdef __cplusplus } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ca3165bf92bd6ce6faa6905a8c9eb541bb3a77b2..7570e3c1123f08a88da4f9f66e86054156d993df 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2675,6 +2675,29 @@ TSKEY compareTs(void* pKey) { return pWinKey->ts; } +int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) { + if (pCtx->subsidiaries.rowLen == 0) { + int32_t rowLen = 0; + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + rowLen += pc->pExpr->base.resSchema.bytes; + } + + return rowLen + pCtx->subsidiaries.num * sizeof(bool); + } else { + return pCtx->subsidiaries.rowLen; + } +} + +int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { + int32_t size = 0; + for (int32_t i = 0; i < numOfCols; ++i) { + int32_t resSize = getSelectivityBufSize(pSup->pCtx + i); + size = TMAX(size, resSize); + } + return size; +} + SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -2721,8 +2744,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + streamStateSetNumber(pInfo->pState, -1); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState); + pInfo->pState); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2731,10 +2757,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - streamStateSetNumber(pInfo->pState, -1); - initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->numOfChild = numOfChild; @@ -2767,7 +2789,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pInfo->dataVersion = 0; @@ -4886,9 +4909,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; initResultSizeInfo(&pOperator->resultInfo, 4096); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + streamStateSetNumber(pInfo->pState, -1); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState); + pInfo->pState); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4909,10 +4936,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - streamStateSetNumber(pInfo->pState, -1); - pInfo->pPhyNode = NULL; // create new child pInfo->pPullDataMap = NULL; pInfo->pPullWins = NULL; // SPullWindowInfo @@ -4925,7 +4948,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + int32_t funResSize= getMaxFunResSize(pSup, numOfCols); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index af318a6bc5326e57211189938d7f9b81c042922d..e50eb0b7847068ad4d3e52e0eea175279dc9687b 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -881,10 +881,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu } pStart += pDstCol->info.bytes; } - - if (pCtx->saveHandle.pState) { - streamFreeVal((void*)p); - } } return TSDB_CODE_SUCCESS; @@ -3121,7 +3117,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid return buf; } -static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey* key, +static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, SWinKey* key, STuplePos* pPos) { STuplePos p = {0}; if (pHandle->pBuf != NULL) { @@ -3169,7 +3165,7 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { prepareBuf(pCtx); - STupleKey key; + SWinKey key; if (pCtx->saveHandle.pBuf == NULL) { SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { @@ -3177,7 +3173,6 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* key.groupId = pSrcBlock->info.id.groupId; key.ts = skey; - key.exprIdx = pCtx->exprIdx; } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fa9bfe6ca5ee93a75a23419e3ae5c1d303c02d59..a607d6af74fdcec4dd0ee3e57dd3bc151e2e6338 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -262,26 +262,30 @@ int32_t streamStateCommit(SStreamState* pState) { #endif } -int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { +int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB - return streamStateFuncPut_rocksdb(pState, key, value, vLen); + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + memcpy(buf + len - rowSize, value, vLen); + return code; #else return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn); #endif } -int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { +int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen); -#else - return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); -#endif -} - -int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { -#ifdef USE_ROCKSDB - return streamStateFuncDel_rocksdb(pState, key); + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + *ppVal = buf + len - rowSize; + return code; #else - return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn); + return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen); #endif } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index b7401ec5d97a3f73733483828c6977ce4d636b62..1b62b80b3de1a2b86561489c2468fa22ca918cb2 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -31,6 +31,7 @@ struct SStreamFileState { SSHashObj* rowBuffMap; void* pFileStore; int32_t rowSize; + int32_t selectivityRowSize; int32_t keyLen; uint64_t preCheckPointVersion; uint64_t checkPointVersion; @@ -44,7 +45,7 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; @@ -57,6 +58,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ if (!pFileState) { goto _error; } + rowSize += selectRowSize; pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES); @@ -68,11 +70,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ } pFileState->keyLen = keySize; pFileState->rowSize = rowSize; + pFileState->selectivityRowSize = selectRowSize; pFileState->preCheckPointVersion = 0; pFileState->checkPointVersion = 1; pFileState->pFileStore = pFile; pFileState->getTs = fp; - pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->curRowCount = 0; pFileState->deleteMark = delMark; pFileState->flushMark = INT64_MIN; @@ -440,7 +442,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; - deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); + if (pFileState->maxTs != INT64_MIN) { + deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); + } void* pStVal = NULL; int32_t len = 0; @@ -475,4 +479,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { streamStateFreeCur(pCur); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} + +int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { + return pFileState->selectivityRowSize; +}