diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index bd9300dfab234525ea3589e2bce342fd07c898d6..7dc7f670d8b427ddd9ff35975707dd9f3bf37e3f 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -26,67 +26,65 @@ int streamInitBackend(SStreamState* pState, char* path); void streamCleanBackend(SStreamState* pState); +void streamStateDestroy_rocksdb(SStreamState* pState); -int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); -int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key); +// state cf int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key); -int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); -int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key); int32_t streamStateClear_rocksdb(SStreamState* pState); +int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); +int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); +SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key); +// func cf +int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); +int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key); + +// session cf int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); +int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key); int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); -int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); -int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key); int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); - +int32_t streamStateSessionClear_rocksdb(SStreamState* pState); int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); -int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); -int32_t streamStateSessionClear_rocksdb(SStreamState* pState); -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); -SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key); - -int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); - -int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); -SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key); +// fill cf +int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key); -SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); -SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); -int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); -int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); + +// partag cf +int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); +int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); + +// parname cf int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal); -void streamStateDestroy_rocksdb(SStreamState* pState); - -void* streamStateCreateBatch(); -int32_t streamStateGetBatchSize(void* pBatch); -void streamStateClearBatch(void* pBatch); -void streamStateDestroyBatch(void* pBatch); -int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen); -int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); +// default cf int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); - int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); void* streamDefaultIterCreate_rocksdb(SStreamState* pState); int32_t streamDefaultIterValid_rocksdb(void* iter); @@ -95,5 +93,13 @@ void streamDefaultIterNext_rocksdb(void* iter); char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len); +// batch func +void* streamStateCreateBatch(); +int32_t streamStateGetBatchSize(void* pBatch); +void streamStateClearBatch(void* pBatch); +void streamStateDestroyBatch(void* pBatch); +int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, + void* val, int32_t vlen); +int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index efa0e12d10a5fb076e3e909fbbe6e718ca8b6006..b6838651f0f709cb1c5e5c60212e59e2c8d1bad4 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -17,6 +17,15 @@ #include "streamBackendRocksdb.h" #include "tcommon.h" +static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); +int streamGetInit(const char* funcName); + +// |key|-----value------| +// |key|ttl|len|userData| + +static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, + rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); + int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int ret = memcmp(aBuf, bBuf, aLen); if (ret == 0) { @@ -312,8 +321,8 @@ int32_t streamValueToString(void* k, char* buf) { n += sprintf(buf + n, "data:%s]", key->data); return n; } -/*1: stale, 0: no stale*/ +/*1: stale, 0: no stale*/ int32_t streaValueIsStale(void* k, int64_t ts) { SStreamValue* key = k; if (key->unixTimestamp < ts) { @@ -325,8 +334,6 @@ int32_t streaValueIsStale(void* k, int64_t ts) { typedef struct { void* tableOpt; void* lru; // global or not - void* filteFactory; - } rocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; @@ -363,6 +370,7 @@ typedef struct { DecodeValueFunc deValueFunc; } SCfInit; + int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; @@ -375,15 +383,30 @@ int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { *dest = p; return len; } +/* + * ret >= 0 : found valid value + * ret < 0 : error or timeout + */ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { SStreamValue key = {0}; char* p = value; + int64_t now = taosGetTimestampMs(); p = taosDecodeFixedI64(p, &key.unixTimestamp); p = taosDecodeFixedI32(p, &key.len); p = taosDecodeBinary(p, (void**)&(key.data), key.len); - - *ttl = key.unixTimestamp; - *dest = key.data; + if (key.unixTimestamp != 0 && key.unixTimestamp < now) { + taosMemoryFree(key.data); + *dest = NULL; + return -1; + } + if (ttl != NULL) { + *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; + } + if (dest != NULL) { + *dest = key.data; + } else { + taosMemoryFree(key.data); + } return key.len; } SCfInit ginitDict[] = { @@ -550,6 +573,10 @@ void streamCleanBackend(SStreamState* pState) { pState->pTdbState->rocksdb = NULL; } +void streamStateDestroy_rocksdb(SStreamState* pState) { + // only close db + streamCleanBackend(pState); +} int streamGetInit(const char* funcName) { size_t len = strlen(funcName); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { @@ -617,47 +644,52 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL) { \ - qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ - if (err != NULL) taosMemoryFree(err); \ - code = -1; \ - } else { \ - char * p = NULL, *end = NULL; \ - int64_t ttl; \ - int32_t vlen = ginitDict[i].deValueFunc(val, len, &ttl, &p); \ - if (pVal != NULL) { \ - *pVal = p; \ - } else { \ - taosMemoryFree(p); \ - } \ - if (vLen != NULL) *vLen = vlen; \ - } \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL) { \ + qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ + if (err != NULL) taosMemoryFree(err); \ + code = -1; \ + } else { \ + char * p = NULL, *end = NULL; \ + int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \ + if (len < 0) { \ + qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \ + code = -1; \ + } else { \ + qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \ + } \ + if (pVal != NULL) { \ + *pVal = p; \ + } else { \ + taosMemoryFree(p); \ + } \ + if (vLen != NULL) *vLen = len; \ + } \ + if (err != NULL) { \ + taosMemoryFree(err); \ + qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ + code = -1; \ + } else { \ + if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ + } \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -687,22 +719,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa } \ } while (0); -int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen); - return code; -} -int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen); - return 0; -} -int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { - int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "func", key); - return 0; -} - +// state cf int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int code = 0; @@ -710,203 +727,18 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen); return code; } -int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { - char* err = NULL; - rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); - if (err != NULL) { - qError("streamState failed to write batch, err:%s", err); - taosMemoryFree(err); - return -1; - } - return 0; -} - -void* streamStateCreateBatch() { - rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create(); - return pBatch; -} -int32_t streamStateGetBatchSize(void* pBatch) { - if (pBatch == NULL) return -1; - - return rocksdb_writebatch_count((rocksdb_writebatch_t*)pBatch); -} - -void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } -void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } -int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen) { - int i = streamGetInit(cfName); - - if (i < 0) { - qError("streamState failed to put to cf name:%s", cfName); - return -1; - } - char buf[128] = {0}; - int32_t klen = ginitDict[i].enFunc((void*)key, buf); - - char* ttlV = NULL; - int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); - rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; - rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); - taosMemoryFree(ttlV); - return 0; -} - -int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen); - return code; -} -int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen); - return code; -} -int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { - int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "default", &key); - return code; -} - -void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - - pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); - return pCur; -} -int32_t streamDefaultIterValid_rocksdb(void* iter) { - SStreamStateCur* pCur = iter; - bool val = rocksdb_iter_valid(pCur->iter); - - return val ? 0 : -1; -} -void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { - SStreamStateCur* pCur = iter; - rocksdb_iter_seek(pCur->iter, key, strlen(key)); -} -void streamDefaultIterNext_rocksdb(void* iter) { - SStreamStateCur* pCur = iter; - rocksdb_iter_next(pCur->iter); -} -char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { - SStreamStateCur* pCur = iter; - return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len); -} -char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { - SStreamStateCur* pCur = iter; - return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len); -} -// typedef struct { -// char* start; -// char* end; -// void* result; -// } StreamFilterArg; - -// typedef int (*streamfilterFunc)(StreamFilterArg* arg); - -// int32_t streamDefaultIterFilter_rocksdb(SStreamState* pState, streamfilterFunc filterFunc, StreamFilterArg* arg) { -// int code = 0; -// char* err = NULL; - -// rocksdb_snapshot_t* snapshot = NULL; -// rocksdb_readoptions_t* readopts = NULL; -// rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); -// if (pIter == NULL) { -// return -1; -// } -// char* start = arg->start; -// char* end = arg->end; -// SArray* result = arg->result; - -// rocksdb_iter_seek(pIter, start, strlen(start)); -// while (rocksdb_iter_valid(pIter)) { -// const char* key = rocksdb_iter_key(pIter, NULL); -// if (end != NULL && strcmp(key, end) > 0) { -// break; -// } -// if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { -// int64_t checkPoint = 0; -// // if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { -// // taosArrayPush(result, &checkPoint); -// // } -// } else { -// break; -// } -// rocksdb_iter_next(pIter); -// } -// rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); -// rocksdb_readoptions_destroy(readopts); -// rocksdb_iter_destroy(pIter); -// return code; -// } -int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { - int code = 0; - char* err = NULL; - - rocksdb_snapshot_t* snapshot = NULL; - rocksdb_readoptions_t* readopts = NULL; - rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); - if (pIter == NULL) { - return -1; - } - - rocksdb_iter_seek(pIter, start, strlen(start)); - while (rocksdb_iter_valid(pIter)) { - const char* key = rocksdb_iter_key(pIter, NULL); - if (end != NULL && strcmp(key, end) > 0) { - break; - } - if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { - int64_t checkPoint = 0; - if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { - taosArrayPush(result, &checkPoint); - } - } else { - break; - } - rocksdb_iter_next(pIter); - } - rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); - rocksdb_readoptions_destroy(readopts); - rocksdb_iter_destroy(pIter); - return code; -} - int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen); return code; } -// todo refactor int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey); return code; } - -// todo refactor -int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { - int code = 0; - - STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); - return code; -} - -int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); - return code; -} -int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { - int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "fill", key); - return code; -} - -// todo refactor int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); @@ -976,124 +808,121 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { return 0; } - -int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { - int code = 0; - SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); - if (code == -1) { +int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { + if (!pCur) { + return -1; } + rocksdb_iter_next(pCur->iter); + return 0; +} +int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { + qDebug("streamStateGetFirst_rocksdb"); + SWinKey tmp = {.ts = 0, .groupId = 0}; + streamStatePut_rocksdb(pState, &tmp, NULL, 0); + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); + int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + streamStateFreeCur(pCur); + streamStateDel_rocksdb(pState, &tmp); return code; } -SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { - qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - if (pCur == NULL) { - return NULL; - } - pCur->number = pState->number; - pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); - char buf[128] = {0}; - SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int len = stateSessionKeyEncode(&sKey, buf); - if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { - streamStateFreeCur(pCur); - return NULL; +int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qDebug("streamStateGetGroupKVByCur_rocksdb"); + if (!pCur) { + return -1; } + uint64_t groupId = pKey->groupId; - int32_t c = 0; - size_t klen; - const char* iKey = rocksdb_iter_key(pCur->iter, &klen); - SStateSessionKey curKey = {0}; - stateSessionKeyDecode(&curKey, (char*)iKey); - if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; - - rocksdb_iter_prev(pCur->iter); - if (!rocksdb_iter_valid(pCur->iter)) { - // qWarn("streamState failed to seek key prev - // %s", toString); - streamStateFreeCur(pCur); - return NULL; + int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + if (code == 0) { + if (pKey->groupId == groupId) { + return 0; + } } - return pCur; + return -1; } -SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { - qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - if (pCur == NULL) { - return NULL; +int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + qDebug("streamStateAddIfNotExist_rocksdb"); + int32_t size = *pVLen; + if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) { + return 0; } - pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); - pCur->number = pState->number; + *pVal = taosMemoryMalloc(size); + memset(*pVal, 0, size); + return 0; +} +int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { + qDebug("streamStateCurPrev_rocksdb"); + if (!pCur) return -1; - char buf[128] = {0}; + rocksdb_iter_prev(pCur->iter); + return 0; +} +int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qDebug("streamStateGetKVByCur_rocksdb"); + if (!pCur) return -1; + SStateKey tkey; + SStateKey* pKtmp = &tkey; - SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int len = stateSessionKeyEncode(&sKey, buf); - if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { - streamStateFreeCur(pCur); - return NULL; + if (rocksdb_iter_valid(pCur->iter)) { + size_t tlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + stateKeyDecode((void*)pKtmp, keyStr); + if (pKtmp->opNum != pCur->number) { + return -1; + } + size_t vlen = 0; + if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen); + if (pVLen != NULL) *pVLen = vlen; + *pKey = pKtmp->key; + return 0; } - size_t klen; - const char* iKey = rocksdb_iter_key(pCur->iter, &klen); - SStateSessionKey curKey = {0}; - stateSessionKeyDecode(&curKey, (char*)iKey); - if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur; - - rocksdb_iter_next(pCur->iter); - if (!rocksdb_iter_valid(pCur->iter)) { + return -1; +} +SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { + qDebug("streamStateGetAndCheckCur_rocksdb"); + SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); + if (pCur) { + int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); + if (code == 0) return pCur; streamStateFreeCur(pCur); - return NULL; } - return pCur; + return NULL; } -SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { - qDebug("streamStateSessionSeekKeyNext_rocksdb"); +SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { + qDebug("streamStateSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } - pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); pCur->number = pState->number; + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); - SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - - char buf[128] = {0}; - int len = stateSessionKeyEncode(&sKey, buf); + SStateKey sKey = {.key = *key, .opNum = pState->number}; + char buf[128] = {0}; + int len = stateKeyEncode((void*)&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; } - size_t klen; - const char* iKey = rocksdb_iter_key(pCur->iter, &klen); - SStateSessionKey curKey = {0}; - stateSessionKeyDecode(&curKey, (char*)iKey); - if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur; - - rocksdb_iter_next(pCur->iter); - if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; + if (rocksdb_iter_valid(pCur->iter)) { + SStateKey curKey; + size_t kLen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + stateKeyDecode((void*)&curKey, keyStr); + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) { + return pCur; + } + rocksdb_iter_next(pCur->iter); + return pCur; } - return pCur; + streamStateFreeCur(pCur); + return NULL; } -int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - qDebug("streamStateAddIfNotExist_rocksdb"); - int32_t size = *pVLen; - if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) { - return 0; - } - *pVal = taosMemoryMalloc(size); - memset(*pVal, 0, size); - return 0; -} SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); int32_t code = 0; @@ -1112,6 +941,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); return pCur; } + SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); @@ -1140,116 +970,159 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* return NULL; } -SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { - qDebug("streamStateGetAndCheckCur_rocksdb"); - SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); - if (pCur) { - int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); - if (code == 0) return pCur; - streamStateFreeCur(pCur); +// func cf +int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { + int code = 0; + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen); + return code; +} +int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen); + return 0; +} +int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "func", key); + return 0; +} + +// session cf +int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { + int code = 0; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); + if (code == -1) { } - return NULL; + return code; } -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qDebug("streamStateGetKVByCur_rocksdb"); - if (!pCur) return -1; - SStateKey tkey; - SStateKey* pKtmp = &tkey; +int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { + qDebug("streamStateSessionGet_rocksdb"); + int code = 0; + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); + SSessionKey resKey = *key; + void* tmp = NULL; + int32_t vLen = 0; + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); + if (code == 0) { + if (pVLen != NULL) *pVLen = vLen; - if (rocksdb_iter_valid(pCur->iter)) { - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); - stateKeyDecode((void*)pKtmp, keyStr); - if (pKtmp->opNum != pCur->number) { - return -1; + if (key->win.skey != resKey.win.skey) { + code = -1; + } else { + *key = resKey; + *pVal = taosMemoryCalloc(1, *pVLen); + memcpy(*pVal, tmp, *pVLen); } - size_t vlen = 0; - if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen); - if (pVLen != NULL) *pVLen = vlen; - *pKey = pKtmp->key; - return 0; } - return -1; + streamStateFreeCur(pCur); + // impl later + return code; } -SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillGetCur_rocksdb"); + +int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) { + int code = 0; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey); + return code; +} +SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { + qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->number = pState->number; + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); - if (pCur == NULL) return NULL; + char buf[128] = {0}; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + int len = stateSessionKeyEncode(&sKey, buf); + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + streamStateFreeCur(pCur); + return NULL; + } + + int32_t c = 0; + size_t klen; + const char* iKey = rocksdb_iter_key(pCur->iter, &klen); + SStateSessionKey curKey = {0}; + stateSessionKeyDecode(&curKey, (char*)iKey); + if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; + rocksdb_iter_prev(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + // qWarn("streamState failed to seek key prev + // %s", toString); + streamStateFreeCur(pCur); + return NULL; + } + return pCur; +} +SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { + qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); + pCur->number = pState->number; char buf[128] = {0}; - int len = winKeyEncode((void*)key, buf); + + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; } + size_t klen; + const char* iKey = rocksdb_iter_key(pCur->iter, &klen); + SStateSessionKey curKey = {0}; + stateSessionKeyDecode(&curKey, (char*)iKey); + if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur; - if (rocksdb_iter_valid(pCur->iter)) { - size_t kLen; - SWinKey curKey; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - winKeyDecode((void*)&curKey, keyStr); - if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { - return pCur; - } + rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; } - - streamStateFreeCur(pCur); - return NULL; + return pCur; } -int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qDebug("streamStateFillGetKVByCur_rocksdb"); - if (!pCur) { - return -1; + +SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { + qDebug("streamStateSessionSeekKeyNext_rocksdb"); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; } - SWinKey winKey; - if (rocksdb_iter_valid(pCur->iter)) { - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); - winKeyDecode(&winKey, keyStr); + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt); + pCur->number = pState->number; - size_t vlen = 0; - const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - if (pVal != NULL) { - char* dst = taosMemoryCalloc(1, vlen); - memcpy(dst, valStr, vlen); - *pVal = dst; - } - if (pVLen != NULL) *pVLen = vlen; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - } else { - return -1; - } - *pKey = winKey; - return 0; -} -int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qDebug("streamStateGetGroupKVByCur_rocksdb"); - if (!pCur) { - return -1; + char buf[128] = {0}; + int len = stateSessionKeyEncode(&sKey, buf); + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + streamStateFreeCur(pCur); + return NULL; } - uint64_t groupId = pKey->groupId; - int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); - if (code == 0) { - if (pKey->groupId == groupId) { - return 0; - } + size_t klen; + const char* iKey = rocksdb_iter_key(pCur->iter, &klen); + SStateSessionKey curKey = {0}; + stateSessionKeyDecode(&curKey, (char*)iKey); + if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur; + + rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; } - return -1; -} -int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { - qDebug("streamStateGetFirst_rocksdb"); - SWinKey tmp = {.ts = 0, .groupId = 0}; - streamStatePut_rocksdb(pState, &tmp, NULL, 0); - SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); - streamStateFreeCur(pCur); - streamStateDel_rocksdb(pState, &tmp); - return code; + return pCur; } int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { qDebug("streamStateSessionGetKVByCur_rocksdb"); @@ -1266,11 +1139,14 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* stateSessionKeyDecode((void*)&ktmp, (char*)curKey); SStateSessionKey* pKTmp = &ktmp; - const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); - if (pVal != NULL) { - *pVal = (char*)val; + const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); + char* val = NULL; + int32_t len = decodeValueFunc((void*)vval, vLen, NULL, &val); + if (len < 0) { + return -1; } - if (pVLen != NULL) *pVLen = vLen; + if (pVal != NULL) *pVal = (char*)val; + if (pVLen != NULL) *pVLen = len; if (pKTmp->opNum != pCur->number) { return -1; @@ -1281,39 +1157,81 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* *pKey = pKTmp->key; return 0; } +// fill cf +int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { + int code = 0; -SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateSeekKeyNext_rocksdb"); + STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); + return code; +} + +int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); + return code; +} +int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "fill", key); + return code; +} + +SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { + qDebug("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - if (pCur == NULL) { - return NULL; - } - pCur->number = pState->number; + + if (pCur == NULL) return NULL; + pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; - int len = stateKeyEncode((void*)&sKey, buf); + char buf[128] = {0}; + int len = winKeyEncode((void*)key, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; } if (rocksdb_iter_valid(pCur->iter)) { - SStateKey curKey; - size_t kLen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - stateKeyDecode((void*)&curKey, keyStr); - if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) { + size_t kLen; + SWinKey curKey; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + winKeyDecode((void*)&curKey, keyStr); + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { return pCur; } - rocksdb_iter_next(pCur->iter); - return pCur; } + streamStateFreeCur(pCur); return NULL; } +int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qDebug("streamStateFillGetKVByCur_rocksdb"); + if (!pCur) { + return -1; + } + SWinKey winKey; + if (rocksdb_iter_valid(pCur->iter)) { + size_t tlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + winKeyDecode(&winKey, keyStr); + + size_t vlen = 0; + const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); + if (pVal != NULL) { + char* dst = taosMemoryCalloc(1, vlen); + memcpy(dst, valStr, vlen); + *pVal = dst; + } + if (pVLen != NULL) *pVLen = vlen; + + } else { + return -1; + } + *pKey = winKey; + return 0; +} + SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); @@ -1376,20 +1294,6 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const streamStateFreeCur(pCur); return NULL; } -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { - qDebug("streamStateCurPrev_rocksdb"); - if (!pCur) return -1; - - rocksdb_iter_prev(pCur->iter); - return 0; -} -int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { - if (!pCur) { - return -1; - } - rocksdb_iter_next(pCur->iter); - return 0; -} int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { qDebug("streamStateSessionGetKeyByRange_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); @@ -1446,36 +1350,6 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes return -1; } -int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { - qDebug("streamStateSessionGet_rocksdb"); - int code = 0; - SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); - SSessionKey resKey = *key; - void* tmp = NULL; - int32_t vLen = 0; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); - if (code == 0) { - if (pVLen != NULL) *pVLen = vLen; - - if (key->win.skey != resKey.win.skey) { - code = -1; - } else { - *key = resKey; - *pVal = taosMemoryCalloc(1, *pVLen); - memcpy(*pVal, tmp, *pVLen); - } - } - streamStateFreeCur(pCur); - // impl later - return code; -} - -int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) { - int code = 0; - SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey); - return code; -} int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) { qDebug("streamStateSessionAddIfNotExist_rocksdb"); @@ -1526,6 +1400,27 @@ _end: streamStateFreeCur(pCur); return res; } +int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { + qDebug("streamStateSessionClear_rocksdb"); + SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); + + while (1) { + SSessionKey delKey = {0}; + void* buf = NULL; + int32_t size = 0; + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); + if (code == 0 && size > 0) { + memset(buf, 0, size); + streamStateSessionPut_rocksdb(pState, &delKey, buf, size); + } else { + break; + } + streamStateCurNext_rocksdb(pState, pCur); + } + streamStateFreeCur(pCur); + return -1; +} int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { qDebug("streamStateStateAddIfNotExist_rocksdb"); @@ -1583,27 +1478,7 @@ _end: return res; } -int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { - qDebug("streamStateSessionClear_rocksdb"); - SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; - SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); - - while (1) { - SSessionKey delKey = {0}; - void* buf = NULL; - int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); - if (code == 0 && size > 0) { - memset(buf, 0, size); - streamStateSessionPut_rocksdb(pState, &delKey, buf, size); - } else { - break; - } - streamStateCurNext_rocksdb(pState, pCur); - } - streamStateFreeCur(pCur); - return -1; -} +// partag cf int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen); @@ -1615,7 +1490,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen); return code; } - +// parname cfg int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); @@ -1628,7 +1503,136 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi return code; } -void streamStateDestroy_rocksdb(SStreamState* pState) { - // only close db - streamCleanBackend(pState); +// default cf +int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { + int code = 0; + STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen); + return code; +} +int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen); + return code; +} +int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "default", &key); + return code; +} + +int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { + int code = 0; + char* err = NULL; + + rocksdb_snapshot_t* snapshot = NULL; + rocksdb_readoptions_t* readopts = NULL; + rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); + if (pIter == NULL) { + return -1; + } + + rocksdb_iter_seek(pIter, start, strlen(start)); + while (rocksdb_iter_valid(pIter)) { + const char* key = rocksdb_iter_key(pIter, NULL); + int32_t vlen = 0; + const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen); + char* val = NULL; + int32_t len = decodeValueFunc((void*)vval, vlen, NULL, NULL); + if (len < 0) { + rocksdb_iter_next(pIter); + continue; + } + + if (end != NULL && strcmp(key, end) > 0) { + break; + } + if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { + int64_t checkPoint = 0; + if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { + taosArrayPush(result, &checkPoint); + } + } else { + break; + } + rocksdb_iter_next(pIter); + } + rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); + rocksdb_readoptions_destroy(readopts); + rocksdb_iter_destroy(pIter); + return code; +} +void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); + return pCur; +} +int32_t streamDefaultIterValid_rocksdb(void* iter) { + SStreamStateCur* pCur = iter; + bool val = rocksdb_iter_valid(pCur->iter); + + return val ? 0 : -1; +} +void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { + SStreamStateCur* pCur = iter; + rocksdb_iter_seek(pCur->iter, key, strlen(key)); +} +void streamDefaultIterNext_rocksdb(void* iter) { + SStreamStateCur* pCur = iter; + rocksdb_iter_next(pCur->iter); +} +char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { + SStreamStateCur* pCur = iter; + return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len); +} +char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { + SStreamStateCur* pCur = iter; + int32_t vlen = 0; + char* dst = NULL; + const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); + if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) { + return NULL; + } + return dst; +} +// batch func +void* streamStateCreateBatch() { + rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create(); + return pBatch; +} +int32_t streamStateGetBatchSize(void* pBatch) { + if (pBatch == NULL) return 0; + return rocksdb_writebatch_count(pBatch); +} + +void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } +void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } +int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, + void* val, int32_t vlen) { + int i = streamGetInit(cfName); + + if (i < 0) { + qError("streamState failed to put to cf name:%s", cfName); + return -1; + } + char buf[128] = {0}; + int32_t klen = ginitDict[i].enFunc((void*)key, buf); + + char* ttlV = NULL; + int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); + rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; + rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); + taosMemoryFree(ttlV); + return 0; +} +int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { + char* err = NULL; + rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); + if (err != NULL) { + qError("streamState failed to write batch, err:%s", err); + taosMemoryFree(err); + return -1; + } + return 0; } \ No newline at end of file