提交 0716a4ae 编写于 作者: L liuyao

feat:add check point

上级 5aff5aa9
......@@ -58,15 +58,15 @@ static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2;
if (pWin1->groupId > pWin2->groupId) {
if (pWin1->ts > pWin2->ts) {
return 1;
} else if (pWin1->groupId < pWin2->groupId) {
} else if (pWin1->ts < pWin2->ts) {
return -1;
}
if (pWin1->ts > pWin2->ts) {
if (pWin1->groupId > pWin2->groupId) {
return 1;
} else if (pWin1->ts < pWin2->ts) {
} else if (pWin1->groupId < pWin2->groupId) {
return -1;
}
......
......@@ -109,7 +109,6 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal);
void streamFreeVal(void* val);
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyNext(SStreamState* pState, const SWinKey* key);
......
......@@ -47,7 +47,7 @@ int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void**
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize);
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
int32_t recoverSnapshot(SStreamFileState* pFileState);
#ifdef __cplusplus
......
......@@ -197,7 +197,7 @@ char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udf
char tsUdfdLdLibPath[512] = "";
bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
int64_t tsCheckpointInterval = 24 * 60 * 60 * 1000;
int64_t tsCheckpointInterval = 3 * 60 * 60 * 1000;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
......
......@@ -426,6 +426,8 @@ typedef struct STimeWindowAggSupp {
int64_t waterMark;
TSKEY maxTs;
TSKEY minTs;
TSKEY checkPointTs;
TSKEY checkPointInterval;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
......
......@@ -869,22 +869,8 @@ static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) {
}
int32_t compareWinKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SWinKey* pDataPos = taosArrayGet(res, index);
SWinKey* pWKey = (SWinKey*)pKey;
if (pWKey->groupId > pDataPos->groupId) {
return 1;
} else if (pWKey->groupId < pDataPos->groupId) {
return -1;
}
if (pWKey->ts > pDataPos->ts) {
return 1;
} else if (pWKey->ts < pDataPos->ts) {
return -1;
}
return 0;
void* pDataPos = taosArrayGet((SArray*)data, index);
return winKeyCmprImpl(pKey, pDataPos);
}
static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) {
......@@ -1497,30 +1483,6 @@ static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap,
break;
}
}
// for debug
if (qDebugFlag & DEBUG_DEBUG && mark > 0) {
SStreamStateCur* pCur = streamStateGetCur(pState, key);
int32_t code = streamStateCurPrev(pState, pCur);
if (code == TSDB_CODE_SUCCESS) {
SWinKey tmpKey = {0};
code = streamStateGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == TSDB_CODE_SUCCESS) {
STimeWindow tw = getFinalTimeWindow(tmpKey.ts, pInterval);
qDebug("===stream===error stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey,
tw.ekey, tmpKey.groupId, mark);
} else {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
key->groupId, mark);
}
} else {
STimeWindow tw = getFinalTimeWindow(key->ts, pInterval);
qDebug("===stream===stream state first key:%" PRId64 "-%" PRId64 ",%" PRIu64 ",mark %" PRId64, tw.skey, tw.ekey,
key->groupId, mark);
}
streamStateFreeCur(pCur);
}
}
static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
......@@ -2641,7 +2603,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else {
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
&pInfo->interval, &pInfo->delKey);
streamStateCommit(pTaskInfo->streamInfo.pState);
if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
streamStateCommit(pInfo->pState);
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
}
}
return NULL;
} else {
......@@ -2833,6 +2798,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
.deleteMark = getDeleteMark(pIntervalPhyNode),
.deleteMarkSaved = 0,
.calTriggerSaved = 0,
.checkPointTs = 0,
.checkPointInterval = convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision),
};
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
......@@ -4887,7 +4854,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval,
&pInfo->delKey);
setOperatorCompleted(pOperator);
streamStateCommit(pTaskInfo->streamInfo.pState);
if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
streamStateCommit(pInfo->pState);
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
}
return NULL;
}
......@@ -5000,7 +4970,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {
pInfo->interval = (SInterval) {
.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
......@@ -5009,19 +4979,19 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
};
STimeWindowAggSupp twAggSupp = {
pInfo->twAggSup = (STimeWindowAggSupp) {
.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(pIntervalPhyNode),
.checkPointTs = 0,
.checkPointInterval = convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision),
};
ASSERTS(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
pOperator->pTaskInfo = pTaskInfo;
pInfo->interval = interval;
pInfo->twAggSup = twAggSupp;
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->isFinal = false;
......
......@@ -247,6 +247,8 @@ int32_t streamStateBegin(SStreamState* pState) {
int32_t streamStateCommit(SStreamState* pState) {
#ifdef USE_ROCKSDB
SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
flushSnapshot(pState->pFileState, pShot, true);
return 0;
#else
if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
......@@ -410,26 +412,6 @@ int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pV
return 0;
}
SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateGetCur_rocksdb(pState, key);
#else
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
tdbTbcOpen(pState->pTdbState->pStateDb, &pCur->pCur, NULL);
int32_t c = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c);
if (c != 0) {
streamStateFreeCur(pCur);
return NULL;
}
pCur->number = pState->number;
return pCur;
#endif
}
SStreamStateCur* streamStateFillGetCur(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateFillGetCur_rocksdb(pState, key);
......
......@@ -71,6 +71,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, GetTsFu
pFileState->curRowCount = 0;
pFileState->deleteMark = delMark;
pFileState->flushMark = -1;
recoverSnapshot(pFileState);
return pFileState;
_error:
......@@ -152,7 +153,7 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
i++;
}
}
flushSnapshot(pFileState->pFileStore, pFlushList, pFileState->rowSize);
flushSnapshot(pFileState, pFlushList, false);
return TSDB_CODE_SUCCESS;
}
......@@ -272,7 +273,18 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
return pFileState->usedBuffs;
}
int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize) {
void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) {
pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark);
}
void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY);
*pVal = taosMemoryCalloc(1, *pLen);
void** buff = pVal;
taosEncodeFixedI64(buff, pFileState->flushMark);
}
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
int32_t code = TSDB_CODE_SUCCESS;
SListIter iter = {0};
tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD);
......@@ -280,11 +292,60 @@ int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize)
SListNode* pNode = NULL;
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
code = streamStatePut_rocksdb(pFile, pPos->pKey, pPos->pRowBuff, rowSize);
code = streamStatePut_rocksdb(pFileState->pFileStore, pPos->pKey, pPos->pRowBuff, pFileState->rowSize);
}
if (flushState) {
int32_t len = 0;
void* buff = NULL;
streamFileStateEncode(pFileState, &buff, &len);
SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao
streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len);
}
return code;
}
int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao
void* pStVal = NULL;
int32_t len = 0;
code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len);
if (code == TSDB_CODE_SUCCESS) {
streamFileStateDecode(pFileState, pStVal, len);
} else {
return TSDB_CODE_FAILED;
}
SWinKey key = {.groupId = 0, .ts = 0};
SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
if (!pCur) {
return TSDB_CODE_FAILED;
}
code = streamStateSeekLast(pFileState->pFileStore, pCur);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
while (code == TSDB_CODE_SUCCESS) {
if (pFileState->curRowCount == pFileState->maxRowCount) {
break;
}
void* pVal = NULL;
int32_t pVLen = 0;
SRowBuffPos* pNewPos = getNewRowPos(pFileState);
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void **)&pVal, &pVLen);
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos);
break;
}
memcpy(pNewPos->pRowBuff, pVal, pVLen);
taosMemoryFree(pVal);
code = tSimpleHashPut(pFileState->rowBuffMap, pNewPos->pKey, pFileState->rowSize, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos);
break;
}
code = streamStateCurPrev_rocksdb(pFileState->pFileStore, pCur);
}
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册