提交 764a8129 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 914d3918
......@@ -438,6 +438,35 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
STREAM_STATE_PUT_ROCKSDB(pState, "sess", key, value, vLen);
return code;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->number = pState->number;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
......@@ -467,21 +496,89 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
}
return pCur;
}
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
if (!pCur) {
return -1;
}
SStateSessionKey ktmp = {0};
SStateSessionKey* pKTmp = &ktmp;
int32_t kLen, vLen;
if (!rocksdb_iter_valid(pCur->iter)) {
return -1;
}
const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
*pVal = (char*)val;
*pVLen = vLen;
if (pKTmp->opNum != pCur->number) {
return -1;
}
if (pKey->groupId != 0 && pKey->groupId != pKTmp->key.groupId) {
return -1;
}
*pKey = pKTmp->key;
return 0;
}
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
if (!pCur) {
return -1;
}
rocksdb_iter_next(pCur->iter);
return 0;
}
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key);
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
SSessionKey resKey = *key;
void* tmp = NULL;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, pVLen);
if (code == 0) {
if (key->win.skey != resKey.win.skey) {
code = -1;
} else {
*key = resKey;
*pVal = tmp;
}
}
streamStateFreeCur(pCur);
// impl later
return code;
}
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) {
int code = 0;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
STREAM_STATE_DEL_ROCKSDB(pState, "sess", key);
return code;
}
int32_t streamStateSessionClear(SStreamState* pState) {
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
while (1) {
SSessionKey delKey = {0};
void* buf = NULL;
int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) {
memset(buf, 0, size);
streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
} else {
break;
}
streamStateCurNext_rocksdb(pState, pCur);
}
streamStateFreeCur(pCur);
return -1;
}
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) {
int code = 0;
STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册