From c0ebdb92d32562c08d1fe9f94608bae42b286170 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 14 Apr 2023 02:10:33 +0000 Subject: [PATCH] startGroupTableMe --- source/libs/stream/inc/streamBackendRocksdb.h | 8 ++--- source/libs/stream/src/streamStateRocksdb.c | 35 +++++++++++++++---- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 01934ecf77..6e5fc53f3e 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -92,10 +92,10 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); void* streamDefaultIterCreate_rocksdb(SStreamState* pState); int32_t streamDefaultIterValid_rocksdb(void* iter); -void* streamDefaultIterSeek_rocksdb(void* iter, const char* key); -int32_t streamDefaultIter_rocksdb(void* iter); -char** streamDefaultIterKey_rocksdb(void* iter); -char* streamDefaultIterVal_rocksdb(void* iter); +void streamDefaultIterSeek_rocksdb(void* iter, const char* key); +void streamDefaultIterNext_rocksdb(void* iter); +char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); +char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 0fa7b78ce2..79776a0424 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -646,12 +646,35 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { return code; } -void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {} -int32_t streamDefaultIterValid_rocksdb(void* iter); -void* streamDefaultIterSeek_rocksdb(void* iter, const char* key); -int32_t streamDefaultIter_rocksdb(void* iter); -char** streamDefaultIterKey_rocksdb(void* iter); -char* streamDefaultIterVal_rocksdb(void* iter); +void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); + return pCur; +} +int32_t streamDefaultIterValid_rocksdb(void* iter) { + SStreamStateCur* pCur = iter; + bool val = rocksdb_iter_valid(pCur->iter); + + return val ? 0 : -1; +} +void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { + SStreamStateCur* pCur = iter; + rocksdb_iter_seek(pCur->iter, key, strlen(key)); +} +void streamDefaultIterNext_rocksdb(void* iter) { + SStreamStateCur* pCur = iter; + rocksdb_iter_next(pCur->iter); +} +char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { + SStreamStateCur* pCur = iter; + return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len); +} +char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { + SStreamStateCur* pCur = iter; + return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len); +} // typedef struct { // char* start; // char* end; -- GitLab