提交 1f63859e 编写于 作者: L liuyao

feat:add buff swap

上级 ad164871
......@@ -37,13 +37,14 @@ typedef SList SStreamSnapshot;
typedef TSKEY (*GetTsFun)(void*);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
void releaseRowBuffPos(SRowBuffPos* pBuff);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
......@@ -828,7 +828,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
tsDisableStream = cfgGetItem(pCfg, "disableStream")->i64;
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64;
return 0;
......@@ -2881,7 +2881,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs,
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs,
pInfo->pState, pInfo->twAggSup.deleteMark);
pOperator->operatorType = pPhyNode->type;
......@@ -5042,7 +5042,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs,
pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs,
pInfo->pState, pInfo->twAggSup.deleteMark);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
......@@ -319,7 +319,9 @@ bool streamStateCheck(SStreamState* pState, const SWinKey* key) {
int32_t streamStateGetByPos(SStreamState* pState, void* pos, void** pVal) {
return getRowBuffByPos(pState->pFileState, pos, pVal);
int32_t code = getRowBuffByPos(pState->pFileState, pos, pVal);
return code;
// todo refactor
......@@ -22,6 +22,7 @@
#define FLUSH_RATIO 0.2
#define FLUSH_NUM 4
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024);
struct SStreamFileState {
......@@ -43,7 +44,7 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) {
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark) {
if (memSize <= 0) {
......@@ -62,12 +63,13 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFu
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
goto _error;
pFileState->keyLen = keySize;
pFileState->rowSize = rowSize;
pFileState->preCheckPointVersion = 0;
pFileState->checkPointVersion = 1;
pFileState->pFileStore = pFile;
pFileState->getTs = fp;
pFileState->maxRowCount = memSize / rowSize;
pFileState->maxRowCount = TMAX( (uint64_t)memSize / rowSize, FLUSH_NUM * 2);
pFileState->curRowCount = 0;
pFileState->deleteMark = delMark;
pFileState->flushMark = -1;
......@@ -90,7 +92,9 @@ void destroyRowBuffPosPtr(void* ptr) {
SRowBuffPos* pPos = *(SRowBuffPos**)ptr;
if (!pPos->beUsed) {
void destroyRowBuff(void* ptr) {
......@@ -117,13 +121,14 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)(pNode->data);
if (all || (pFileState->getTs(pPos->pKey) <ts) ) {
tdListPopNode(pFileState->usedBuffs, pNode);
if (all || (pFileState->getTs(pPos->pKey) < ts) ) {
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &(pPos->pRowBuff));
pPos->pRowBuff = NULL;
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
tdListPopNode(pFileState->usedBuffs, pNode);
......@@ -133,27 +138,47 @@ void streamFileStateClear(SStreamFileState* pFileState) {
clearExpiredRowBuff(pFileState, 0, true);
int32_t flushRowBuff(SStreamFileState* pFileState) {
SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
if (!pFlushList) {
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
uint64_t i = 0;
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL && i < num) {
while ((pNode = tdListNext(&iter)) != NULL && i < max) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (!pPos->beUsed) {
if (pPos->beUsed == used) {
tdListAppend(pFlushList, &pPos);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
tSimpleHashRemove(pFileState->rowBuffMap, pPos->pKey, pFileState->keyLen);
tdListPopNode(pFileState->usedBuffs, pNode);
int32_t flushRowBuff(SStreamFileState* pFileState) {
SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
if (!pFlushList) {
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
num = TMAX(num, FLUSH_NUM);
popUsedBuffs(pFileState, pFlushList, num, false);
if (isListEmpty(pFlushList)) {
popUsedBuffs(pFileState, pFlushList, num, true);
flushSnapshot(pFileState, pFlushList, false);
SListIter fIter = {0};
tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
SListNode* pNode = NULL;
while ((pNode = tdListNext(&fIter)) != NULL) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff != NULL);
tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff);
pPos->pRowBuff = NULL;
tdListFreeP(pFlushList, destroyRowBuffPosPtr);
......@@ -178,11 +203,11 @@ void* getFreeBuff(SList* lists, int32_t buffSize) {
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
tdListAppend(pFileState->usedBuffs, &pPos);
pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
void* pBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
if (pBuff) {
pPos->pRowBuff = pBuff;
return pPos;
goto _end;
if (pFileState->curRowCount < pFileState->maxRowCount) {
......@@ -190,13 +215,17 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
if (pBuff) {
pPos->pRowBuff = pBuff;
return pPos;
goto _end;
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
tdListAppend(pFileState->usedBuffs, &pPos);
ASSERT(pPos->pRowBuff != NULL);
return pPos;
......@@ -204,23 +233,24 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
pFileState->maxTs = TMAX(pFileState->maxTs, pFileState->getTs(pKey));
SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen);
if (pos) {
if (pVal) {
*pVLen = pFileState->rowSize;
*pVal = *pos;
*pVLen = pFileState->rowSize;
*pVal = *pos;
(*pos)->beUsed = true;
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
ASSERT(pNewPos);// todo(liuyao) delete
pNewPos->pKey = taosMemoryCalloc(1, keyLen);
pNewPos->beUsed = true;
memcpy(pNewPos->pKey, pKey, keyLen);
TSKEY ts = pFileState->getTs(pKey);
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
int32_t len = 0;
void *pVal = NULL;
streamStateGet_rocksdb(pFileState->pFileStore, pKey, pVal, &len);
memcpy(pNewPos->pRowBuff, pVal, len);
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &pVal, &len);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, pVal, len);
......@@ -244,15 +274,21 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
if (!pPos->pRowBuff) {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs, pFileState->rowSize);
int32_t len = 0;
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, pVal, &len);
memcpy(pPos->pRowBuff, pVal, len);
void *pBuff = NULL;
streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pBuff, &len);
memcpy(pPos->pRowBuff, pBuff, len);
(*pVal) = pPos->pRowBuff;
tdListPrepend(pFileState->usedBuffs, &pPos);
......@@ -292,6 +328,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
code = streamStatePut_rocksdb(pFileState->pFileStore, pPos->pKey, pPos->pRowBuff, pFileState->rowSize);
if (flushState) {
......@@ -845,6 +845,8 @@ sql create stream streams7 trigger at_once IGNORE EXPIRED 0 into streamt7 as sel
sql insert into ts1 values(1648791211000,1,2,3);
sql_error insert into ts1 values(-1648791211000,1,2,3);
$loop_count = 0
sleep 200
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c debugflag -v 131
system sh/cfg.sh -n dnode1 -c streamBufferSize -v 10
system sh/exec.sh -n dnode1 -s start
sleep 10000
sql connect
sql create database test vgroups 1;
sql use test;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once into streamt as select _wstart, count(*) c1 from t1 interval(1s);
sql insert into t1 values(1648791211000,1,2,3,1.0);
sql insert into t1 values(1648791212001,2,2,3,1.1);
sql insert into t1 values(1648791213002,3,2,3,2.1);
sql insert into t1 values(1648791214003,4,2,3,3.1);
sql insert into t1 values(1648791215003,4,2,3,3.1);
sql insert into t1 values(1648791216004,4,2,3,4.1);
sql insert into t1 values(1648791217004,4,2,3,4.1);
sql insert into t1 values(1648791218004,4,2,3,4.1);
sql insert into t1 values(1648791221004,4,2,3,4.1);
sql insert into t1 values(1648791222004,4,2,3,4.1);
sql insert into t1 values(1648791223004,4,2,3,4.1);
sql insert into t1 values(1648791224004,4,2,3,4.1);
sql insert into t1 values(1648791225005,4,2,3,4.1);
sql insert into t1 values(1648791226005,4,2,3,4.1);
sql insert into t1 values(1648791227005,4,2,3,4.1);
sql insert into t1 values(1648791228005,4,2,3,4.1);
$loop_count = 0
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
print 1 select * from streamt
sql select * from streamt;
if $rows != 16 then
print =====rows=$rows
goto loop0
sql insert into t1 values(1648791231004,4,2,3,4.1) (1648791232004,4,2,3,4.1) (1648791233004,4,2,3,4.1) (1648791234004,4,2,3,4.1) (1648791235004,4,2,3,4.1) (1648791236004,4,2,3,4.1) (1648791237004,4,2,3,4.1) (1648791238004,4,2,3,4.1) (1648791239004,4,2,3,4.1) (1648791240004,4,2,3,4.1) (1648791241004,4,2,3,4.1) (1648791242004,4,2,3,4.1) (1648791243004,4,2,3,4.1);
$loop_count = 0
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
print 2 select * from streamt
sql select * from streamt;
if $rows != 29 then
print =====rows=$rows
goto loop0
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册