提交 05df0f6e 编写于 作者: dengyihao's avatar dengyihao

add ttl to stream state key

上级 81bd7661
...@@ -44,6 +44,10 @@ int streamStateValueIsStale(char* vv) { ...@@ -44,6 +44,10 @@ int streamStateValueIsStale(char* vv) {
taosDecodeFixedI64(vv, &ts); taosDecodeFixedI64(vv, &ts);
return ts < taosGetTimestampSec() ? 1 : 0; return ts < taosGetTimestampSec() ? 1 : 0;
} }
int iterValueIsStale(rocksdb_iterator_t* iter) {
char* vv = (char*)rocksdb_iter_value(iter, NULL);
return streamStateValueIsStale(vv);
}
int defaultKeyEncode(void* k, char* buf) { int defaultKeyEncode(void* k, char* buf) {
int len = strlen((char*)k); int len = strlen((char*)k);
memcpy(buf, (char*)k, len); memcpy(buf, (char*)k, len);
...@@ -869,7 +873,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons ...@@ -869,7 +873,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
SStateKey tkey; SStateKey tkey;
SStateKey* pKtmp = &tkey; SStateKey* pKtmp = &tkey;
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
size_t tlen; size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
stateKeyDecode((void*)pKtmp, keyStr); stateKeyDecode((void*)pKtmp, keyStr);
...@@ -913,9 +917,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin ...@@ -913,9 +917,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return NULL; return NULL;
} }
// skip ttl expired data // skip ttl expired data
while (rocksdb_iter_valid(pCur->iter)) { while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
char* val = (char*)rocksdb_iter_value(pCur->iter, NULL);
if (!streamStateValueIsStale(val)) break;
rocksdb_iter_next(pCur->iter); rocksdb_iter_next(pCur->iter);
} }
...@@ -949,9 +951,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK ...@@ -949,9 +951,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter)) { while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
char* val = (char*)rocksdb_iter_value(pCur->iter, NULL);
if (!streamStateValueIsStale(val)) break;
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
} }
...@@ -977,7 +977,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* ...@@ -977,7 +977,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
rocksdb_iter_seek(pCur->iter, buf, len); rocksdb_iter_seek(pCur->iter, buf, len);
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) {
char* val = (char*)rocksdb_iter_value(pCur->iter, NULL); char* val = (char*)rocksdb_iter_value(pCur->iter, NULL);
if (!streamStateValueIsStale(val)) { if (!streamStateValueIsStale(val)) {
SStateKey curKey; SStateKey curKey;
...@@ -1065,17 +1065,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta ...@@ -1065,17 +1065,11 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf); int len = stateSessionKeyEncode(&sKey, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
while (rocksdb_iter_valid(pCur->iter)) {
char* val = (char*)rocksdb_iter_value(pCur->iter, NULL);
if (!streamStateValueIsStale(val)) {
break;
}
rocksdb_iter_prev(pCur->iter);
} }
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -1113,8 +1107,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta ...@@ -1113,8 +1107,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
const char* iVal = rocksdb_iter_value(pCur->iter, NULL); if (iterValueIsStale(pCur->iter)) {
if (streamStateValueIsStale((char*)iVal)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -1150,7 +1143,11 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con ...@@ -1150,7 +1143,11 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
size_t klen; size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0}; SStateSessionKey curKey = {0};
...@@ -1172,7 +1169,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* ...@@ -1172,7 +1169,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
SStateSessionKey ktmp = {0}; SStateSessionKey ktmp = {0};
size_t kLen = 0, vLen = 0; size_t kLen = 0, vLen = 0;
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
return -1; return -1;
} }
const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
...@@ -1231,12 +1228,9 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK ...@@ -1231,12 +1228,9 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
if (rocksdb_iter_valid(pCur->iter)) { if (iterValueIsStale(pCur->iter)) {
const char* iVal = rocksdb_iter_value(pCur->iter, NULL); streamStateFreeCur(pCur);
if (streamStateValueIsStale((char*)iVal)) { return NULL;
streamStateFreeCur(pCur);
return NULL;
}
} }
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter)) {
...@@ -1258,25 +1252,24 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, ...@@ -1258,25 +1252,24 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
return -1; return -1;
} }
SWinKey winKey; SWinKey winKey;
if (rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) {
size_t tlen; return -1;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); }
winKeyDecode(&winKey, keyStr); size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
size_t vlen = 0; winKeyDecode(&winKey, keyStr);
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
char* dst = NULL;
int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
if (len < 0) {
return -1;
}
if (pVal != NULL) *pVal = (char*)dst;
if (pVLen != NULL) *pVLen = vlen;
} else { size_t vlen = 0;
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
char* dst = NULL;
int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst);
if (len < 0) {
return -1; return -1;
} }
if (pVal != NULL) *pVal = (char*)dst;
if (pVLen != NULL) *pVLen = vlen;
*pKey = winKey; *pKey = winKey;
return 0; return 0;
} }
...@@ -1297,8 +1290,12 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const ...@@ -1297,8 +1290,12 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
// skip stale data
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_next(pCur->iter);
}
{ if (rocksdb_iter_valid(pCur->iter)) {
SWinKey curKey; SWinKey curKey;
size_t kLen = 0; size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
...@@ -1328,8 +1325,11 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const ...@@ -1328,8 +1325,11 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter);
}
{ if (rocksdb_iter_valid(pCur->iter)) {
SWinKey curKey; SWinKey curKey;
size_t kLen = 0; size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
...@@ -1340,6 +1340,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const ...@@ -1340,6 +1340,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
return pCur; return pCur;
} }
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
...@@ -1621,7 +1622,7 @@ int32_t streamDefaultIterValid_rocksdb(void* iter) { ...@@ -1621,7 +1622,7 @@ int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter; SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter); bool val = rocksdb_iter_valid(pCur->iter);
return val ? 0 : -1; return val ? 1 : 0;
} }
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter; SStreamStateCur* pCur = iter;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册