提交 502efb3d 编写于 作者: L liuyao

op stream selectivity buff

上级 4c64260a
......@@ -82,7 +82,7 @@ typedef struct STuplePos {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
SWinKey streamTupleKey;
};
} STuplePos;
......
......@@ -77,9 +77,8 @@ typedef struct {
int64_t number;
} SStreamStateCur;
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
......
......@@ -38,8 +38,8 @@ typedef SList SStreamSnapshot;
typedef TSKEY (*GetTsFun)(void*);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile,
TSKEY delMark);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState);
......@@ -56,6 +56,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
#ifdef __cplusplus
}
......
......@@ -2675,6 +2675,29 @@ TSKEY compareTs(void* pKey) {
return pWinKey->ts;
}
int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) {
if (pCtx->subsidiaries.rowLen == 0) {
int32_t rowLen = 0;
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
rowLen += pc->pExpr->base.resSchema.bytes;
}
return rowLen + pCtx->subsidiaries.num * sizeof(bool);
} else {
return pCtx->subsidiaries.rowLen;
}
}
int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
int32_t size = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t resSize = getSelectivityBufSize(pSup->pCtx + i);
size = TMAX(size, resSize);
}
return size;
}
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -2721,8 +2744,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
pInfo->pState);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -2731,10 +2757,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->numOfChild = numOfChild;
......@@ -2767,7 +2789,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize,
int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
pInfo->dataVersion = 0;
......@@ -4886,9 +4909,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pTaskInfo->streamInfo.pState);
pInfo->pState);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -4909,10 +4936,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
*(pInfo->pState) = *(pTaskInfo->streamInfo.pState);
streamStateSetNumber(pInfo->pState, -1);
pInfo->pPhyNode = NULL; // create new child
pInfo->pPullDataMap = NULL;
pInfo->pPullWins = NULL; // SPullWindowInfo
......@@ -4925,7 +4948,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize,
int32_t funResSize= getMaxFunResSize(pSup, numOfCols);
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
......
......@@ -881,10 +881,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
}
pStart += pDstCol->info.bytes;
}
if (pCtx->saveHandle.pState) {
streamFreeVal((void*)p);
}
}
return TSDB_CODE_SUCCESS;
......@@ -3121,7 +3117,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
return buf;
}
static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey* key,
static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, SWinKey* key,
STuplePos* pPos) {
STuplePos p = {0};
if (pHandle->pBuf != NULL) {
......@@ -3169,7 +3165,7 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
prepareBuf(pCtx);
STupleKey key;
SWinKey key;
if (pCtx->saveHandle.pBuf == NULL) {
SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0);
if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
......@@ -3177,7 +3173,6 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
key.groupId = pSrcBlock->info.id.groupId;
key.ts = skey;
key.exprIdx = pCtx->exprIdx;
}
}
......
......@@ -262,26 +262,30 @@ int32_t streamStateCommit(SStreamState* pState) {
#endif
}
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) {
int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStateFuncPut_rocksdb(pState, key, value, vLen);
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
memcpy(buf + len - rowSize, value, vLen);
return code;
#else
return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn);
#endif
}
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) {
int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen);
#else
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen);
#endif
}
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) {
#ifdef USE_ROCKSDB
return streamStateFuncDel_rocksdb(pState, key);
void* pVal = NULL;
int32_t len = 0;
int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len);
char* buf = ((SRowBuffPos*)pVal)->pRowBuff;
uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState);
*ppVal = buf + len - rowSize;
return code;
#else
return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn);
return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen);
#endif
}
......
......@@ -31,6 +31,7 @@ struct SStreamFileState {
SSHashObj* rowBuffMap;
void* pFileStore;
int32_t rowSize;
int32_t selectivityRowSize;
int32_t keyLen;
uint64_t preCheckPointVersion;
uint64_t checkPointVersion;
......@@ -44,7 +45,7 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile,
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile,
TSKEY delMark) {
if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
......@@ -57,6 +58,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
if (!pFileState) {
goto _error;
}
rowSize += selectRowSize;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->usedBuffs = tdListNew(POINTER_BYTES);
pFileState->freeBuffs = tdListNew(POINTER_BYTES);
......@@ -68,11 +70,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
}
pFileState->keyLen = keySize;
pFileState->rowSize = rowSize;
pFileState->selectivityRowSize = selectRowSize;
pFileState->preCheckPointVersion = 0;
pFileState->checkPointVersion = 1;
pFileState->pFileStore = pFile;
pFileState->getTs = fp;
pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->curRowCount = 0;
pFileState->deleteMark = delMark;
pFileState->flushMark = INT64_MIN;
......@@ -440,7 +442,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark);
if (pFileState->maxTs != INT64_MIN) {
deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark);
}
void* pStVal = NULL;
int32_t len = 0;
......@@ -475,4 +479,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) {
streamStateFreeCur(pCur);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
}
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) {
return pFileState->selectivityRowSize;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册