提交 a3a0d9c8 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 0690604a
......@@ -34,13 +34,11 @@ typedef struct STdbState {
rocksdb_t* rocksdb;
rocksdb_column_family_handle_t** pHandle;
rocksdb_writeoptions_t* wopts;
rocksdb_readoptions_t* ropts;
// rocksdb_column_family_handle_t* fillStateDB;
// rocksdb_column_family_handle_t* sessStateDB;
// rocksdb_column_family_handle_t* funcStateDB;
// rocksdb_column_family_handle_t* parnameStateDB;
// rocksdb_column_family_handle_t* partagStateDB;
rocksdb_writeoptions_t* writeOpts;
rocksdb_readoptions_t* readOpts;
rocksdb_options_t** cfOpts;
rocksdb_comparator_t** pCompare;
rocksdb_options_t* dbOpt;
TDB* db;
TTB* pStateDb;
......
......@@ -216,7 +216,7 @@ _err:
void streamStateClose(SStreamState* pState) {
#ifdef USE_ROCKSDB
// streamCleanBackend(pState);
streamCleanBackend(pState);
#else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
......
......@@ -292,6 +292,7 @@ const char* compareSessionKey(void* name) { return cfName[2]; }
const char* compareFuncKey(void* name) { return cfName[3]; }
const char* compareParKey(void* name) { return cfName[4]; }
const char* comparePartagKey(void* name) { return cfName[5]; }
void destroyFunc(void* stata) { return; }
int streamInitBackend(SStreamState* pState, char* path) {
rocksdb_options_t* opts = rocksdb_options_create();
......@@ -309,43 +310,72 @@ int streamInitBackend(SStreamState* pState, char* path) {
cfOpt[i] = rocksdb_options_create_copy(opts);
}
rocksdb_comparator_t* stateCompare = rocksdb_comparator_create(NULL, NULL, stateKeyDBComp, compareStateName);
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**));
rocksdb_comparator_t* stateCompare = rocksdb_comparator_create(NULL, destroyFunc, stateKeyDBComp, compareStateName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[0], stateCompare);
pCompare[0] = stateCompare;
rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, NULL, winKeyDBComp, compareWinKeyName);
rocksdb_comparator_t* fillCompare = rocksdb_comparator_create(NULL, destroyFunc, winKeyDBComp, compareWinKeyName);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[1], fillCompare);
pCompare[1] = fillCompare;
rocksdb_comparator_t* sessCompare = rocksdb_comparator_create(NULL, NULL, stateSessionKeyDBComp, compareSessionKey);
rocksdb_comparator_t* sessCompare =
rocksdb_comparator_create(NULL, destroyFunc, stateSessionKeyDBComp, compareSessionKey);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[2], sessCompare);
pCompare[2] = sessCompare;
rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, NULL, tupleKeyDBComp, compareFuncKey);
rocksdb_comparator_t* funcCompare = rocksdb_comparator_create(NULL, destroyFunc, tupleKeyDBComp, compareFuncKey);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[3], funcCompare);
pCompare[3] = funcCompare;
rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, NULL, parKeyDBComp, compareParKey);
rocksdb_comparator_t* parnameCompare = rocksdb_comparator_create(NULL, destroyFunc, parKeyDBComp, compareParKey);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[4], parnameCompare);
pCompare[4] = parnameCompare;
rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, NULL, parKeyDBComp, comparePartagKey);
rocksdb_comparator_t* partagCompare = rocksdb_comparator_create(NULL, destroyFunc, parKeyDBComp, comparePartagKey);
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[5], partagCompare);
pCompare[5] = partagCompare;
rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*));
rocksdb_t* db = rocksdb_open_column_families(opts, path, cfLen, cfName, cfOpt, cfHandle, &err);
pState->pTdbState->rocksdb = db;
pState->pTdbState->pHandle = cfHandle;
pState->pTdbState->wopts = rocksdb_writeoptions_create();
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
// rocksdb_writeoptions_
rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->wopts, 1);
pState->pTdbState->ropts = rocksdb_readoptions_create();
rocksdb_writeoptions_set_no_slowdown(pState->pTdbState->writeOpts, 1);
pState->pTdbState->readOpts = rocksdb_readoptions_create();
pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt;
pState->pTdbState->pCompare = pCompare;
pState->pTdbState->dbOpt = opts;
return 0;
}
void streamCleanBackend(SStreamState* pState) {
if (pState->pTdbState->rocksdb == NULL) {
qInfo("rocksdb already free");
return;
}
int cfLen = sizeof(cfName) / sizeof(cfName[0]);
for (int i = 0; i < cfLen; i++) {
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]);
}
rocksdb_writeoptions_destroy(pState->pTdbState->wopts);
rocksdb_readoptions_destroy(pState->pTdbState->ropts);
rocksdb_options_destroy(pState->pTdbState->dbOpt);
taosMemoryFreeClear(pState->pTdbState->pHandle);
taosMemoryFreeClear(pState->pTdbState->cfOpts);
taosMemoryFree(pState->pTdbState->pCompare);
rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts);
pState->pTdbState->writeOpts = NULL;
rocksdb_readoptions_destroy(pState->pTdbState->readOpts);
pState->pTdbState->readOpts = NULL;
rocksdb_close(pState->pTdbState->rocksdb);
pState->pTdbState->rocksdb = NULL;
}
int streamGetInit(const char* funcName) {
......@@ -370,7 +400,7 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
}
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName) {
int idx = streamGetInit(cfName);
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
pState->pTdbState->pHandle[idx]);
}
......@@ -389,7 +419,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
ginitDict[i].toStrFunc((void*)key, toString); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \
if (err != NULL) { \
taosMemoryFree(err); \
......@@ -415,7 +445,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_readoptions_t* opts = pState->pTdbState->ropts; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)&len, &err); \
if (val == NULL) { \
......@@ -449,7 +479,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
rocksdb_t* db = pState->pTdbState->rocksdb; \
rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \
if (err != NULL) { \
qWarn("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \
......@@ -554,7 +584,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
pCur->number = pState->number;
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
......@@ -593,11 +623,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
return NULL;
}
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[2]);
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
pCur->number = pState->number;
......@@ -641,7 +671,8 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
return NULL;
}
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
......@@ -688,7 +719,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
if (pCur == NULL) return NULL;
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[0]);
pCur->iter = streamStateIterCreate(pState, "default");
......@@ -754,7 +785,8 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
if (pCur == NULL) return NULL;
/// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[1]);
pCur->iter = streamStateIterCreate(pState, "fill");
char buf[128] = {0};
......@@ -869,7 +901,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
}
pCur->number = pState->number;
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[0]);
pCur->iter = streamStateIterCreate(pState, "default");
......@@ -903,7 +935,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
return NULL;
}
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[1]);
pCur->iter = streamStateIterCreate(pState, "fill");
......@@ -936,7 +968,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
return NULL;
}
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[1]);
pCur->iter = streamStateIterCreate(pState, "fill");
......@@ -984,7 +1016,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
}
pCur->number = pState->number;
// pCur->iter =
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts,
// rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->readOpts,
// pState->pTdbState->pHandle[2]);
pCur->iter = streamStateIterCreate(pState, "sess");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册