diff --git a/cmake/cmake.options b/cmake/cmake.options index 60ff00affc01408b084a8993441e4fe7052f4977..4ec9d18e08580a735598b647ef65188b46fdbc61 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -109,7 +109,7 @@ option( option( BUILD_WITH_ROCKSDB "If build with rocksdb" - OFF + ON ) option( diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 6b09bf4899ba0f444b80449c2a0d67df160b59ef..7b80c63eb882deaf875863efc89d7317ceada591 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -14,6 +14,8 @@ */ #include "tdatablock.h" + +#include "rocksdb/c.h" #include "tdbInt.h" #ifdef __cplusplus @@ -29,14 +31,18 @@ typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef struct STdbState { SStreamTask* pOwner; - TDB* db; - TTB* pStateDb; - TTB* pFuncStateDb; - TTB* pFillStateDb; // todo refactor - TTB* pSessionStateDb; - TTB* pParNameDb; - TTB* pParTagDb; - TXN* txn; + + rocksdb_t* rocksdb; + + TDB* db; + TTB* pStateDb; + TTB* pFuncStateDb; + TTB* pFillStateDb; // todo refactor + TTB* pSessionStateDb; + TTB* pParNameDb; + TTB* pParTagDb; + TXN* txn; + } STdbState; // incremental state storage diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt index ceddf4f2153735c4d8423dde806272419dc35fc5..d11ec74ee8eedc8fa512bf5d9ed5aa1808aadc2a 100644 --- a/source/libs/stream/CMakeLists.txt +++ b/source/libs/stream/CMakeLists.txt @@ -3,15 +3,27 @@ add_library(stream STATIC ${STREAM_SRC}) target_include_directories( stream PUBLIC "${TD_SOURCE_DIR}/include/libs/stream" + PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -target_link_libraries( - stream - PUBLIC tdb - PRIVATE os util transport qcom executor -) +if(${BUILD_WITH_ROCKSDB}) + target_link_libraries( + stream + PUBLIC rocksdb tdb + PRIVATE os util transport qcom executor + ) + #add_definitions(-DUSE_ROCKSDB) +endif(${BUILD_WITH_ROCKSDB}) + + +#target_link_libraries( +# stream +# PUBLIC tdb +# PRIVATE os util transport qcom executor +#) if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) endif(${BUILD_TEST}) + diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index a2b3e20dbfbab336ef50ef786daff4ccba95858a..1d1fe27fd8b2977c25f26e84607882aafa61d62e 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -13,7 +13,9 @@ * along with this program. If not, see . */ +#include "streamState.h" #include "executor.h" +#include "rocksdb/c.h" #include "streamInc.h" #include "tcommon.h" #include "tcompare.h" @@ -106,6 +108,35 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, return 0; } +int compareState(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { return -1; } +const char* compareStateName(void* name) { return NULL; } +int streamInitBackend(SStreamState* pState, char* path) { + rocksdb_options_t* opts = rocksdb_options_create(); + rocksdb_options_increase_parallelism(opts, 4); + rocksdb_options_optimize_level_style_compaction(opts, 0); + // create the DB if it's not already present + rocksdb_options_set_create_if_missing(opts, 1); + + rocksdb_comparator_t* cmp1 = rocksdb_comparator_create(NULL, NULL, compareState, compareStateName); + rocksdb_comparator_t* cmp2 = rocksdb_comparator_create(NULL, NULL, compareState, compareStateName); + + char* err = NULL; + rocksdb_t* db = rocksdb_open(opts, path, &err); + if (err == NULL) { + pState->pTdbState->rocksdb = db; + } + + rocksdb_options_t* dbOpts1 = rocksdb_options_create_copy(opts); + rocksdb_options_t* dbOpts2 = rocksdb_options_create_copy(opts); + + rocksdb_options_set_comparator(dbOpts1, cmp1); + rocksdb_options_set_comparator(dbOpts2, cmp2); + + rocksdb_column_family_handle_t* cf1 = rocksdb_create_column_family(db, dbOpts1, "cmp1", &err); + rocksdb_column_family_handle_t* cf2 = rocksdb_create_column_family(db, dbOpts2, "cmp2", &err); + + return 0; +} SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { @@ -118,6 +149,15 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int streamStateDestroy(pState); return NULL; } +#ifdef USE_ROCKSDB + int code = streamInitBackend(pState, path); + if (code == -1) { + taosMemoryFree(pState); + pState = NULL; + } + return pState; + +#else char statePath[1024]; if (!specPath) { @@ -202,9 +242,13 @@ _err: tdbClose(pState->pTdbState->db); streamStateDestroy(pState); return NULL; +#endif } void streamStateClose(SStreamState* pState) { +#ifdef USE_ROCKSDB + +#else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbTbClose(pState->pTdbState->pStateDb); @@ -214,7 +258,7 @@ void streamStateClose(SStreamState* pState) { tdbTbClose(pState->pTdbState->pParNameDb); tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); - +#endif streamStateDestroy(pState); }