提交 1892b806 编写于 作者: 5 54liuyao

feat:add stream state buff

上级 429b5cd6
......@@ -167,6 +167,7 @@ extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream;
extern int64_t tsStreamBufferSize;
extern int64_t tsCheckpointInterval;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......@@ -81,6 +81,8 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
bool streamStateCheck(SStreamState* pState, const SWinKey* key);
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal);
int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number);
......@@ -35,13 +35,16 @@ typedef struct SRowBuffPos {
typedef SList SStreamSnapshot;
typedef bool (*ExpiredFun)(void*, TSKEY);
typedef TSKEY (*GetTsFun)(void*);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, ExpiredFun fp, void* pFile);
void destroyStreamFileState(SStreamFileState* pFileState);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
void* getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize);
......@@ -197,6 +197,7 @@ char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udf
char tsUdfdLdLibPath[512] = "";
bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
int64_t tsCheckpointInterval = 24 * 60 * 60 * 1000;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
......@@ -497,7 +498,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
if (cfgAddBool(pCfg, "streamBufferSize", tsStreamBufferSize, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, 0) != 0) return -1;
return 0;
......@@ -860,13 +860,9 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
bool groupbyTbname(SNodeList* pGroupList);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
......@@ -2694,144 +2694,12 @@ int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult,
// taosMemoryFree(buf);
int32_t getOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow** pResult, int32_t* resSize) {
char* pVal = NULL;
int32_t size = 0;
int32_t code = streamStateGet(pState, pKey, (void**)&pVal, &size);
if (code != 0) {
return 0;
*pResult = (SResultRow*)pVal;
// memcpy((char*)*pResult, (char*)pVal, size);
// int tlen = resultRowDecode((void**)pResult, size, pVal);
*resSize = size;
return code;
int32_t streamStateAddIfNotExist2(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
// qWarn("streamStateAddIfNotExist");
char* tVal = NULL;
int32_t size = 0;
int32_t code = streamStateGet(pState, key, (void**)&tVal, &size);
if (code != 0) {
*pVal = taosMemoryCalloc(1, *pVLen);
} else {
*pVal = (void*)tVal;
// resultRowDecode((void**)pVal, size, tVal);
*pVLen = size;
return 0;
int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
SWinKey key = {
.ts = win->skey,
.groupId = tableGroupId,
char* value = NULL;
int32_t size = pAggSup->resultRowSize;
if (streamStateAddIfNotExist2(pState, &key, (void**)&value, &size) < 0) {
} else {
// getOutputBuf(pState, &key, (SResultRow**)&value, &size);
*pResult = (SResultRow*)value;
// set time window for current result
(*pResult)->win = (*win);
setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult) {
streamStateReleaseBuf(pState, pKey, pResult);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SWinKey* pKey = taosArrayGet(pGroupResInfo->pRows, i);
int32_t size = 0;
void* pVal = NULL;
int32_t code = getOutputBuf(pState, pKey, (SResultRow**)&pVal, &size);
// streamStateGet(pState, pKey, &pVal, &size);
ASSERT(code == 0);
SResultRow* pRow = (SResultRow*)pVal;
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
releaseOutputBuf(pState, pKey, pRow);
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId;
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != pKey->groupId) {
releaseOutputBuf(pState, pKey, pRow);
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
releaseOutputBuf(pState, pKey, pRow);
pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo;
qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete,
pEnryInfo->isNullRes, pEnryInfo->numOfRes);
if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code1);
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
pBlock->info.rows += pRow->numOfRows;
releaseOutputBuf(pState, pKey, pRow);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size) {
streamStateSessionPut(pState, key, (const void*)buf, size);
releaseOutputBuf(pState, NULL, (SResultRow*)buf);
......@@ -2919,7 +2787,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
pBlock->info.dataLoad = 1;
pBlock->info.rows += pRow->numOfRows;
// saveSessionDiscBuf(pState, pKey, pVal, size);
releaseOutputBuf(pState, NULL, pRow);
blockDataUpdateTsWindow(pBlock, 0);
......@@ -844,14 +844,15 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
static int32_t saveWinResult(int64_t ts, uint64_t groupId, SSHashObj* pUpdatedMap) {
SWinKey key = {.ts = ts, .groupId = groupId};
tSimpleHashPut(pUpdatedMap, &key, sizeof(SWinKey), NULL, 0);
static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES);
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SSHashObj* pUpdatedMap) {
return saveWinResult(ts, groupId, pUpdatedMap);
static int32_t saveWinResultInfo(TSKEY ts, uint64_t groupId, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
SWinKey key = {.ts = ts, .groupId = groupId};
saveWinResult(&key, pPos, pUpdatedMap);
static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) {
......@@ -1397,7 +1398,7 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SSHashObj* resWins) {
SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
uint64_t groupId = pKey->groupId;
TSKEY ts = pKey->ts;
int32_t code = saveWinResult(ts, groupId, resWins);
int32_t code = saveWinResultInfo(ts, groupId, *(SRowBuffPos**)pIte, resWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -1443,7 +1444,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
int32_t code = saveWinResultInfo(pWinKey->ts, pWinKey->groupId, closeWins);
int32_t code = saveWinResult(pWinKey, *(SRowBuffPos**)pIte, closeWins);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -1492,6 +1493,8 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap,
if (code == TSDB_CODE_SUCCESS) {
*key = next;
tw = getFinalTimeWindow(key->ts, pInterval);
} else {
......@@ -1604,6 +1607,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
if (pInfo->pChildren) {
......@@ -2122,7 +2126,27 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) {
return TSDB_CODE_SUCCESS == streamStateGet(pState, pKey, NULL, 0);
return streamStateCheck(pState, pKey);
int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
SWinKey key = {
.ts = win->skey,
.groupId = groupId,
char* value = NULL;
int32_t size = pAggSup->resultRowSize;
if (streamStateAddIfNotExist(pState, &key, (void**)&value, &size) < 0) {
*pResult = (SRowBuffPos*)value;
SResultRow* res = (SResultRow*)((*pResult)->pRowBuff);
// set time window for current result
res-> win = (*win);
setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pUpdatedMap) {
......@@ -2135,9 +2159,10 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
for (int32_t i = 0; i < size; i++) {
SWinKey* pWinRes = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL;
STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval);
SWinKey* pWinRes = taosArrayGet(pWinArray, i);
SRowBuffPos* pCurResPos = NULL;
SResultRow* pCurResult = NULL;
STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval);
if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup)) {
......@@ -2152,25 +2177,27 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S
if (num == 0) {
int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput,
int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
ASSERT(pCurResult != NULL);
ASSERT(pCurResPos != NULL);
pCurResult = (SResultRow*) pCurResPos->pRowBuff;
if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) {
SResultRow* pChResult = NULL;
setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
SRowBuffPos* pChResPos = NULL;
SResultRow* pChResult = NULL;
setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs,
pChildSup->rowEntryInfoOffset, &pChInfo->aggSup);
pChResult = (SResultRow*) pChResPos->pRowBuff;
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true);
compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
releaseOutputBuf(pChInfo->pState, pWinRes, pChResult);
if (num > 0 && pUpdatedMap) {
saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pUpdatedMap);
saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pCurResPos, pUpdatedMap);
saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pInfo->pState, pWinRes, pCurResult);
......@@ -2185,11 +2212,10 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) {
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) {
if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
if (streamStateGet(pState, &key, NULL, 0) == TSDB_CODE_SUCCESS) {
qWarn("get from dele");
return false;
if (streamStateCheck(pState, &key)) {
return true;
return true;
return false;
return false;
......@@ -2313,6 +2339,87 @@ static void clearFunctionContext(SExprSupp* pSup) {
int32_t getOutputBuf(SStreamState* pState, SRowBuffPos* pPos, SResultRow** pResult) {
return streamStateGetByPos(pState, pPos, (void**)pResult);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, i);
SResultRow* pRow = NULL;
int32_t code = getOutputBuf(pState, pPos, &pRow);
uint64_t groupId = ((SWinKey*)pPos->pKey)->groupId;
ASSERT(code == 0);
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one
if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1;
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = groupId;
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
} else {
// current value belongs to different group, it can't be packed into one datablock
if (pBlock->info.id.groupId != groupId) {
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo;
qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete,
pEnryInfo->isNullRes, pEnryInfo->numOfRes);
if (pCtx[j].fpSet.finalize) {
int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code1)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code1));
T_LONG_JMP(pTaskInfo->env, code1);
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
} else {
// expand the result into multiple rows. E.g., _wstart, top(k, 20)
// the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
for (int32_t k = 0; k < pRow->numOfRows; ++k) {
colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes);
pBlock->info.rows += pRow->numOfRows;
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
void doBuildStreamIntervalResult(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -2350,6 +2457,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1;
TSKEY* tsCols = NULL;
SRowBuffPos* pResPos = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
......@@ -2413,8 +2521,9 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput,
int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput,
pSup->rowEntryInfoOffset, &pInfo->aggSup);
pResult = (SResultRow*) pResPos->pRowBuff;
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
......@@ -2424,28 +2533,24 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey,
SWinKey key = {
.ts = pResult->win.skey,
.groupId = groupId,
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) {
saveWinResultInfo(pResult->win.skey, groupId, pUpdatedMap);
saveWinResult(&key, pResPos, pUpdatedMap);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
SWinKey key = {
.ts = pResult->win.skey,
.groupId = groupId,
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0);
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
SWinKey key = {
.ts = nextWin.skey,
.groupId = groupId,
key.ts = nextWin.skey;
saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize);
releaseOutputBuf(pInfo->pState, &key, pResult);
if (pInfo->delKey.ts > key.ts) {
pInfo->delKey = key;
......@@ -2475,6 +2580,27 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) {
SRowBuffPos* pos1 = *(SRowBuffPos**)pKey1;
SRowBuffPos* pos2 = *(SRowBuffPos**)pKey2;
SWinKey* pWin1 = (SWinKey*)pos1->pKey;
SWinKey* pWin2 = (SWinKey*)pos2->pKey;
if (pWin1->groupId > pWin2->groupId) {
return 1;
} else if (pWin1->groupId < pWin2->groupId) {
return -1;
if (pWin1->ts > pWin2->ts) {
return 1;
} else if (pWin1->ts < pWin2->ts) {
return -1;
return 0;
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
......@@ -2536,7 +2662,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES);
if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......@@ -2640,7 +2766,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->pUpdatedMap = NULL;
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
taosArraySort(pInfo->pUpdated, winPosCmprImpl);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
......@@ -2678,6 +2804,11 @@ int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) {
return deleteMark;
TSKEY compareTs(void* pKey) {
SWinKey* pWinKey = (SWinKey*) pKey;
return pWinKey->ts;
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
......@@ -2783,6 +2914,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs,
pInfo->pState, pInfo->twAggSup.deleteMark);
pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true;
......@@ -4761,7 +4894,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (!pInfo->pUpdated) {
pInfo->pUpdated = taosArrayInit(4, sizeof(SWinKey));
pInfo->pUpdated = taosArrayInit(4, POINTER_BYTES);
if (!pInfo->pUpdatedMap) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
......@@ -4822,10 +4955,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
SWinKey* pKey = tSimpleHashGetKey(pIte, NULL);
taosArrayPush(pInfo->pUpdated, pKey);
taosArrayPush(pInfo->pUpdated, pIte);
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
taosArraySort(pInfo->pUpdated, winPosCmprImpl);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
......@@ -4854,11 +4986,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return NULL;
bool compareTs(void* pKey, TSKEY mark) {
SWinKey* pWinKey = (SWinKey*) pKey;
return pWinKey->ts < mark;
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo) {
SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo));
......@@ -4945,7 +5072,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs, pInfo->pState);
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs,
pInfo->pState, pInfo->twAggSup.deleteMark);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
......@@ -132,6 +132,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
qWarn("open stream state2, %s", statePath);
pState->pTdbState->pOwner = pTask;
pState->pFileState = NULL;
return pState;
......@@ -297,20 +298,32 @@ int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* val
// todo refactor
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen);
// return streamStateGet_rocksdb(pState, key, pVal, pVLen);
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
return hasRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey));
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
return getRowBuffByPos(pState->pFileState, pos, pVal);
// todo refactor
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
return streamStateDel_rocksdb(pState, key);
return deleteRowBuff(pState->pFileState, key, sizeof(SWinKey));
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
......@@ -346,6 +359,7 @@ int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear(SStreamState* pState) {
return streamStateClear_rocksdb(pState);
SWinKey key = {.ts = 0, .groupId = 0};
......@@ -369,7 +383,7 @@ void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen);
return streamStateGet(pState, key, pVal, pVLen);
// todo refactor
int32_t size = *pVLen;
......@@ -1040,6 +1054,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal
void streamStateDestroy(SStreamState* pState) {
// do nothong
......@@ -35,14 +35,15 @@ struct SStreamFileState {
uint64_t checkPointVersion;
TSKEY maxTs;
TSKEY deleteMark;
TSKEY flushMark;
uint64_t maxRowCount;
uint64_t curRowCount;
ExpiredFun expFunc;
GetTsFun getTs;
typedef SRowBuffPos SRowBuffInfo;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, ExpiredFun fp, void* pFile) {
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) {
if (memSize <= 0) {
......@@ -65,17 +66,20 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, Expired
pFileState->preCheckPointVersion = 0;
pFileState->checkPointVersion = 1;
pFileState->pFileStore = pFile;
pFileState->expFunc = fp;
pFileState->getTs = fp;
pFileState->maxRowCount = memSize / rowSize;
pFileState->curRowCount = 0;
pFileState->deleteMark = delMark;
pFileState->flushMark = -1;
return pFileState;
return NULL;
void destroyRowBuffPos(SRowBuffPos* pPos) {
......@@ -84,32 +88,50 @@ void destroyRowBuffPosPtr(void* ptr) {
if (!ptr) {
void* tmp = *(void**)ptr;
SRowBuffPos* pPos = (SRowBuffPos*)tmp;
SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
void destroyStreamFileState(SStreamFileState* pFileState) {
void destroyRowBuff(void* ptr) {
if (!ptr) {
void streamFileStateDestroy(SStreamFileState* pFileState) {
if (!pFileState) {
tdListFreeP(pFileState->usedBuffs, destroyRowBuffPosPtr);
tdListFreeP(pFileState->freeBuffs, taosMemoryFree);
tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts) {
void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (pFileState->expFunc(pPos->pKey, ts)) {
tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff);
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) <ts) ) {
tdListPopNode(pFileState->usedBuffs, pNode);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, 0, true);
int32_t flushRowBuff(SStreamFileState* pFileState) {
SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
if (!pFlushList) {
......@@ -125,6 +147,8 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (!pPos->beUsed) {
tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
......@@ -133,19 +157,20 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
int32_t clearRowBuff(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark);
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
if (isListEmpty(pFileState->freeBuffs)) {
return flushRowBuff(pFileState);
void* getFreeBuff(SList* lists) {
void* getFreeBuff(SList* lists, int32_t buffSize) {
SListNode* pNode = tdListPopHead(lists);
if (!pNode) {
return NULL;
void* ptr = *(void**)pNode->data;
memset(ptr, 0, buffSize);
return ptr;
......@@ -153,7 +178,7 @@ void* getFreeBuff(SList* lists) {
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
tdListAppend(pFileState->usedBuffs, &pPos);
void* pBuff = getFreeBuff(pFileState->freeBuffs);
void* pBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
if (pBuff) {
pPos->pRowBuff = pBuff;
return pPos;
......@@ -170,40 +195,72 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
return pPos;
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
if (pos) {
*pVLen = pFileState->rowSize;
*pVal = *pos;
if (pVal) {
*pVLen = pFileState->rowSize;
*pVal = *pos;
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
ASSERT(pNewPos);// todo(liuyao) delete
pNewPos->pKey = taosMemoryCalloc(1, keyLen);
memcpy(pNewPos->pKey, pKey, keyLen);
TSKEY ts = pFileState->getTs(pKey);
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
int32_t len = 0;
void *pVal = NULL;
streamStateGet_rocksdb(pFileState->pFileStore, pKey, pVal, &len);
memcpy(pNewPos->pRowBuff, pVal, len);
tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES);
*pVLen = pFileState->rowSize;
*pVal = pNewPos;
if (pVal) {
*pVLen = pFileState->rowSize;
*pVal = pNewPos;
void* getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen) {
int32_t code_buff = tSimpleHashRemove(pFileState->rowBuffMap, pKey, keyLen);
int32_t code_rocks = streamStateDel_rocksdb(pFileState->pFileStore, pKey);
return code_buff == TSDB_CODE_SUCCESS ? code_buff : code_rocks;
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
if (pPos->pRowBuff) {
return pPos->pRowBuff;
(*pVal) = pPos->pRowBuff;
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
void* pVal = NULL;
int32_t len = 0;
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pVal, &len);
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, pVal, &len);
memcpy(pPos->pRowBuff, pVal, len);
return pPos->pRowBuff;
(*pVal) = pPos->pRowBuff;
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
if (pos) {
return true;
return false;
void releaseRowBuffPos(SRowBuffPos* pBuff) {
......@@ -211,7 +268,7 @@ void releaseRowBuffPos(SRowBuffPos* pBuff) {
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark);
clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark, false);
return pFileState->usedBuffs;
......@@ -229,7 +286,5 @@ int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize)
int32_t recoverSnapshot(SStreamFileState* pFileState) {
// 设置一个时间戳标记,小于这个时间戳的,如果缓存里没有,需要从rocks db里读取状态,大于这个时间戳的,不需要
// 这个还需要考虑一下,如果rocks db中也没有,说明真的是新的,那么这次读取是冗余的。
\ No newline at end of file
......@@ -2,7 +2,7 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode1 -s start
#==system sh/exec.sh -n dnode1 -s start -v
sleep 50
......@@ -275,37 +275,4 @@ endi
print loop4 over
#==system sh/exec.sh -n dnode1 -s stop -x SIGINT
#==print =============== check
#==system_content sh/checkValgrind.sh -n dnode1
#==print cmd return result ----> [ $system_content ]
#==if $system_content > 0 then
#== return -1
#==if $system_content == $null then
#== return -1
#==system sh/exec.sh -n dnode2 -s stop -x SIGINT
#==print =============== check
#==system_content sh/checkValgrind.sh -n dnode2
#==print cmd return result ----> [ $system_content ]
#==if $system_content > 0 then
#== return -1
#==if $system_content == $null then
#== return -1
#==return 1
system sh/stop_dnodes.sh
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册