提交 5850a3ab 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/enh/rocksRevert' into enh/rocksRevert

...@@ -51,15 +51,15 @@ static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) { ...@@ -51,15 +51,15 @@ static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1; SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2; SWinKey* pWin2 = (SWinKey*)pKey2;
if (pWin1->ts > pWin2->ts) { if (pWin1->groupId > pWin2->groupId) {
return 1; return 1;
} else if (pWin1->ts < pWin2->ts) { } else if (pWin1->groupId < pWin2->groupId) {
return -1; return -1;
} }
if (pWin1->groupId > pWin2->groupId) { if (pWin1->ts > pWin2->ts) {
return 1; return 1;
} else if (pWin1->groupId < pWin2->groupId) { } else if (pWin1->ts < pWin2->ts) {
return -1; return -1;
} }
......
...@@ -38,6 +38,8 @@ typedef struct STdbState { ...@@ -38,6 +38,8 @@ typedef struct STdbState {
rocksdb_comparator_t** pCompare; rocksdb_comparator_t** pCompare;
rocksdb_options_t* dbOpt; rocksdb_options_t* dbOpt;
struct SStreamTask* pOwner; struct SStreamTask* pOwner;
void* param;
void* env;
TDB* db; TDB* db;
TTB* pStateDb; TTB* pStateDb;
...@@ -63,6 +65,7 @@ void streamStateClose(SStreamState* pState); ...@@ -63,6 +65,7 @@ void streamStateClose(SStreamState* pState);
int32_t streamStateBegin(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState);
void streamStateDestroy(SStreamState* pState); void streamStateDestroy(SStreamState* pState);
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark);
typedef struct { typedef struct {
rocksdb_iterator_t* iter; rocksdb_iterator_t* iter;
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include "os.h" #include "os.h"
#include "tarray.h"
#include "tdef.h" #include "tdef.h"
#include "tlist.h" #include "tlist.h"
...@@ -27,29 +28,33 @@ extern "C" { ...@@ -27,29 +28,33 @@ extern "C" {
typedef struct SStreamFileState SStreamFileState; typedef struct SStreamFileState SStreamFileState;
typedef struct SRowBuffPos { typedef struct SRowBuffPos {
void* pRowBuff; void* pRowBuff;
void* pKey; void* pKey;
bool beFlushed; bool beFlushed;
bool beUsed; bool beUsed;
} SRowBuffPos; } SRowBuffPos;
typedef SList SStreamSnapshot; typedef SList SStreamSnapshot;
typedef TSKEY (*GetTsFun)(void*); typedef TSKEY (*GetTsFun)(void*);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark); SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile,
TSKEY delMark);
void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState);
int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen);
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal);
void releaseRowBuffPos(SRowBuffPos* pBuff); void releaseRowBuffPos(SRowBuffPos* pBuff);
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen);
SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState);
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState);
int32_t recoverSnapshot(SStreamFileState* pFileState); int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -40,9 +40,7 @@ typedef struct SUpdateInfo { ...@@ -40,9 +40,7 @@ typedef struct SUpdateInfo {
TSKEY minTS; TSKEY minTS;
SScalableBf *pCloseWinSBF; SScalableBf *pCloseWinSBF;
SHashObj *pMap; SHashObj *pMap;
STimeWindow scanWindow; uint64_t maxDataVersion;
uint64_t scanGroupId;
uint64_t maxVersion;
} SUpdateInfo; } SUpdateInfo;
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark);
...@@ -50,8 +48,6 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma ...@@ -50,8 +48,6 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version);
void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoDestroy(SUpdateInfo *pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
......
...@@ -477,9 +477,10 @@ typedef struct SStreamScanInfo { ...@@ -477,9 +477,10 @@ typedef struct SStreamScanInfo {
int32_t blockRecoverTotCnt; int32_t blockRecoverTotCnt;
SSDataBlock* pRecoverRes; SSDataBlock* pRecoverRes;
SSDataBlock* pCreateTbRes; SSDataBlock* pCreateTbRes;
int8_t igCheckUpdate; int8_t igCheckUpdate;
int8_t igExpired; int8_t igExpired;
SStreamState* pState;
} SStreamScanInfo; } SStreamScanInfo;
typedef struct { typedef struct {
...@@ -553,6 +554,7 @@ typedef struct SStreamIntervalOperatorInfo { ...@@ -553,6 +554,7 @@ typedef struct SStreamIntervalOperatorInfo {
SSDataBlock* pPullDataRes; SSDataBlock* pPullDataRes;
bool isFinal; bool isFinal;
SArray* pChildren; SArray* pChildren;
int32_t numOfChild;
SStreamState* pState; SStreamState* pState;
SWinKey delKey; SWinKey delKey;
uint64_t numOfDatapack; uint64_t numOfDatapack;
......
...@@ -1034,8 +1034,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou ...@@ -1034,8 +1034,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou
pInfo->groupId = groupCol[rowIndex]; pInfo->groupId = groupCol[rowIndex];
} }
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) {
pTableScanInfo->base.cond.twindows = *pWin; pTableScanInfo->base.cond.twindows = *pWin;
pTableScanInfo->base.cond.endVersion = version;
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1; pTableScanInfo->currentGroupId = -1;
tsdbReaderClose(pTableScanInfo->base.dataReader); tsdbReaderClose(pTableScanInfo->base.dataReader);
...@@ -1154,7 +1155,7 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ ...@@ -1154,7 +1155,7 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
break; break;
} }
resetTableScanInfo(pInfo->pTableScanOp->info, &win); resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion);
pInfo->pTableScanOp->status = OP_OPENED; pInfo->pTableScanOp->status = OP_OPENED;
return true; return true;
} }
...@@ -1195,14 +1196,19 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t ...@@ -1195,14 +1196,19 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
qInfo("do stream range scan. windows index:%d", *pRowIndex); qInfo("do stream range scan. windows index:%d", *pRowIndex);
bool prepareRes = true;
while (1) { while (1) {
SSDataBlock* pResult = NULL; SSDataBlock* pResult = NULL;
pResult = doTableScan(pInfo->pTableScanOp); pResult = doTableScan(pInfo->pTableScanOp);
if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) { if (!pResult) {
prepareRes = prepareRangeScan(pInfo, pSDB, pRowIndex);
// scan next window data // scan next window data
pResult = doTableScan(pInfo->pTableScanOp); pResult = doTableScan(pInfo->pTableScanOp);
} }
if (!pResult) { if (!pResult) {
if (prepareRes) {
continue;
}
blockDataCleanup(pSDB); blockDataCleanup(pSDB);
*pRowIndex = 0; *pRowIndex = 0;
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
...@@ -1453,39 +1459,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, ...@@ -1453,39 +1459,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
return code; return code;
} }
#if 0
void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTagCalSup = &pInfo->tagCalSup;
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
if (pBlock == NULL || pBlock->info.rows == 0) return;
void* tag = NULL;
int32_t tagLen = 0;
if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) {
pBlock->info.tagLen = tagLen;
void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen);
if (pTag == NULL) {
tdbFree(tag);
taosMemoryFree(pBlock->info.pTag);
pBlock->info.pTag = NULL;
pBlock->info.tagLen = 0;
return;
}
pBlock->info.pTag = pTag;
memcpy(pBlock->info.pTag, tag, tagLen);
tdbFree(tag);
return;
} else {
pBlock->info.pTag = NULL;
}
tdbFree(tag);
}
#endif
static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
blockDataCleanup(pInfo->pCreateTbRes); blockDataCleanup(pInfo->pCreateTbRes);
if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) {
pBlock->info.parTbName[0] = 0; pBlock->info.parTbName[0] = 0;
...@@ -1541,7 +1516,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock ...@@ -1541,7 +1516,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
isDeletedStreamWindow(&win, pBlock->info.id.groupId, isDeletedStreamWindow(&win, pBlock->info.id.groupId,
pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup); pInfo->pState, &pInfo->twAggSup);
if ((update || closedWin) && out) { if ((update || closedWin) && out) {
qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin);
uint64_t gpId = 0; uint64_t gpId = 0;
...@@ -1770,6 +1745,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { ...@@ -1770,6 +1745,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) { static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
if (pInfo->pUpdateInfo) { if (pInfo->pUpdateInfo) {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
checkUpdateData(pInfo, true, pBlock, true); checkUpdateData(pInfo, true, pBlock, true);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
if (pInfo->pUpdateDataRes->info.rows > 0) { if (pInfo->pUpdateDataRes->info.rows > 0) {
...@@ -1831,7 +1807,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1831,7 +1807,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
} }
/*resetTableScanInfo(pTSInfo, pWin);*/
tsdbReaderClose(pTSInfo->base.dataReader); tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL; pTSInfo->base.dataReader = NULL;
...@@ -1876,8 +1851,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { ...@@ -1876,8 +1851,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) { if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
printDataBlock(pSDB, "scan recover update"); printDataBlock(pSDB, "scan recover update");
...@@ -1946,6 +1919,9 @@ FETCH_NEXT_BLOCK: ...@@ -1946,6 +1919,9 @@ FETCH_NEXT_BLOCK:
pBlock->info.calWin.skey = INT64_MIN; pBlock->info.calWin.skey = INT64_MIN;
pBlock->info.calWin.ekey = INT64_MAX; pBlock->info.calWin.ekey = INT64_MAX;
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
if (pInfo->pUpdateInfo) {
pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version);
}
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
switch (pBlock->info.type) { switch (pBlock->info.type) {
case STREAM_NORMAL: case STREAM_NORMAL:
...@@ -2043,11 +2019,9 @@ FETCH_NEXT_BLOCK: ...@@ -2043,11 +2019,9 @@ FETCH_NEXT_BLOCK:
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) { if (pSDB) {
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "stream scan update"); printDataBlock(pSDB, "stream scan update");
calBlockTbName(pInfo, pSDB); calBlockTbName(pInfo, pSDB);
return pSDB; return pSDB;
} }
...@@ -2109,13 +2083,6 @@ FETCH_NEXT_BLOCK: ...@@ -2109,13 +2083,6 @@ FETCH_NEXT_BLOCK:
setBlockIntoRes(pInfo, &block, false); setBlockIntoRes(pInfo, &block, false);
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId,
pInfo->pRes->info.version)) {
printDataBlock(pInfo->pRes, "stream scan ignore");
blockDataCleanup(pInfo->pRes);
continue;
}
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
...@@ -2515,6 +2482,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -2515,6 +2482,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate;
pInfo->igExpired = pTableScanNode->igExpired; pInfo->igExpired = pTableScanNode->igExpired;
pInfo->twAggSup.maxTs = INT64_MIN; pInfo->twAggSup.maxTs = INT64_MIN;
pInfo->pState = NULL;
// todo(liuyao) get buff from rocks db; // todo(liuyao) get buff from rocks db;
void* buff = NULL; void* buff = NULL;
......
...@@ -89,4 +89,5 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); ...@@ -89,4 +89,5 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key);
int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif #endif
\ No newline at end of file
...@@ -1066,12 +1066,21 @@ void streamStateDestroy(SStreamState* pState) { ...@@ -1066,12 +1066,21 @@ void streamStateDestroy(SStreamState* pState) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
streamFileStateDestroy(pState->pFileState); streamFileStateDestroy(pState->pFileState);
streamStateDestroy_rocksdb(pState); streamStateDestroy_rocksdb(pState);
taosMemoryFreeClear(pState->parNameMap);
// do nothong // do nothong
#endif #endif
taosMemoryFreeClear(pState->pTdbState); taosMemoryFreeClear(pState->pTdbState);
taosMemoryFreeClear(pState); taosMemoryFreeClear(pState);
} }
int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
#ifdef USE_ROCKSDB
return deleteExpiredCheckPoint(pState->pFileState, mark);
#else
return 0;
#endif
}
#if 0 #if 0
char* streamStateSessionDump(SStreamState* pState) { char* streamStateSessionDump(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
......
...@@ -13,13 +13,22 @@ ...@@ -13,13 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "tcommon.h" #include "tcommon.h"
#include "tlog.h"
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
// int ret = memcmp(aBuf, bBuf, aLen);
return memcmp(aBuf, bBuf, aLen); if (ret == 0) {
if (aLen < bLen)
return -1;
else if (aLen > bLen)
return 1;
else
return 0;
} else {
return ret;
}
} }
int defaultKeyEncode(void* k, char* buf) { int defaultKeyEncode(void* k, char* buf) {
int len = strlen((char*)k); int len = strlen((char*)k);
...@@ -280,6 +289,10 @@ int parKeyToString(void* k, char* buf) { ...@@ -280,6 +289,10 @@ int parKeyToString(void* k, char* buf) {
return n; return n;
} }
typedef struct {
void* tableOpt;
void* lru; // global or not
} rocksdbCfParam;
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
typedef int (*EncodeFunc)(void* key, char* buf); typedef int (*EncodeFunc)(void* key, char* buf);
...@@ -297,6 +310,8 @@ const char* compareFuncKeyName(void* name); ...@@ -297,6 +310,8 @@ const char* compareFuncKeyName(void* name);
const char* compareParKeyName(void* name); const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name); const char* comparePartagKeyName(void* name);
void destroyFunc(void* stata) { return; }
typedef struct { typedef struct {
const char* key; const char* key;
int32_t len; int32_t len;
...@@ -312,14 +327,19 @@ typedef struct { ...@@ -312,14 +327,19 @@ typedef struct {
SCfInit ginitDict[] = { SCfInit ginitDict[] = {
{"default", strlen("default"), 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, {"default", strlen("default"), 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString,
compareDefaultName}, compareDefaultName, destroyFunc},
{"state", strlen("state"), 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName}, {"state", strlen("state"), 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName,
{"fill", strlen("fill"), 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName}, destroyFunc},
{"fill", strlen("fill"), 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName,
destroyFunc},
{"sess", strlen("sess"), 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, {"sess", strlen("sess"), 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode,
stateSessionKeyToString, compareSessionKeyName}, stateSessionKeyToString, compareSessionKeyName, destroyFunc},
{"func", strlen("func"), 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName}, {"func", strlen("func"), 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName,
{"parname", strlen("parname"), 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName}, destroyFunc},
{"partag", strlen("partag"), 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName}, {"parname", strlen("parname"), 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName,
destroyFunc},
{"partag", strlen("partag"), 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName,
destroyFunc},
}; };
const char* compareDefaultName(void* name) { return ginitDict[0].key; } const char* compareDefaultName(void* name) { return ginitDict[0].key; }
...@@ -330,8 +350,6 @@ const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } ...@@ -330,8 +350,6 @@ const char* compareFuncKeyName(void* name) { return ginitDict[4].key; }
const char* compareParKeyName(void* name) { return ginitDict[5].key; } const char* compareParKeyName(void* name) { return ginitDict[5].key; }
const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } const char* comparePartagKeyName(void* name) { return ginitDict[6].key; }
void destroyFunc(void* stata) { return; }
int streamInitBackend(SStreamState* pState, char* path) { int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
rocksdb_env_set_low_priority_background_threads(env, 15); rocksdb_env_set_low_priority_background_threads(env, 15);
...@@ -349,6 +367,7 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -349,6 +367,7 @@ int streamInitBackend(SStreamState* pState, char* path) {
char* err = NULL; char* err = NULL;
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
rocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(rocksdbCfParam));
const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
cfOpt[i] = rocksdb_options_create_copy(opts); cfOpt[i] = rocksdb_options_create_copy(opts);
...@@ -362,6 +381,8 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -362,6 +381,8 @@ int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt);
param[i].tableOpt = tableOpt;
param[i].lru = cache;
// rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8); // rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8);
// rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans); // rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans);
}; };
...@@ -386,6 +407,8 @@ int streamInitBackend(SStreamState* pState, char* path) { ...@@ -386,6 +407,8 @@ int streamInitBackend(SStreamState* pState, char* path) {
pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt;
pState->pTdbState->pCompare = pCompare; pState->pTdbState->pCompare = pCompare;
pState->pTdbState->dbOpt = opts; pState->pTdbState->dbOpt = opts;
pState->pTdbState->param = param;
pState->pTdbState->env = env;
return 0; return 0;
} }
void streamCleanBackend(SStreamState* pState) { void streamCleanBackend(SStreamState* pState) {
...@@ -393,12 +416,17 @@ void streamCleanBackend(SStreamState* pState) { ...@@ -393,12 +416,17 @@ void streamCleanBackend(SStreamState* pState) {
qInfo("rocksdb already free"); qInfo("rocksdb already free");
return; return;
} }
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
rocksdbCfParam* param = pState->pTdbState->param;
for (int i = 0; i < cfLen; i++) { for (int i = 0; i < cfLen; i++) {
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]);
rocksdb_cache_destroy(param[i].lru);
rocksdb_block_based_options_destroy(param[i].tableOpt);
} }
taosMemoryFree(pState->pTdbState->param);
rocksdb_options_destroy(pState->pTdbState->dbOpt); rocksdb_options_destroy(pState->pTdbState->dbOpt);
taosMemoryFreeClear(pState->pTdbState->pHandle); taosMemoryFreeClear(pState->pTdbState->pHandle);
...@@ -412,6 +440,7 @@ void streamCleanBackend(SStreamState* pState) { ...@@ -412,6 +440,7 @@ void streamCleanBackend(SStreamState* pState) {
pState->pTdbState->readOpts = NULL; pState->pTdbState->readOpts = NULL;
rocksdb_close(pState->pTdbState->rocksdb); rocksdb_close(pState->pTdbState->rocksdb);
rocksdb_env_destroy(pState->pTdbState->env);
pState->pTdbState->rocksdb = NULL; pState->pTdbState->rocksdb = NULL;
} }
...@@ -438,9 +467,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa ...@@ -438,9 +467,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_t** readOpt) { rocksdb_readoptions_t** readOpt) {
int idx = streamGetInit(cfName); int idx = streamGetInit(cfName);
//*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
if (snapshot != NULL) { if (snapshot != NULL) {
*snapshot = NULL; *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
} }
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
...@@ -618,6 +646,39 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { ...@@ -618,6 +646,39 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) {
return code; return code;
} }
int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) {
int code = 0;
char* err = NULL;
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
if (pIter == NULL) {
return -1;
}
rocksdb_iter_seek(pIter, start, strlen(start));
while (rocksdb_iter_valid(pIter)) {
const char* key = rocksdb_iter_key(pIter, NULL);
if (end != NULL && strcmp(key, end) > 0) {
break;
}
if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) {
int64_t checkPoint = 0;
if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) {
taosArrayPush(result, &checkPoint);
}
} else {
break;
}
rocksdb_iter_next(pIter);
}
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot);
rocksdb_readoptions_destroy(readopts);
rocksdb_iter_destroy(pIter);
return code;
}
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0; int code = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
......
...@@ -128,9 +128,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma ...@@ -128,9 +128,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
pInfo->pCloseWinSBF = NULL; pInfo->pCloseWinSBF = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
pInfo->maxVersion = 0; pInfo->maxDataVersion = 0;
pInfo->scanGroupId = 0;
pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
return pInfo; return pInfo;
} }
...@@ -242,29 +240,6 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { ...@@ -242,29 +240,6 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
return true; return true;
} }
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
pWin->skey, pWin->ekey, version);
pInfo->scanWindow = *pWin;
pInfo->scanGroupId = groupId;
pInfo->maxVersion = version;
}
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) {
if (!pInfo) {
return false;
}
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
pWin->skey, pWin->ekey, version);
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey &&
version <= pInfo->maxVersion) {
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId,
pWin->skey, pWin->ekey, version);
return true;
}
return false;
}
void updateInfoDestroy(SUpdateInfo *pInfo) { void updateInfoDestroy(SUpdateInfo *pInfo) {
if (pInfo == NULL) { if (pInfo == NULL) {
return; return;
...@@ -337,10 +312,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) ...@@ -337,10 +312,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1; if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
} }
if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1; if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -393,11 +365,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { ...@@ -393,11 +365,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY)); taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
} }
ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) return -1;
if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -313,15 +313,13 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { ...@@ -313,15 +313,13 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
return pFileState->usedBuffs; return pFileState->usedBuffs;
} }
void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) { void streamFileStateDecode(TSKEY* key, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, key); }
pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark);
}
void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) { void streamFileStateEncode(TSKEY* key, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY); *pLen = sizeof(TSKEY);
(*pVal) = taosMemoryCalloc(1, *pLen); (*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal; void* buff = *pVal;
taosEncodeFixedI64(&buff, pFileState->flushMark); taosEncodeFixedI64(&buff, *key);
} }
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
...@@ -347,31 +345,88 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, ...@@ -347,31 +345,88 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
if (streamStateGetBatchSize(batch) > 0) { if (streamStateGetBatchSize(batch) > 0) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }
streamStateClearBatch(batch);
if (flushState) { if (flushState) {
int32_t len = 0; const char* taskKey = "streamFileState";
void* buff = NULL; {
streamFileStateEncode(pFileState, &buff, &len); char keyBuf[128] = {0};
SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao void* valBuf = NULL;
streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len); int32_t len = 0;
taosMemoryFree(buff); sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
taosMemoryFree(valBuf);
}
{
char keyBuf[128] = {0};
char valBuf[64] = {0};
int32_t len = 0;
memcpy(keyBuf, taskKey, strlen(taskKey));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
}
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }
streamStateDestroyBatch(batch); streamStateDestroyBatch(batch);
return code;
}
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
const char* taskKey = "streamFileState";
char keyBuf[128] = {0};
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId);
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
}
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
const char* taskKey = "streamFileState";
return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list);
}
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t code = TSDB_CODE_SUCCESS;
const char* taskKey = "streamFileState";
int64_t maxCheckPointId = 0;
{
char buf[128] = {0};
void* val = NULL;
int32_t len = 0;
memcpy(buf, taskKey, strlen(taskKey));
code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
if (code != 0) {
return TSDB_CODE_FAILED;
}
sscanf(val, "%" PRId64 "", &maxCheckPointId);
taosMemoryFree(val);
}
for (int64_t i = maxCheckPointId; i > 0; i--) {
char buf[128] = {0};
void* val = 0;
int32_t len = 0;
sprintf(buf, "%s:%" PRId64 "", taskKey, i);
code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
if (code != 0) {
return TSDB_CODE_FAILED;
}
TSKEY ts;
sscanf(val, "%" PRId64 "", &ts);
taosMemoryFree(val);
if (ts < mark) {
forceRemoveCheckpoint(pFileState, i);
break;
} else {
}
}
return code; return code;
} }
int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t recoverSnapshot(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark);
void* pStVal = NULL; void* pStVal = NULL;
int32_t len = 0; int32_t len = 0;
code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len);
if (code == TSDB_CODE_SUCCESS) {
streamFileStateDecode(pFileState, pStVal, len);
} else {
return TSDB_CODE_FAILED;
}
SWinKey key = {.groupId = 0, .ts = 0}; SWinKey key = {.groupId = 0, .ts = 0};
SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key); SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);
......
...@@ -223,40 +223,47 @@ ...@@ -223,40 +223,47 @@
,,n,script,./test.sh -f tsim/stream/basic0.sim -g ,,n,script,./test.sh -f tsim/stream/basic0.sim -g
,,y,script,./test.sh -f tsim/stream/basic1.sim ,,y,script,./test.sh -f tsim/stream/basic1.sim
,,y,script,./test.sh -f tsim/stream/basic2.sim ,,y,script,./test.sh -f tsim/stream/basic2.sim
,,y,script,./test.sh -f tsim/stream/basic3.sim
,,y,script,./test.sh -f tsim/stream/basic4.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
,,y,script,./test.sh -f tsim/stream/deleteSession.sim
,,y,script,./test.sh -f tsim/stream/deleteState.sim
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
,,y,script,./test.sh -f tsim/stream/drop_stream.sim ,,y,script,./test.sh -f tsim/stream/drop_stream.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
,,y,script,./test.sh -f tsim/stream/session0.sim
,,y,script,./test.sh -f tsim/stream/session1.sim
,,y,script,./test.sh -f tsim/stream/state0.sim
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
,,y,script,./test.sh -f tsim/stream/partitionby.sim
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
,,y,script,./test.sh -f tsim/stream/windowClose.sim
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
,,y,script,./test.sh -f tsim/stream/sliding.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
,,y,script,./test.sh -f tsim/stream/deleteSession.sim
,,y,script,./test.sh -f tsim/stream/deleteState.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalDelete0.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalDelete0.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalDelete1.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalDelete1.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalLinear.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalLinear.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext1.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalRange.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
,,y,script,./test.sh -f tsim/stream/ignoreCheckUpdate.sim
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
,,y,script,./test.sh -f tsim/stream/partitionby.sim
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
,,y,script,./test.sh -f tsim/stream/session0.sim
,,y,script,./test.sh -f tsim/stream/session1.sim
,,y,script,./test.sh -f tsim/stream/sliding.sim
,,y,script,./test.sh -f tsim/stream/state0.sim
,,y,script,./test.sh -f tsim/stream/state1.sim
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag2.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim ,,y,script,./test.sh -f tsim/stream/windowClose.sim
,,y,script,./test.sh -f tsim/trans/lossdata1.sim ,,y,script,./test.sh -f tsim/trans/lossdata1.sim
,,y,script,./test.sh -f tsim/trans/create_db.sim ,,y,script,./test.sh -f tsim/trans/create_db.sim
,,y,script,./test.sh -f tsim/tmq/basic1.sim ,,y,script,./test.sh -f tsim/tmq/basic1.sim
...@@ -1336,38 +1343,47 @@ ...@@ -1336,38 +1343,47 @@
,,n,script,./test.sh -f tsim/stream/basic0.sim -g ,,n,script,./test.sh -f tsim/stream/basic0.sim -g
,,y,script,./test.sh -f tsim/stream/basic1.sim ,,y,script,./test.sh -f tsim/stream/basic1.sim
,,y,script,./test.sh -f tsim/stream/basic2.sim ,,y,script,./test.sh -f tsim/stream/basic2.sim
,,y,script,./test.sh -f tsim/stream/basic3.sim
,,y,script,./test.sh -f tsim/stream/basic4.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
,,y,script,./test.sh -f tsim/stream/deleteSession.sim
,,y,script,./test.sh -f tsim/stream/deleteState.sim
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
,,y,script,./test.sh -f tsim/stream/drop_stream.sim ,,y,script,./test.sh -f tsim/stream/drop_stream.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim
,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim
,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim
,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
,,y,script,./test.sh -f tsim/stream/distributeSession0.sim
,,y,script,./test.sh -f tsim/stream/session0.sim
,,y,script,./test.sh -f tsim/stream/session1.sim
,,y,script,./test.sh -f tsim/stream/state0.sim
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
,,y,script,./test.sh -f tsim/stream/partitionby.sim
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
,,y,script,./test.sh -f tsim/stream/windowClose.sim
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
,,y,script,./test.sh -f tsim/stream/sliding.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
,,y,script,./test.sh -f tsim/stream/deleteInterval.sim
,,y,script,./test.sh -f tsim/stream/deleteSession.sim
,,y,script,./test.sh -f tsim/stream/deleteState.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalDelete0.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalDelete0.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalDelete1.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalDelete1.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalLinear.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalLinear.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext1.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalRange.sim
,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim
,,y,script,./test.sh -f tsim/stream/ignoreCheckUpdate.sim
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
,,y,script,./test.sh -f tsim/stream/partitionby.sim
,,y,script,./test.sh -f tsim/stream/schedSnode.sim
,,y,script,./test.sh -f tsim/stream/session0.sim
,,y,script,./test.sh -f tsim/stream/session1.sim
,,y,script,./test.sh -f tsim/stream/sliding.sim
,,y,script,./test.sh -f tsim/stream/state0.sim
,,y,script,./test.sh -f tsim/stream/state1.sim
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim
,,y,script,./test.sh -f tsim/stream/udTableAndTag2.sim
,,y,script,./test.sh -f tsim/stream/windowClose.sim
,,y,script,./test.sh -f tsim/trans/lossdata1.sim ,,y,script,./test.sh -f tsim/trans/lossdata1.sim
,,y,script,./test.sh -f tsim/tmq/basic1.sim ,,y,script,./test.sh -f tsim/tmq/basic1.sim
,,y,script,./test.sh -f tsim/tmq/basic2.sim ,,y,script,./test.sh -f tsim/tmq/basic2.sim
......
...@@ -213,38 +213,47 @@ rem ./test.sh -f tsim/db/alter_replica_13.sim ...@@ -213,38 +213,47 @@ rem ./test.sh -f tsim/db/alter_replica_13.sim
./test.sh -f tsim/stream/basic0.sim -g ./test.sh -f tsim/stream/basic0.sim -g
./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/basic3.sim
./test.sh -f tsim/stream/basic4.sim
./test.sh -f tsim/stream/checkStreamSTable1.sim
./test.sh -f tsim/stream/checkStreamSTable.sim
./test.sh -f tsim/stream/deleteInterval.sim
./test.sh -f tsim/stream/deleteSession.sim
./test.sh -f tsim/stream/deleteState.sim
./test.sh -f tsim/stream/distributeInterval0.sim
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
./test.sh -f tsim/stream/distributeSession0.sim
./test.sh -f tsim/stream/drop_stream.sim ./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/fillHistoryBasic1.sim ./test.sh -f tsim/stream/fillHistoryBasic1.sim
./test.sh -f tsim/stream/fillHistoryBasic2.sim ./test.sh -f tsim/stream/fillHistoryBasic2.sim
./test.sh -f tsim/stream/fillHistoryBasic3.sim ./test.sh -f tsim/stream/fillHistoryBasic3.sim
./test.sh -f tsim/stream/distributeInterval0.sim
./test.sh -f tsim/stream/distributeIntervalRetrive0.sim
./test.sh -f tsim/stream/distributeSession0.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/triggerInterval0.sim
./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/partitionby.sim
./test.sh -f tsim/stream/partitionby1.sim
./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/sliding.sim
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
./test.sh -f tsim/stream/partitionbyColumnSession.sim
./test.sh -f tsim/stream/partitionbyColumnState.sim
./test.sh -f tsim/stream/deleteInterval.sim
./test.sh -f tsim/stream/deleteSession.sim
./test.sh -f tsim/stream/deleteState.sim
./test.sh -f tsim/stream/fillIntervalDelete0.sim ./test.sh -f tsim/stream/fillIntervalDelete0.sim
./test.sh -f tsim/stream/fillIntervalDelete1.sim ./test.sh -f tsim/stream/fillIntervalDelete1.sim
./test.sh -f tsim/stream/fillIntervalLinear.sim ./test.sh -f tsim/stream/fillIntervalLinear.sim
./test.sh -f tsim/stream/fillIntervalPartitionBy.sim ./test.sh -f tsim/stream/fillIntervalPartitionBy.sim
./test.sh -f tsim/stream/fillIntervalPrevNext1.sim
./test.sh -f tsim/stream/fillIntervalPrevNext.sim ./test.sh -f tsim/stream/fillIntervalPrevNext.sim
./test.sh -f tsim/stream/fillIntervalRange.sim
./test.sh -f tsim/stream/fillIntervalValue.sim ./test.sh -f tsim/stream/fillIntervalValue.sim
./test.sh -f tsim/stream/ignoreCheckUpdate.sim
./test.sh -f tsim/stream/ignoreExpiredData.sim
./test.sh -f tsim/stream/partitionby1.sim
./test.sh -f tsim/stream/partitionbyColumnInterval.sim
./test.sh -f tsim/stream/partitionbyColumnSession.sim
./test.sh -f tsim/stream/partitionbyColumnState.sim
./test.sh -f tsim/stream/partitionby.sim
./test.sh -f tsim/stream/schedSnode.sim
./test.sh -f tsim/stream/session0.sim
./test.sh -f tsim/stream/session1.sim
./test.sh -f tsim/stream/sliding.sim
./test.sh -f tsim/stream/state0.sim
./test.sh -f tsim/stream/state1.sim
./test.sh -f tsim/stream/triggerInterval0.sim
./test.sh -f tsim/stream/triggerSession0.sim
./test.sh -f tsim/stream/udTableAndTag0.sim ./test.sh -f tsim/stream/udTableAndTag0.sim
./test.sh -f tsim/stream/udTableAndTag1.sim ./test.sh -f tsim/stream/udTableAndTag1.sim
./test.sh -f tsim/stream/udTableAndTag2.sim
./test.sh -f tsim/stream/windowClose.sim
./test.sh -f tsim/trans/lossdata1.sim ./test.sh -f tsim/trans/lossdata1.sim
./test.sh -f tsim/trans/create_db.sim ./test.sh -f tsim/trans/create_db.sim
./test.sh -f tsim/tmq/basic1.sim ./test.sh -f tsim/tmq/basic1.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册