diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index f1d7606805d1ded926645c03ce88cb9c60871df6..1112dda1d76da2323e63f79daa73ca5c7ee21b35 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -682,6 +682,35 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* streamStateFreeCur(pCur); return NULL; } + +SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { + qDebug("streamStateGetAndCheckCur_rocksdb"); + SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); + if (pCur) { + int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); + if (code == 0) return pCur; + streamStateFreeCur(pCur); + } + return NULL; +} +int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qDebug("streamStateGetKVByCur_rocksdb"); + if (!pCur) return -1; + SStateKey tkey; + SStateKey* pKtmp = &tkey; + + if (rocksdb_iter_valid(pCur->iter)) { + size_t tlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + stateKeyDecode((void*)pKtmp, keyStr); + if (pKtmp->opNum != pCur->number) { + return -1; + } + *pKey = pKtmp->key; + return 0; + } + return -1; +} SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); @@ -691,7 +720,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK pCur->iter = rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); char buf[128] = {0}; - int len = winKeyDecode((void*)key, buf); + int len = winKeyEncode((void*)key, buf); rocksdb_iter_seek(pCur->iter, buf, len); if (!rocksdb_iter_valid(pCur->iter)) { rocksdb_iter_seek_for_prev(pCur->iter, buf, len); @@ -713,34 +742,6 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK streamStateFreeCur(pCur); return NULL; } -SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { - qDebug("streamStateGetAndCheckCur_rocksdb"); - SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); - if (pCur) { - int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); - if (code == 0) return pCur; - streamStateFreeCur(pCur); - } - return NULL; -} -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qDebug("streamStateGetKVByCur_rocksdb"); - if (!pCur) return -1; - SStateKey tkey; - SStateKey* pKtmp = &tkey; - - if (rocksdb_iter_valid(pCur->iter)) { - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); - stateKeyDecode((void*)pKtmp, keyStr); - if (pKtmp->opNum != pCur->number) { - return -1; - } - *pKey = pKtmp->key; - return 0; - } - return -1; -} int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { qDebug("streamStateFillGetKVByCur_rocksdb"); if (!pCur) {