提交 8039702c 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 509c6387
......@@ -109,7 +109,7 @@ option(
option(
BUILD_WITH_ROCKSDB
"If build with rocksdb"
OFF
ON
)
option(
......
......@@ -14,6 +14,8 @@
*/
#include "tdatablock.h"
#include "rocksdb/c.h"
#include "tdbInt.h"
#ifdef __cplusplus
......@@ -29,6 +31,9 @@ typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2);
typedef struct STdbState {
SStreamTask* pOwner;
rocksdb_t* rocksdb;
TDB* db;
TTB* pStateDb;
TTB* pFuncStateDb;
......@@ -37,6 +42,7 @@ typedef struct STdbState {
TTB* pParNameDb;
TTB* pParTagDb;
TXN* txn;
} STdbState;
// incremental state storage
......
......@@ -3,15 +3,27 @@ add_library(stream STATIC ${STREAM_SRC})
target_include_directories(
stream
PUBLIC "${TD_SOURCE_DIR}/include/libs/stream"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
if(${BUILD_WITH_ROCKSDB})
target_link_libraries(
stream
PUBLIC tdb
PUBLIC rocksdb tdb
PRIVATE os util transport qcom executor
)
)
#add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB})
#target_link_libraries(
# stream
# PUBLIC tdb
# PRIVATE os util transport qcom executor
#)
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
......@@ -13,7 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamState.h"
#include "executor.h"
#include "rocksdb/c.h"
#include "streamInc.h"
#include "tcommon.h"
#include "tcompare.h"
......@@ -106,6 +108,35 @@ static inline int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
return 0;
}
int compareState(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { return -1; }
const char* compareStateName(void* name) { return NULL; }
int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_t* opts = rocksdb_options_create();
rocksdb_options_increase_parallelism(opts, 4);
rocksdb_options_optimize_level_style_compaction(opts, 0);
// create the DB if it's not already present
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_comparator_t* cmp1 = rocksdb_comparator_create(NULL, NULL, compareState, compareStateName);
rocksdb_comparator_t* cmp2 = rocksdb_comparator_create(NULL, NULL, compareState, compareStateName);
char* err = NULL;
rocksdb_t* db = rocksdb_open(opts, path, &err);
if (err == NULL) {
pState->pTdbState->rocksdb = db;
}
rocksdb_options_t* dbOpts1 = rocksdb_options_create_copy(opts);
rocksdb_options_t* dbOpts2 = rocksdb_options_create_copy(opts);
rocksdb_options_set_comparator(dbOpts1, cmp1);
rocksdb_options_set_comparator(dbOpts2, cmp2);
rocksdb_column_family_handle_t* cf1 = rocksdb_create_column_family(db, dbOpts1, "cmp1", &err);
rocksdb_column_family_handle_t* cf2 = rocksdb_create_column_family(db, dbOpts2, "cmp2", &err);
return 0;
}
SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
if (pState == NULL) {
......@@ -118,6 +149,15 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
streamStateDestroy(pState);
return NULL;
}
#ifdef USE_ROCKSDB
int code = streamInitBackend(pState, path);
if (code == -1) {
taosMemoryFree(pState);
pState = NULL;
}
return pState;
#else
char statePath[1024];
if (!specPath) {
......@@ -202,9 +242,13 @@ _err:
tdbClose(pState->pTdbState->db);
streamStateDestroy(pState);
return NULL;
#endif
}
void streamStateClose(SStreamState* pState) {
#ifdef USE_ROCKSDB
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbTbClose(pState->pTdbState->pStateDb);
......@@ -214,7 +258,7 @@ void streamStateClose(SStreamState* pState) {
tdbTbClose(pState->pTdbState->pParNameDb);
tdbTbClose(pState->pTdbState->pParTagDb);
tdbClose(pState->pTdbState->db);
#endif
streamStateDestroy(pState);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册