提交 7e33e70a 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 764a8129
......@@ -497,6 +497,36 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
return pCur;
}
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return NULL;
}
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
pCur->number = pState->number;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey));
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0};
stateSessionKeyDecode(&curKey, (char*)iKey);
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur;
rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur;
}
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
if (!pCur) {
return -1;
......@@ -512,8 +542,8 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
stateSessionKeyDecode((void*)&ktmp, (char*)curKey);
const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
*pVal = (char*)val;
*pVLen = vLen;
if (pVal != NULL) *pVal = (char*)val;
if (pVLen != NULL) *pVLen = vLen;
if (pKTmp->opNum != pCur->number) {
return -1;
......@@ -532,6 +562,62 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur)
rocksdb_iter_next(pCur->iter);
return 0;
}
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) {
return -1;
}
pCur->number = pState->number;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
char buf[128] = {0};
stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey));
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return -1;
}
int32_t kLen;
const char* iKeyStr = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
SStateSessionKey iKey = {0};
stateSessionKeyDecode(&iKey, (char*)iKeyStr);
c = stateSessionKeyCmpr(&sKey, sizeof(sKey), &iKey, sizeof(iKey));
SSessionKey resKey = *key;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
streamStateFreeCur(pCur);
return code;
}
if (c > 0) {
streamStateCurNext_rocksdb(pState, pCur);
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
streamStateFreeCur(pCur);
return code;
}
} else if (c < 0) {
streamStateCurPrev(pState, pCur);
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
streamStateFreeCur(pCur);
return code;
}
}
streamStateFreeCur(pCur);
return -1;
}
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
int code = 0;
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
......@@ -558,6 +644,107 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
STREAM_STATE_DEL_ROCKSDB(pState, "sess", key);
return code;
}
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) {
// todo refactor
int32_t res = 0;
SSessionKey originKey = *key;
SSessionKey searchKey = *key;
searchKey.win.skey = key->win.skey - gap;
searchKey.win.ekey = key->win.ekey + gap;
int32_t valSize = *pVLen;
void* tmp = taosMemoryMalloc(valSize);
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
streamStateCurNext_rocksdb(pState, pCur);
} else {
*key = originKey;
streamStateFreeCur(pCur);
pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
}
code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
}
*key = originKey;
res = 1;
memset(tmp, 0, valSize);
_end:
*pVal = tmp;
streamStateFreeCur(pCur);
return res;
}
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
// todo refactor
int32_t res = 0;
SSessionKey tmpKey = *key;
int32_t valSize = *pVLen;
void* tmp = taosMemoryMalloc(valSize);
// tdbRealloc(NULL, valSize);
if (!tmp) {
return -1;
}
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key);
goto _end;
}
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key);
goto _end;
}
streamStateCurNext_rocksdb(pState, pCur);
} else {
*key = tmpKey;
streamStateFreeCur(pCur);
pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
}
code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
if (code == 0) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize);
streamStateSessionDel_rocksdb(pState, key);
goto _end;
}
}
*key = tmpKey;
res = 1;
memset(tmp, 0, valSize);
_end:
*pVal = tmp;
streamStateFreeCur(pCur);
return res;
}
int32_t streamStateSessionClear(SStreamState* pState) {
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册