提交 e6e9828d 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 ae54e994
......@@ -415,25 +415,27 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
int idx = streamGetInit(cfName);
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
*readOpt = (rocksdb_readoptions_t*)rocksdb_readoptions_create();
rocksdb_readoptions_set_snapshot(pState->pTdbState->readOpts, *snapshot);
rocksdb_readoptions_set_fill_cache(pState->pTdbState->readOpts, 0);
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
*readOpt = rOpt;
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, *readOpt, pState->pTdbState->pHandle[idx]);
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0);
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
do { \
code = 0; \
char buf[40] = {0}; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[40] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
......@@ -452,14 +454,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
do { \
code = 0; \
char buf[40] = {0}; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[40] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
......@@ -487,14 +489,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
do { \
code = 0; \
char buf[40] = {0}; \
char buf[128] = {0}; \
char* err = NULL; \
int i = streamGetInit(funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
return -1; \
} \
char toString[40] = {0}; \
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
......@@ -573,14 +575,14 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number};
char sKeyStr[40] = {0};
char eKeyStr[40] = {0};
char sKeyStr[128] = {0};
char eKeyStr[128] = {0};
int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&eKey, eKeyStr);
char toStringStart[40] = {0};
char toStringEnd[40] = {0};
char toStringStart[128] = {0};
char toStringEnd[128] = {0};
if (qDebugFlag & DEBUG_TRACE) {
stateKeyToString(&sKey, toStringStart);
stateKeyToString(&eKey, toStringEnd);
......@@ -614,7 +616,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
char buf[40] = {0};
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
......@@ -647,7 +649,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
pCur->number = pState->number;
char buf[40] = {0};
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
......@@ -681,7 +683,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
char buf[40] = {0};
char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
......@@ -721,7 +723,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[40] = {0};
char buf[128] = {0};
int len = stateKeyEncode((void*)&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
......@@ -777,7 +779,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
char buf[40] = {0};
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
......@@ -890,7 +892,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[40] = {0};
char buf[128] = {0};
int len = stateKeyEncode((void*)&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
......@@ -921,7 +923,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
char buf[32] = {0};
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
......@@ -952,7 +954,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
pCur->db = pState->pTdbState->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
char buf[24] = {0};
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
......@@ -999,7 +1001,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
char buf[64] = {0};
char buf[128] = {0};
int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册