diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index dee215a336964e268c3bfab6f99c43d20eb73178..e95d76aa23e58f81dbe0d748178126b2277431e3 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -44,6 +44,10 @@ int streamStateValueIsStale(char* vv) { taosDecodeFixedI64(vv, &ts); return ts < taosGetTimestampSec() ? 1 : 0; } +int iterValueIsStale(rocksdb_iterator_t* iter) { + char* vv = (char*)rocksdb_iter_value(iter, NULL); + return streamStateValueIsStale(vv); +} int defaultKeyEncode(void* k, char* buf) { int len = strlen((char*)k); memcpy(buf, (char*)k, len); @@ -869,7 +873,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons SStateKey tkey; SStateKey* pKtmp = &tkey; - if (rocksdb_iter_valid(pCur->iter)) { + if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) { size_t tlen; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); stateKeyDecode((void*)pKtmp, keyStr); @@ -913,9 +917,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin return NULL; } // skip ttl expired data - while (rocksdb_iter_valid(pCur->iter)) { - char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); - if (!streamStateValueIsStale(val)) break; + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { rocksdb_iter_next(pCur->iter); } @@ -949,9 +951,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_prev(pCur->iter); - while (rocksdb_iter_valid(pCur->iter)) { - char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); - if (!streamStateValueIsStale(val)) break; + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { rocksdb_iter_prev(pCur->iter); } @@ -977,7 +977,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* rocksdb_iter_seek(pCur->iter, buf, len); - if (rocksdb_iter_valid(pCur->iter)) { + if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) { char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); if (!streamStateValueIsStale(val)) { SStateKey curKey; @@ -1065,17 +1065,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { - streamStateFreeCur(pCur); - return NULL; - } - while (rocksdb_iter_valid(pCur->iter)) { - char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); - if (!streamStateValueIsStale(val)) { - break; - } - rocksdb_iter_prev(pCur->iter); } + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); return NULL; } @@ -1113,8 +1107,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta streamStateFreeCur(pCur); return NULL; } - const char* iVal = rocksdb_iter_value(pCur->iter, NULL); - if (streamStateValueIsStale((char*)iVal)) { + if (iterValueIsStale(pCur->iter)) { streamStateFreeCur(pCur); return NULL; } @@ -1150,7 +1143,11 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con streamStateFreeCur(pCur); return NULL; } - + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; @@ -1172,7 +1169,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* SStateSessionKey ktmp = {0}; size_t kLen = 0, vLen = 0; - if (!rocksdb_iter_valid(pCur->iter)) { + if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) { return -1; } const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); @@ -1231,12 +1228,9 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK streamStateFreeCur(pCur); return NULL; } - if (rocksdb_iter_valid(pCur->iter)) { - const char* iVal = rocksdb_iter_value(pCur->iter, NULL); - if (streamStateValueIsStale((char*)iVal)) { - streamStateFreeCur(pCur); - return NULL; - } + if (iterValueIsStale(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; } if (rocksdb_iter_valid(pCur->iter)) { @@ -1258,25 +1252,24 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, return -1; } SWinKey winKey; - if (rocksdb_iter_valid(pCur->iter)) { - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); - winKeyDecode(&winKey, keyStr); - - size_t vlen = 0; - const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - char* dst = NULL; - int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst); - if (len < 0) { - return -1; - } - - if (pVal != NULL) *pVal = (char*)dst; - if (pVLen != NULL) *pVLen = vlen; + if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) { + return -1; + } + size_t tlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + winKeyDecode(&winKey, keyStr); - } else { + size_t vlen = 0; + const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); + char* dst = NULL; + int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst); + if (len < 0) { return -1; } + + if (pVal != NULL) *pVal = (char*)dst; + if (pVLen != NULL) *pVLen = vlen; + *pKey = winKey; return 0; } @@ -1297,8 +1290,12 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const streamStateFreeCur(pCur); return NULL; } + // skip stale data + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_next(pCur->iter); + } - { + if (rocksdb_iter_valid(pCur->iter)) { SWinKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); @@ -1328,8 +1325,11 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const streamStateFreeCur(pCur); return NULL; } + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_prev(pCur->iter); + } - { + if (rocksdb_iter_valid(pCur->iter)) { SWinKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); @@ -1340,6 +1340,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const rocksdb_iter_prev(pCur->iter); return pCur; } + streamStateFreeCur(pCur); return NULL; } @@ -1621,7 +1622,7 @@ int32_t streamDefaultIterValid_rocksdb(void* iter) { SStreamStateCur* pCur = iter; bool val = rocksdb_iter_valid(pCur->iter); - return val ? 0 : -1; + return val ? 1 : 0; } void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { SStreamStateCur* pCur = iter;