提交 b17f99de 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 981466a0
......@@ -313,21 +313,30 @@ int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* val
}
// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateFillPut_rocksdb(pState, key, value, vLen);
return streamStateGet_rocksdb(pState, key, pVal, pVLen);
#else
return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
#endif
}
// todo refactor
int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateGet_rocksdb(pState, key, pVal, pVLen);
return streamStateDel_rocksdb(pState, key);
#else
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen);
return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
#endif
}
// todo refactor
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
#ifdef USE_ROCKSDB
return streamStateFillPut_rocksdb(pState, key, value, vLen);
#else
return tdbTbUpsert(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), value, vLen, pState->pTdbState->txn);
#endif
}
......@@ -341,12 +350,11 @@ int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal
}
// todo refactor
int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateDel_rocksdb(pState, key);
return streamStateFillDel_rocksdb(pState, key);
#else
SStateKey sKey = {.key = *key, .opNum = pState->number};
return tdbTbDelete(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pState->pTdbState->txn);
return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
#endif
}
......@@ -373,15 +381,6 @@ int32_t streamStateClear(SStreamState* pState) {
void streamStateSetNumber(SStreamState* pState, int32_t number) { pState->number = number; }
// todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB
return streamStateFillDel_rocksdb(pState, key);
#else
return tdbTbDelete(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pState->pTdbState->txn);
#endif
}
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
return streamStateAddIfNotExist_rocksdb(pState, key, pVal, pVLen);
......@@ -535,13 +534,21 @@ int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) {
}
int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
//
#ifdef USE_ROCKSDB
rocksdb_iter_seek_to_first(pCur->iter);
return 0;
#else
return tdbTbcMoveToFirst(pCur->pCur);
#endif
}
int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
//
#ifdef USE_ROCKSDB
rocksdb_iter_seek_to_last(pCur->iter);
return 0;
#else
return tdbTbcMoveToLast(pCur->pCur);
#endif
}
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
......
......@@ -49,7 +49,7 @@ int stateKeyEncode(void* k, char* buf) {
int len = 0;
len += taosEncodeFixedU64((void**)&buf, key->key.groupId);
len += taosEncodeFixedI64((void**)&buf, key->key.ts);
len += taosEncodeFixedU64((void**)&buf, key->opNum);
len += taosEncodeFixedI64((void**)&buf, key->opNum);
return len;
}
int stateKeyDecode(void* k, char* buf) {
......@@ -482,6 +482,17 @@ int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, con
return code;
}
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
return code;
}
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
return code;
}
// todo refactor
int32_t streamStateClear_rocksdb(SStreamState* pState) {
......@@ -502,16 +513,6 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
}
return 0;
}
int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen);
return code;
}
int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int code = 0;
STREAM_STATE_DEL_ROCKSDB(pState, "fill", key);
return code;
}
int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) {
int code = 0;
......@@ -545,10 +546,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
......@@ -560,9 +557,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
pCur->number = pState->number;
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey));
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
......@@ -575,10 +575,6 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur;
rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
......@@ -606,10 +602,6 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur;
rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
......@@ -626,31 +618,43 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
qWarn("streamStateGetCur_rocksdb-->1");
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]);
int32_t c = 0;
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
stateKeyEncode((void*)&sKey, buf);
char sKeyStr[128] = {0};
stateKeyToString(&sKey, sKeyStr);
rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey));
if (rocksdb_iter_valid(pCur->iter)) {
qWarn("streamStateGetCur_rocksdb-->2");
SStateKey curKey;
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr);
char tKeyStr[128] = {0};
stateKeyToString(&curKey, tKeyStr);
qWarn("streamStateGetCur_rocksdb-->src:%s, dst:%s", sKeyStr, tKeyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
pCur->number = pState->number;
return pCur;
}
qWarn("streamStateGetCur_rocksdb-->3");
}
qWarn("streamStateGetCur_rocksdb-->4");
streamStateFreeCur(pCur);
return NULL;
}
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
pCur->iter =
......@@ -688,15 +692,19 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
SStateKey* pKtmp = &tkey;
if (rocksdb_iter_valid(pCur->iter)) {
qWarn("streamStateGetKVByCur_rocksdb-2");
size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
stateKeyDecode((void*)pKtmp, keyStr);
if (pKtmp->opNum != pCur->number) {
qWarn("streamStateGetKVByCur_rocksdb-3");
return -1;
}
qWarn("streamStateGetKVByCur_rocksdb-4");
*pKey = pKtmp->key;
return 0;
}
qWarn("streamStateGetKVByCur_rocksdb-5");
return -1;
}
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
......@@ -779,10 +787,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
pCur->number = pState->number;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]);
// if (!rocksdb_iter_valid(pCur->iter)) {
// streamStateFreeCur(pCur);
// return NULL;
// }
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
stateKeyEncode((void*)&sKey, buf);
......@@ -796,9 +801,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return pCur;
}
rocksdb_iter_next(pCur->iter);
if (rocksdb_iter_valid(pCur->iter)) {
return pCur;
}
return pCur;
}
streamStateFreeCur(pCur);
return NULL;
......@@ -817,6 +820,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
rocksdb_iter_seek(pCur->iter, buf, sizeof(*key));
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
{
SWinKey curKey;
......@@ -827,9 +831,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
return pCur;
}
rocksdb_iter_next(pCur->iter);
if (rocksdb_iter_valid(pCur->iter)) {
return pCur;
}
return pCur;
}
streamStateFreeCur(pCur);
return NULL;
......@@ -848,6 +850,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
rocksdb_iter_seek(pCur->iter, buf, sizeof(*key));
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
{
......@@ -859,9 +862,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
return pCur;
}
rocksdb_iter_prev(pCur->iter);
if (rocksdb_iter_valid(pCur->iter)) {
return pCur;
}
return pCur;
}
streamStateFreeCur(pCur);
return NULL;
......@@ -869,10 +870,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
qWarn("streamStateCurPrev_rocksdb");
if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
return -1;
}
return 0;
}
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
......@@ -880,9 +879,6 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur)
return -1;
}
rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
return -1;
}
return 0;
}
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
......@@ -959,7 +955,6 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
}
}
streamStateFreeCur(pCur);
// impl later
return code;
}
......@@ -1085,7 +1080,7 @@ int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) {
// memset(buf, 0, size);
memset(buf, 0, size);
streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
} else {
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册