提交 c9aa59cb 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 6726edae
...@@ -367,7 +367,7 @@ typedef struct SStateStore { ...@@ -367,7 +367,7 @@ typedef struct SStateStore {
SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key); SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark); uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char*id);
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState); void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState); void (*streamFileStateClear)(struct SStreamFileState* pFileState);
......
...@@ -28,11 +28,10 @@ extern "C" { ...@@ -28,11 +28,10 @@ extern "C" {
#endif #endif
typedef struct SStreamFileState SStreamFileState; typedef struct SStreamFileState SStreamFileState;
typedef SList SStreamSnapshot; typedef SList SStreamSnapshot;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark); GetTsFun fp, void* pFile, TSKEY delMark, const char* id);
void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState);
......
...@@ -2785,7 +2785,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, ...@@ -2785,7 +2785,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo));
pInfo->dataVersion = 0; pInfo->dataVersion = 0;
pInfo->statestore = pTaskInfo->storageAPI.stateStore; pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
...@@ -4974,8 +4974,10 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -4974,8 +4974,10 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
int32_t funResSize= getMaxFunResSize(pSup, numOfCols); int32_t funResSize= getMaxFunResSize(pSup, numOfCols);
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo));
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
......
...@@ -43,12 +43,13 @@ struct SStreamFileState { ...@@ -43,12 +43,13 @@ struct SStreamFileState {
uint64_t maxRowCount; uint64_t maxRowCount;
uint64_t curRowCount; uint64_t curRowCount;
GetTsFun getTs; GetTsFun getTs;
char* id;
}; };
typedef SRowBuffPos SRowBuffInfo; typedef SRowBuffPos SRowBuffInfo;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark) { GetTsFun fp, void* pFile, TSKEY delMark, const char* idstr) {
if (memSize <= 0) { if (memSize <= 0) {
memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE;
} }
...@@ -70,6 +71,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ ...@@ -70,6 +71,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) {
goto _error; goto _error;
} }
pFileState->keyLen = keySize; pFileState->keyLen = keySize;
pFileState->rowSize = rowSize; pFileState->rowSize = rowSize;
pFileState->selectivityRowSize = selectRowSize; pFileState->selectivityRowSize = selectRowSize;
...@@ -81,6 +83,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ ...@@ -81,6 +83,8 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->deleteMark = delMark; pFileState->deleteMark = delMark;
pFileState->flushMark = INT64_MIN; pFileState->flushMark = INT64_MIN;
pFileState->maxTs = INT64_MIN; pFileState->maxTs = INT64_MIN;
pFileState->id = taosStrdup(idstr);
recoverSnapshot(pFileState); recoverSnapshot(pFileState);
return pFileState; return pFileState;
...@@ -124,6 +128,8 @@ void streamFileStateDestroy(SStreamFileState* pFileState) { ...@@ -124,6 +128,8 @@ void streamFileStateDestroy(SStreamFileState* pFileState) {
if (!pFileState) { if (!pFileState) {
return; return;
} }
taosMemoryFree(pFileState->id);
tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr); tdListFreeP(pFileState->usedBuffs, destroyRowBuffAllPosPtr);
tdListFreeP(pFileState->freeBuffs, destroyRowBuff); tdListFreeP(pFileState->freeBuffs, destroyRowBuff);
tSimpleHashCleanup(pFileState->rowBuffMap); tSimpleHashCleanup(pFileState->rowBuffMap);
...@@ -177,7 +183,8 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin ...@@ -177,7 +183,8 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
i++; i++;
} }
} }
qInfo("do stream state flush %d rows to disck. is used: %d", listNEles(pFlushList), used);
qInfo("stream state flush %d rows to disk. is used:%d", listNEles(pFlushList), used);
} }
int32_t flushRowBuff(SStreamFileState* pFileState) { int32_t flushRowBuff(SStreamFileState* pFileState) {
...@@ -185,13 +192,17 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { ...@@ -185,13 +192,17 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
if (!pFlushList) { if (!pFlushList) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO); uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
num = TMAX(num, FLUSH_NUM); num = TMAX(num, FLUSH_NUM);
popUsedBuffs(pFileState, pFlushList, num, false); popUsedBuffs(pFileState, pFlushList, num, false);
if (isListEmpty(pFlushList)) { if (isListEmpty(pFlushList)) {
popUsedBuffs(pFileState, pFlushList, num, true); popUsedBuffs(pFileState, pFlushList, num, true);
} }
flushSnapshot(pFileState, pFlushList, false); flushSnapshot(pFileState, pFlushList, false);
SListIter fIter = {0}; SListIter fIter = {0};
tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD); tdListInitIter(pFlushList, &fIter, TD_LIST_FORWARD);
SListNode* pNode = NULL; SListNode* pNode = NULL;
...@@ -201,6 +212,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { ...@@ -201,6 +212,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff); tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff);
pPos->pRowBuff = NULL; pPos->pRowBuff = NULL;
} }
tdListFreeP(pFlushList, destroyRowBuffPosPtr); tdListFreeP(pFlushList, destroyRowBuffPosPtr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -269,13 +281,13 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi ...@@ -269,13 +281,13 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
TSKEY ts = pFileState->getTs(pKey); TSKEY ts = pFileState->getTs(pKey);
if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) { if (ts > pFileState->maxTs - pFileState->deleteMark && ts < pFileState->flushMark) {
int32_t len = 0; int32_t len = 0;
void* pVal = NULL; void* p = NULL;
int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &pVal, &len); int32_t code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len);
qDebug("===stream===get %" PRId64 " from disc, res %d", ts, code); qDebug("===stream===get %" PRId64 " from disc, res %d", ts, code);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, pVal, len); memcpy(pNewPos->pRowBuff, p, len);
} }
taosMemoryFree(pVal); taosMemoryFree(p);
} }
tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES); tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES);
...@@ -348,12 +360,16 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -348,12 +360,16 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD); tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
const int32_t BATCH_LIMIT = 256; const int32_t BATCH_LIMIT = 256;
SListNode* pNode = NULL;
int64_t st = taosGetTimestampMs();
int32_t numOfElems = listNEles(pSnapshot);
SListNode* pNode = NULL;
void* batch = streamStateCreateBatch(); void* batch = streamStateCreateBatch();
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
ASSERT(pPos->pRowBuff && pFileState->rowSize > 0); ASSERT(pPos->pRowBuff && pFileState->rowSize > 0);
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
streamStateClearBatch(batch); streamStateClearBatch(batch);
...@@ -361,7 +377,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -361,7 +377,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0); code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0);
qDebug("===stream===put %" PRId64 " to disc, code:%d, size:%d", sKey.key.ts, code, pFileState->rowSize); // todo handle failure
// qDebug("===stream===put %" PRId64 " to disc, code:%d, size:%d", sKey.key.ts, code, pFileState->rowSize);
} }
if (streamStateGetBatchSize(batch) > 0) { if (streamStateGetBatchSize(batch) > 0) {
...@@ -370,6 +387,10 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -370,6 +387,10 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStateClearBatch(batch); streamStateClearBatch(batch);
int64_t elapsed = taosGetTimestampMs() - st;
qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%"PRId64"ms", pFileState->id, numOfElems,
BATCH_LIMIT, elapsed);
if (flushState) { if (flushState) {
const char* taskKey = "streamFileState"; const char* taskKey = "streamFileState";
{ {
...@@ -391,8 +412,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -391,8 +412,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }
streamStateDestroyBatch(batch);
streamStateDestroyBatch(batch);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册