提交 43e16c86 编写于 作者: dengyihao's avatar dengyihao

add backend

上级 7e33e70a
...@@ -629,7 +629,8 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo ...@@ -629,7 +629,8 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
code = -1; code = -1;
} else { } else {
*key = resKey; *key = resKey;
*pVal = tmp; *pVal = taosMemoryMalloc(*pVLen);
memcpy(*pVal, tmp, *pVLen);
} }
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
...@@ -707,14 +708,14 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* ...@@ -707,14 +708,14 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
if (code == 0) { if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key); streamStateSessionDel_rocksdb(pState, key);
goto _end; goto _end;
} }
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
streamStateSessionDel(pState, key); streamStateSessionDel_rocksdb(pState, key);
goto _end; goto _end;
} }
...@@ -746,7 +747,7 @@ _end: ...@@ -746,7 +747,7 @@ _end:
return res; return res;
} }
int32_t streamStateSessionClear(SStreamState* pState) { int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);
...@@ -756,7 +757,7 @@ int32_t streamStateSessionClear(SStreamState* pState) { ...@@ -756,7 +757,7 @@ int32_t streamStateSessionClear(SStreamState* pState) {
int32_t size = 0; int32_t size = 0;
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size);
if (code == 0 && size > 0) { if (code == 0 && size > 0) {
memset(buf, 0, size); // memset(buf, 0, size);
streamStateSessionPut_rocksdb(pState, &delKey, buf, size); streamStateSessionPut_rocksdb(pState, &delKey, buf, size);
} else { } else {
break; break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册