diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index a33a0a577b7e06fc39bc979498ce7a6b6e943360..01934ecf77e22eb6e66799710ad089e9763db53f 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -89,5 +89,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); -int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); + +void* streamDefaultIterCreate_rocksdb(SStreamState* pState); +int32_t streamDefaultIterValid_rocksdb(void* iter); +void* streamDefaultIterSeek_rocksdb(void* iter, const char* key); +int32_t streamDefaultIter_rocksdb(void* iter); +char** streamDefaultIterKey_rocksdb(void* iter); +char* streamDefaultIterVal_rocksdb(void* iter); + +// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 9977732a9aae0fe3d16c38ba6fdb2ee164888279..0fa7b78ce2da1d3ac45817f02c89b571dfcd695b 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -646,37 +646,86 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { return code; } +void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {} +int32_t streamDefaultIterValid_rocksdb(void* iter); +void* streamDefaultIterSeek_rocksdb(void* iter, const char* key); +int32_t streamDefaultIter_rocksdb(void* iter); +char** streamDefaultIterKey_rocksdb(void* iter); +char* streamDefaultIterVal_rocksdb(void* iter); +// typedef struct { +// char* start; +// char* end; +// void* result; +// } StreamFilterArg; + +// typedef int (*streamfilterFunc)(StreamFilterArg* arg); + +// int32_t streamDefaultIterFilter_rocksdb(SStreamState* pState, streamfilterFunc filterFunc, StreamFilterArg* arg) { +// int code = 0; +// char* err = NULL; + +// rocksdb_snapshot_t* snapshot = NULL; +// rocksdb_readoptions_t* readopts = NULL; +// rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); +// if (pIter == NULL) { +// return -1; +// } +// char* start = arg->start; +// char* end = arg->end; +// SArray* result = arg->result; + +// rocksdb_iter_seek(pIter, start, strlen(start)); +// while (rocksdb_iter_valid(pIter)) { +// const char* key = rocksdb_iter_key(pIter, NULL); +// if (end != NULL && strcmp(key, end) > 0) { +// break; +// } +// if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { +// int64_t checkPoint = 0; +// // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { +// // taosArrayPush(result, &checkPoint); +// // } +// } else { +// break; +// } +// rocksdb_iter_next(pIter); +// } +// rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); +// rocksdb_readoptions_destroy(readopts); +// rocksdb_iter_destroy(pIter); +// return code; +// } int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { - int code = 0; - char* err = NULL; + // int code = 0; + // char* err = NULL; - rocksdb_snapshot_t* snapshot = NULL; - rocksdb_readoptions_t* readopts = NULL; - rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); - if (pIter == NULL) { - return -1; - } + // rocksdb_snapshot_t* snapshot = NULL; + // rocksdb_readoptions_t* readopts = NULL; + // rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); + // if (pIter == NULL) { + // return -1; + // } - rocksdb_iter_seek(pIter, start, strlen(start)); - while (rocksdb_iter_valid(pIter)) { - const char* key = rocksdb_iter_key(pIter, NULL); - if (end != NULL && strcmp(key, end) > 0) { - break; - } - if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { - int64_t checkPoint = 0; - if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { - taosArrayPush(result, &checkPoint); - } - } else { - break; - } - rocksdb_iter_next(pIter); - } - rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); - rocksdb_readoptions_destroy(readopts); - rocksdb_iter_destroy(pIter); - return code; + // rocksdb_iter_seek(pIter, start, strlen(start)); + // while (rocksdb_iter_valid(pIter)) { + // const char* key = rocksdb_iter_key(pIter, NULL); + // if (end != NULL && strcmp(key, end) > 0) { + // break; + // } + // if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { + // int64_t checkPoint = 0; + // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { + // taosArrayPush(result, &checkPoint); + // } + // } else { + // break; + // } + // rocksdb_iter_next(pIter); + // } + // rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); + // rocksdb_readoptions_destroy(readopts); + // rocksdb_iter_destroy(pIter); + // return code; } int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {