diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 350ce0dab5a97903f4d2823eda173b21562468e5..2be0330efcec752a3b7322002bf93e9692ef5c94 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -27,21 +27,10 @@ extern "C" { #ifndef _STREAM_STATE_H_ #define _STREAM_STATE_H_ -typedef struct { - rocksdb_t* db; - rocksdb_writeoptions_t* writeOpts; - rocksdb_readoptions_t* readOpts; - rocksdb_options_t* dbOpt; - void* param; - void* env; - rocksdb_cache_t* cache; - TdThreadMutex mutex; - SList* list; -} SBackendHandle; -void* streamBackendInit(const char* path); -void streamBackendCleanup(void* arg); -SListNode* streamBackendAddCompare(void* backend, void* arg); -void streamBackendDelCompare(void* backend, void* arg); +// void* streamBackendInit(const char* path); +// void streamBackendCleanup(void* arg); +// SListNode* streamBackendAddCompare(void* backend, void* arg); +// void streamBackendDelCompare(void* backend, void* arg); typedef bool (*state_key_cmpr_fn)(void* pKey1, void* pKey2); typedef struct STdbState { @@ -55,8 +44,9 @@ typedef struct STdbState { void* param; void* env; SListNode* pComparNode; - SBackendHandle* pBackendHandle; + void* pBackendHandle; char idstr[48]; + void* compactFactory; TDB* db; TTB* pStateDb; @@ -168,11 +158,11 @@ typedef struct SStateSessionKey { int64_t opNum; } SStateSessionKey; -typedef struct streamValue { +typedef struct SStreamValue { int64_t unixTimestamp; int32_t len; - char data[0]; -} streamValue; + char* data; +} SStreamValue; int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 94caf18d553727fc3d2391f0e128c1ee4e392dae..5712d68561d5817cabaa0f589ea4896c223d2955 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -17,7 +17,9 @@ #define _STREAM_BACKEDN_ROCKSDB_H_ #include "executor.h" -#include "streamInc.h" + +#include "rocksdb/c.h" +// #include "streamInc.h" #include "streamState.h" #include "tcoding.h" #include "tcommon.h" @@ -28,58 +30,84 @@ typedef struct SCfComparator { rocksdb_comparator_t** comp; int32_t numOfComp; } SCfComparator; + +typedef struct { + rocksdb_t* db; + rocksdb_writeoptions_t* writeOpts; + rocksdb_readoptions_t* readOpts; + rocksdb_options_t* dbOpt; + void* param; + void* env; + rocksdb_cache_t* cache; + TdThreadMutex mutex; + rocksdb_compactionfilterfactory_t* filterFactory; + SList* list; +} SBackendHandle; + +void* streamBackendInit(const char* path); +void streamBackendCleanup(void* arg); +SListNode* streamBackendAddCompare(void* backend, void* arg); +void streamBackendDelCompare(void* backend, void* arg); + int streamStateOpenBackend(void* backend, SStreamState* pState); void streamStateCloseBackend(SStreamState* pState, bool remove); void streamStateDestroyCompar(void* arg); -// void streamStateRemoveBackend(SStreamState* pState); -int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); -int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key); +// state cf int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key); -int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); -int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key); int32_t streamStateClear_rocksdb(SStreamState* pState); +int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); +int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); +SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key); + +// func cf +int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); +int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key); +// session cf int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); +int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key); int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); -int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey); -int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key); int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); +int32_t streamStateSessionClear_rocksdb(SStreamState* pState); int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); -int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); -int32_t streamStateSessionClear_rocksdb(SStreamState* pState); -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur); -SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key); - -int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); - -int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); -SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key); +// fill cf +int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key); -SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); -SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); -int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); -int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); + +// partag cf +int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); +int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); + +// parname cf int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal); -void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); + +void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); void* streamStateCreateBatch(); int32_t streamStateGetBatchSize(void* pBatch); @@ -89,10 +117,10 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr void* val, int32_t vlen); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); +// default cf int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); - int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); void* streamDefaultIterCreate_rocksdb(SStreamState* pState); int32_t streamDefaultIterValid_rocksdb(void* iter); @@ -101,5 +129,13 @@ void streamDefaultIterNext_rocksdb(void* iter); char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len); char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len); +// batch func +void* streamStateCreateBatch(); +int32_t streamStateGetBatchSize(void* pBatch); +void streamStateClearBatch(void* pBatch); +void streamStateDestroyBatch(void* pBatch); +int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, + void* val, int32_t vlen); +int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c similarity index 73% rename from source/libs/stream/src/streamStateRocksdb.c rename to source/libs/stream/src/streamBackendRocksdb.c index 805b73780c4121ec16d096ee6d16c4c5493125dd..2fb60a9ec780442feb210b621413332b685bcd3b 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -13,10 +13,145 @@ * along with this program. If not, see . */ -#include "rocksdb/c.h" +// #include "streamStateRocksdb.h" #include "streamBackendRocksdb.h" #include "tcommon.h" +typedef struct SCompactFilteFactory { + void* status; +} SCompactFilteFactory; + +void destroyCompactFilteFactory(void* arg); +void destroyCompactFilte(void* arg); +const char* compactFilteFactoryName(void* arg); +const char* compactFilteName(void* arg); +unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, + char** newval, size_t* newvlen, unsigned char* value_changed); +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx); + +void* streamBackendInit(const char* path); +void streamBackendCleanup(void* arg); +SListNode* streamBackendAddCompare(void* backend, void* arg); +void streamBackendDelCompare(void* backend, void* arg); + +typedef struct { + void* tableOpt; +} RocksdbCfParam; +const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; + +typedef int (*EncodeFunc)(void* key, char* buf); +typedef int (*DecodeFunc)(void* key, char* buf); +typedef int (*ToStringFunc)(void* key, char* buf); +typedef const char* (*CompareName)(void* statue); +typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +typedef void (*DestroyFunc)(void* state); +typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest); +typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest); + +const char* compareDefaultName(void* name); +const char* compareStateName(void* name); +const char* compareWinKeyName(void* name); +const char* compareSessionKeyName(void* name); +const char* compareFuncKeyName(void* name); +const char* compareParKeyName(void* name); +const char* comparePartagKeyName(void* name); + +void* streamBackendInit(const char* path) { + SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); + pHandle->list = tdListNew(sizeof(SCfComparator)); + taosThreadMutexInit(&pHandle->mutex, NULL); + + rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); + rocksdb_env_set_low_priority_background_threads(env, 4); + rocksdb_env_set_high_priority_background_threads(env, 2); + + rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); + + rocksdb_options_t* opts = rocksdb_options_create(); + rocksdb_options_set_env(opts, env); + rocksdb_options_set_create_if_missing(opts, 1); + rocksdb_options_set_create_missing_column_families(opts, 1); + rocksdb_options_set_write_buffer_size(opts, 128 << 20); + rocksdb_options_set_max_total_wal_size(opts, 128 << 20); + rocksdb_options_set_recycle_log_file_num(opts, 6); + rocksdb_options_set_max_write_buffer_number(opts, 3); + + pHandle->env = env; + pHandle->dbOpt = opts; + pHandle->cache = cache; + pHandle->filterFactory = rocksdb_compactionfilterfactory_create( + NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); + rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory); + + char* err = NULL; + pHandle->db = rocksdb_open(opts, path, &err); + if (err != NULL) { + qError("failed to open rocksdb, path:%s, reason:%s", path, err); + taosMemoryFreeClear(err); + goto _EXIT; + } + + return (void*)pHandle; +_EXIT: + rocksdb_options_destroy(opts); + rocksdb_cache_destroy(cache); + rocksdb_env_destroy(env); + taosThreadMutexDestroy(&pHandle->mutex); + rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); + tdListFree(pHandle->list); + free(pHandle); + return NULL; +} +void streamBackendCleanup(void* arg) { + SBackendHandle* pHandle = (SBackendHandle*)arg; + rocksdb_close(pHandle->db); + rocksdb_options_destroy(pHandle->dbOpt); + rocksdb_env_destroy(pHandle->env); + rocksdb_cache_destroy(pHandle->cache); + + taosThreadMutexDestroy(&pHandle->mutex); + SListNode* head = tdListPopHead(pHandle->list); + while (head != NULL) { + streamStateDestroyCompar(head->data); + taosMemoryFree(head); + head = tdListPopHead(pHandle->list); + } + // rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory); + tdListFree(pHandle->list); + + taosMemoryFree(pHandle); + + return; +} +SListNode* streamBackendAddCompare(void* backend, void* arg) { + SBackendHandle* pHandle = (SBackendHandle*)backend; + SListNode* node = NULL; + taosThreadMutexLock(&pHandle->mutex); + node = tdListAdd(pHandle->list, arg); + taosThreadMutexUnlock(&pHandle->mutex); + return node; +} +void streamBackendDelCompare(void* backend, void* arg) { + SBackendHandle* pHandle = (SBackendHandle*)backend; + SListNode* node = NULL; + taosThreadMutexLock(&pHandle->mutex); + node = tdListPopNode(pHandle->list, arg); + taosThreadMutexUnlock(&pHandle->mutex); + if (node) { + streamStateDestroyCompar(node->data); + taosMemoryFree(node); + } +} +void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); } +static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len); +int streamGetInit(const char* funcName); + +// |key|-----value------| +// |key|ttl|len|userData| + +static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, + rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); + int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { int ret = memcmp(aBuf, bBuf, aLen); if (ret == 0) { @@ -30,6 +165,16 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, return ret; } } +int streamStateValueIsStale(char* vv) { + int64_t ts = 0; + taosDecodeFixedI64(vv, &ts); + return ts < taosGetTimestampSec() ? 1 : 0; +} +int iterValueIsStale(rocksdb_iterator_t* iter) { + size_t len; + char* v = (char*)rocksdb_iter_value(iter, &len); + return streamStateValueIsStale(v); +} int defaultKeyEncode(void* k, char* buf) { int len = strlen((char*)k); memcpy(buf, (char*)k, len); @@ -289,95 +434,174 @@ int parKeyToString(void* k, char* buf) { return n; } int stremaValueEncode(void* k, char* buf) { - int len = 0; - streamValue* key = k; + int len = 0; + SStreamValue* key = k; len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key->len); len += taosEncodeBinary((void**)&buf, key->data, key->len); return len; } int streamValueDecode(void* k, char* buf) { - streamValue* key = k; - char* p = buf; + SStreamValue* key = k; + char* p = buf; p = taosDecodeFixedI64(p, &key->unixTimestamp); p = taosDecodeFixedI32(p, &key->len); p = taosDecodeBinary(p, (void**)&key->data, key->len); return p - buf; } int32_t streamValueToString(void* k, char* buf) { - streamValue* key = k; - int n = 0; + SStreamValue* key = k; + int n = 0; n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp); n += sprintf(buf + n, "len:%d,", key->len); n += sprintf(buf + n, "data:%s]", key->data); return n; } -/*1: stale, 0: no stale*/ +/*1: stale, 0: no stale*/ int32_t streaValueIsStale(void* k, int64_t ts) { - streamValue* key = k; + SStreamValue* key = k; if (key->unixTimestamp < ts) { return 1; } return 0; } -typedef struct { - void* tableOpt; -} RocksdbCfParam; -const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; - -typedef int (*EncodeFunc)(void* key, char* buf); -typedef int (*DecodeFunc)(void* key, char* buf); -typedef int (*ToStringFunc)(void* key, char* buf); -typedef const char* (*CompareName)(void* statue); -typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); -typedef void (*DestroyFunc)(void* state); - -const char* compareDefaultName(void* name); -const char* compareStateName(void* name); -const char* compareWinKeyName(void* name); -const char* compareSessionKeyName(void* name); -const char* compareFuncKeyName(void* name); -const char* compareParKeyName(void* name); -const char* comparePartagKeyName(void* name); - -void destroyFunc(void* stata) { return; } +void destroyFunc(void* arg) { + (void)arg; + return; +} typedef struct { - const char* key; - int32_t len; - int idx; - BackendCmpFunc cmpFunc; - EncodeFunc enFunc; - DecodeFunc deFunc; - ToStringFunc toStrFunc; - CompareName cmpName; - DestroyFunc detroyFunc; + const char* key; + int32_t len; + int idx; + BackendCmpFunc cmpFunc; + EncodeFunc enFunc; + DecodeFunc deFunc; + ToStringFunc toStrFunc; + CompareName cmpName; + DestroyFunc detroyFunc; + EncodeValueFunc enValueFunc; + DecodeValueFunc deValueFunc; } SCfInit; #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUBFIX) sprintf(name, "%s_%s", idstr, (SUBFIX)); +int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { + SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; + + char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len); + char* buf = p; + int32_t len = 0; + len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); + len += taosEncodeFixedI32((void**)&buf, key.len); + len += taosEncodeBinary((void**)&buf, (char*)value, vlen); + *dest = p; + return len; +} +/* + * ret >= 0 : found valid value + * ret < 0 : error or timeout + */ +int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { + SStreamValue key = {0}; + char* p = value; + int64_t now = taosGetTimestampMs(); + p = taosDecodeFixedI64(p, &key.unixTimestamp); + p = taosDecodeFixedI32(p, &key.len); + p = taosDecodeBinary(p, (void**)&(key.data), key.len); + if (key.unixTimestamp != 0 && key.unixTimestamp < now) { + taosMemoryFree(key.data); + *dest = NULL; + return -1; + } + if (ttl != NULL) { + *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now; + } + if (dest != NULL) { + *dest = key.data; + } else { + taosMemoryFree(key.data); + } + return key.len; +} SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, - destroyFunc}, - {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc}, - {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc}, + destroyFunc, encodeValueFunc, decodeValueFunc}, + {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, - compareSessionKeyName, destroyFunc}, - {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc}, - {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc}, - {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc}, + compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc}, + {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, }; -const char* compareDefaultName(void* name) { return ginitDict[0].key; } -const char* compareStateName(void* name) { return ginitDict[1].key; } -const char* compareWinKeyName(void* name) { return ginitDict[2].key; } -const char* compareSessionKeyName(void* name) { return ginitDict[3].key; } -const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } -const char* compareParKeyName(void* name) { return ginitDict[5].key; } -const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } +const char* compareDefaultName(void* arg) { + (void)arg; + return ginitDict[0].key; +} +const char* compareStateName(void* arg) { + (void)arg; + return ginitDict[1].key; +} +const char* compareWinKeyName(void* arg) { + (void)arg; + return ginitDict[2].key; +} +const char* compareSessionKeyName(void* arg) { + (void)arg; + return ginitDict[3].key; +} +const char* compareFuncKeyName(void* arg) { + (void)arg; + return ginitDict[4].key; +} +const char* compareParKeyName(void* arg) { + (void)arg; + return ginitDict[5].key; +} +const char* comparePartagKeyName(void* arg) { + (void)arg; + return ginitDict[6].key; +} + +void destroyCompactFilteFactory(void* arg) { + if (arg == NULL) return; +} +const char* compactFilteFactoryName(void* arg) { + SCompactFilteFactory* state = arg; + return "stream_compact_filter"; +} + +void destroyCompactFilte(void* arg) { (void)arg; } +unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, + char** newval, size_t* newvlen, unsigned char* value_changed) { + int64_t unixTime = taosGetTimestampMs(); + SStreamValue value; + memset(&value, 0, sizeof(value)); + streamValueDecode(&value, (char*)val); + taosMemoryFree(value.data); + if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) { + return 1; + } + return 0; +} +const char* compactFilteName(void* arg) { return "stream_filte"; } + +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) { + SCompactFilteFactory* state = arg; + rocksdb_compactionfilter_t* filter = + rocksdb_compactionfilter_create(NULL, destroyCompactFilte, compactFilte, compactFilteName); + return filter; +} int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open backend, %p, %d-%d", pState, pState->streamId, pState->taskId); @@ -390,7 +614,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { RocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam)); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); for (int i = 0; i < cfLen; i++) { - cfOpt[i] = rocksdb_options_create(); + cfOpt[i] = rocksdb_options_create_copy(handle->dbOpt); // refactor later rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create(); rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache); @@ -436,6 +660,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); return 0; } + void streamStateCloseBackend(SStreamState* pState, bool remove) { char* status[] = {"close", "drop"}; qInfo("start to %s backend, %p, %d-%d", status[remove == false ? 0 : 1], pState, pState->streamId, pState->taskId); @@ -494,6 +719,7 @@ void streamStateDestroyCompar(void* arg) { } taosMemoryFree(comp->comp); } + int streamGetInit(const char* funcName) { size_t len = strlen(funcName); for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { @@ -520,83 +746,92 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); } - rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); *readOpt = rOpt; rocksdb_readoptions_set_snapshot(rOpt, *snapshot); - rocksdb_readoptions_set_fill_cache(rOpt, 0); return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)value, (size_t)vLen, &err); \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to write to %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ - err); \ - code = -1; \ - } else { \ - qDebug("streamState str:%s succ to write to %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ - vLen); \ - } \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + taosMemoryFree(err); \ + qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + code = -1; \ + } else { \ + qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ + } \ + taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL) { \ - qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ - funcname); \ - if (err != NULL) taosMemoryFree(err); \ - code = -1; \ - } else { \ - if (pVal != NULL) *pVal = val; \ - if (vLen != NULL) *vLen = len; \ - } \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ - err); \ - code = -1; \ - } else { \ - if (code == 0) \ - qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL) { \ + qDebug("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ + if (err != NULL) taosMemoryFree(err); \ + code = -1; \ + } else { \ + char * p = NULL, *end = NULL; \ + int32_t len = ginitDict[i].deValueFunc(val, len, NULL, &p); \ + if (len < 0) { \ + qDebug("streamState str: %s failed to read from %s, err: %s, timeout", toString, funcname, err); \ + code = -1; \ + } else { \ + qDebug("streamState str: %s succ to read from %s, valLen:%d", toString, funcname, len); \ + } \ + if (pVal != NULL) { \ + *pVal = p; \ + } else { \ + taosMemoryFree(p); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = len; \ + } \ + if (err != NULL) { \ + taosMemoryFree(err); \ + qDebug("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ + code = -1; \ + } else { \ + if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ + } \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -627,210 +862,256 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa } \ } while (0); -int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { +// state cf +int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "func", key, value, vLen); + + SStateKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen); return code; } -int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen); - return 0; -} -int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { - int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "func", key); - return 0; +int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + SStateKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen); + return code; } - -int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { - int code = 0; - +int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { + int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, value, vLen); + STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey); return code; } -int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { +int32_t streamStateClear_rocksdb(SStreamState* pState) { + qDebug("streamStateClear_rocksdb"); + + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; + + int sLen = stateKeyEncode(&sKey, sKeyStr); + int eLen = stateKeyEncode(&eKey, eKeyStr); + + char toStringStart[128] = {0}; + char toStringEnd[128] = {0}; + if (qDebugFlag & DEBUG_TRACE) { + stateKeyToString(&sKey, toStringStart); + stateKeyToString(&eKey, toStringEnd); + } + char* err = NULL; - rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); + rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], + sKeyStr, sLen, eKeyStr, eLen, &err); + // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, + // eLen); if (err != NULL) { - qError("streamState failed to write batch, err:%s", err); + qWarn( + "failed to delete range cf(state) err: %s, " + "start: %s, end:%s", + err, toStringStart, toStringEnd); taosMemoryFree(err); - return -1; } - return 0; -} - -void* streamStateCreateBatch() { - rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create(); - return pBatch; -} -int32_t streamStateGetBatchSize(void* pBatch) { - if (pBatch == NULL) return -1; - return rocksdb_writebatch_count((rocksdb_writebatch_t*)pBatch); + return 0; } - -void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } -void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } -int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen) { - int i = streamGetInit(cfName); - - if (i < 0) { - qError("streamState failed to put to cf name:%s", cfName); +int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { + if (!pCur) { return -1; } - char buf[128] = {0}; - int32_t klen = ginitDict[i].enFunc((void*)key, buf); - - rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; - rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen); + rocksdb_iter_next(pCur->iter); return 0; } - -int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen); - return code; -} -int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen); - return code; -} -int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { - int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "default", &key); +int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { + qDebug("streamStateGetFirst_rocksdb"); + SWinKey tmp = {.ts = 0, .groupId = 0}; + streamStatePut_rocksdb(pState, &tmp, NULL, 0); + SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); + int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + streamStateFreeCur(pCur); + streamStateDel_rocksdb(pState, &tmp); return code; } -void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); +int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qDebug("streamStateGetGroupKVByCur_rocksdb"); + if (!pCur) { + return -1; + } + uint64_t groupId = pKey->groupId; - pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); - return pCur; + int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + if (code == 0) { + if (pKey->groupId == groupId) { + return 0; + } + } + return -1; } -int32_t streamDefaultIterValid_rocksdb(void* iter) { - SStreamStateCur* pCur = iter; - bool val = rocksdb_iter_valid(pCur->iter); - - return val ? 0 : -1; +int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + qDebug("streamStateAddIfNotExist_rocksdb"); + int32_t size = *pVLen; + if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) { + return 0; + } + *pVal = taosMemoryMalloc(size); + memset(*pVal, 0, size); + return 0; } -void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { - SStreamStateCur* pCur = iter; - rocksdb_iter_seek(pCur->iter, key, strlen(key)); +int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { + qDebug("streamStateCurPrev_rocksdb"); + if (!pCur) return -1; + + rocksdb_iter_prev(pCur->iter); + return 0; } -void streamDefaultIterNext_rocksdb(void* iter) { - SStreamStateCur* pCur = iter; - rocksdb_iter_next(pCur->iter); -} -char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { - SStreamStateCur* pCur = iter; - return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len); +int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qDebug("streamStateGetKVByCur_rocksdb"); + if (!pCur) return -1; + SStateKey tkey; + SStateKey* pKtmp = &tkey; + + if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) { + size_t tlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + stateKeyDecode((void*)pKtmp, keyStr); + if (pKtmp->opNum != pCur->number) { + return -1; + } + size_t vlen = 0; + if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen); + if (pVLen != NULL) *pVLen = vlen; + *pKey = pKtmp->key; + return 0; + } + return -1; } -char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { - SStreamStateCur* pCur = iter; - return (char*)rocksdb_iter_value(pCur->iter, (size_t*)len); +SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { + qDebug("streamStateGetAndCheckCur_rocksdb"); + SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); + if (pCur) { + int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); + if (code == 0) return pCur; + streamStateFreeCur(pCur); + } + return NULL; } -int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { - int code = 0; - char* err = NULL; - rocksdb_snapshot_t* snapshot = NULL; - rocksdb_readoptions_t* readopts = NULL; - rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); - if (pIter == NULL) { - return -1; +SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { + qDebug("streamStateSeekKeyNext_rocksdb"); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + pCur->number = pState->number; + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + + SStateKey sKey = {.key = *key, .opNum = pState->number}; + char buf[128] = {0}; + int len = stateKeyEncode((void*)&sKey, buf); + if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + streamStateFreeCur(pCur); + return NULL; + } + // skip ttl expired data + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_next(pCur->iter); } - rocksdb_iter_seek(pIter, start, strlen(start)); - while (rocksdb_iter_valid(pIter)) { - const char* key = rocksdb_iter_key(pIter, NULL); - if (end != NULL && strcmp(key, end) > 0) { - break; - } - if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { - int64_t checkPoint = 0; - if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { - taosArrayPush(result, &checkPoint); - } - } else { - break; + if (rocksdb_iter_valid(pCur->iter)) { + SStateKey curKey; + size_t kLen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + stateKeyDecode((void*)&curKey, keyStr); + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) { + return pCur; } - rocksdb_iter_next(pIter); + rocksdb_iter_next(pCur->iter); + return pCur; } - rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); - rocksdb_readoptions_destroy(readopts); - rocksdb_iter_destroy(pIter); - return code; + streamStateFreeCur(pCur); + return NULL; } -int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - int code = 0; - SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen); - return code; -} -// todo refactor -int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { - int code = 0; - SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_DEL_ROCKSDB(pState, "state", &sKey); - return code; -} +SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { + qDebug("streamStateGetCur_rocksdb"); + int32_t code = 0; + const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; + STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); + char buf[128] = {0}; + int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); -// todo refactor -int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { - int code = 0; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); - STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); - return code; -} + rocksdb_iter_prev(pCur->iter); + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_prev(pCur->iter); + } -int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); - return code; -} -int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { - int code = 0; - STREAM_STATE_DEL_ROCKSDB(pState, "fill", key); - return code; + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + pCur = NULL; + } + STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); + return pCur; } -// todo refactor -int32_t streamStateClear_rocksdb(SStreamState* pState) { - qDebug("streamStateClear_rocksdb"); +SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { + qDebug("streamStateGetCur_rocksdb"); + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; - char sKeyStr[128] = {0}; - char eKeyStr[128] = {0}; + if (pCur == NULL) return NULL; + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); - int sLen = stateKeyEncode(&sKey, sKeyStr); - int eLen = stateKeyEncode(&eKey, eKeyStr); + SStateKey sKey = {.key = *key, .opNum = pState->number}; + char buf[128] = {0}; + int len = stateKeyEncode((void*)&sKey, buf); - char toStringStart[128] = {0}; - char toStringEnd[128] = {0}; - if (qDebugFlag & DEBUG_TRACE) { - stateKeyToString(&sKey, toStringStart); - stateKeyToString(&eKey, toStringEnd); - } + rocksdb_iter_seek(pCur->iter, buf, len); - char* err = NULL; - rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], - sKeyStr, sLen, eKeyStr, eLen, &err); - // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen); - if (err != NULL) { - qWarn("failed to delete range cf(state) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); - taosMemoryFree(err); + if (rocksdb_iter_valid(pCur->iter) && !iterValueIsStale(pCur->iter)) { + size_t vlen; + char* val = (char*)rocksdb_iter_value(pCur->iter, &vlen); + if (!streamStateValueIsStale(val)) { + SStateKey curKey; + size_t kLen = 0; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + stateKeyDecode((void*)&curKey, keyStr); + + if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { + pCur->number = pState->number; + return pCur; + } + } } + streamStateFreeCur(pCur); + return NULL; +} +// func cf +int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { + int code = 0; + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen); + return code; +} +int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen); + return 0; +} +int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "func", key); return 0; } +// session cf int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { int code = 0; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; @@ -839,6 +1120,36 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k } return code; } +int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { + qDebug("streamStateSessionGet_rocksdb"); + int code = 0; + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); + SSessionKey resKey = *key; + void* tmp = NULL; + int32_t vLen = 0; + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); + if (code == 0) { + if (pVLen != NULL) *pVLen = vLen; + + if (key->win.skey != resKey.win.skey) { + code = -1; + } else { + *key = resKey; + *pVal = taosMemoryCalloc(1, *pVLen); + memcpy(*pVal, tmp, *pVLen); + } + } + streamStateFreeCur(pCur); + // impl later + return code; +} + +int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) { + int code = 0; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey); + return code; +} SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); @@ -853,6 +1164,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int len = stateSessionKeyEncode(&sKey, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { + } + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_prev(pCur->iter); + + if (!rocksdb_iter_valid(pCur->iter)) { streamStateFreeCur(pCur); return NULL; } @@ -866,7 +1181,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta rocksdb_iter_prev(pCur->iter); if (!rocksdb_iter_valid(pCur->iter)) { - // qWarn("streamState failed to seek key prev %s", toString); + // qWarn("streamState failed to seek key prev + // %s", toString); streamStateFreeCur(pCur); return NULL; } @@ -890,6 +1206,10 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta streamStateFreeCur(pCur); return NULL; } + if (iterValueIsStale(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; @@ -922,7 +1242,11 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con streamStateFreeCur(pCur); return NULL; } - + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); SStateSessionKey curKey = {0}; @@ -936,174 +1260,6 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con } return pCur; } - -int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - qDebug("streamStateAddIfNotExist_rocksdb"); - int32_t size = *pVLen; - if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) { - return 0; - } - *pVal = taosMemoryMalloc(size); - memset(*pVal, 0, size); - return 0; -} -SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateGetCur_rocksdb"); - int32_t code = 0; - const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; - STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); - char buf[128] = {0}; - int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); - - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - if (pCur == NULL) return NULL; - pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); - rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); - rocksdb_iter_prev(pCur->iter); - - STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey); - return pCur; -} -SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - - if (pCur == NULL) return NULL; - pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); - - SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; - int len = stateKeyEncode((void*)&sKey, buf); - - rocksdb_iter_seek(pCur->iter, buf, len); - if (rocksdb_iter_valid(pCur->iter)) { - SStateKey curKey; - size_t kLen = 0; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - stateKeyDecode((void*)&curKey, keyStr); - - if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { - pCur->number = pState->number; - return pCur; - } - } - streamStateFreeCur(pCur); - return NULL; -} - -SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { - qDebug("streamStateGetAndCheckCur_rocksdb"); - SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); - if (pCur) { - int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); - if (code == 0) return pCur; - streamStateFreeCur(pCur); - } - return NULL; -} -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qDebug("streamStateGetKVByCur_rocksdb"); - if (!pCur) return -1; - SStateKey tkey; - SStateKey* pKtmp = &tkey; - - if (rocksdb_iter_valid(pCur->iter)) { - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); - stateKeyDecode((void*)pKtmp, keyStr); - if (pKtmp->opNum != pCur->number) { - return -1; - } - size_t vlen = 0; - if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen); - if (pVLen != NULL) *pVLen = vlen; - *pKey = pKtmp->key; - return 0; - } - return -1; -} -SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - - if (pCur == NULL) return NULL; - - pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - - char buf[128] = {0}; - int len = winKeyEncode((void*)key, buf); - if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { - streamStateFreeCur(pCur); - return NULL; - } - - if (rocksdb_iter_valid(pCur->iter)) { - size_t kLen; - SWinKey curKey; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - winKeyDecode((void*)&curKey, keyStr); - if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { - return pCur; - } - } - - streamStateFreeCur(pCur); - return NULL; -} -int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qDebug("streamStateFillGetKVByCur_rocksdb"); - if (!pCur) { - return -1; - } - SWinKey winKey; - if (rocksdb_iter_valid(pCur->iter)) { - size_t tlen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); - winKeyDecode(&winKey, keyStr); - - size_t vlen = 0; - const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - if (pVal != NULL) { - char* dst = taosMemoryCalloc(1, vlen); - memcpy(dst, valStr, vlen); - *pVal = dst; - } - if (pVLen != NULL) *pVLen = vlen; - - } else { - return -1; - } - *pKey = winKey; - return 0; -} -int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qDebug("streamStateGetGroupKVByCur_rocksdb"); - if (!pCur) { - return -1; - } - uint64_t groupId = pKey->groupId; - - int32_t code = streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); - if (code == 0) { - if (pKey->groupId == groupId) { - return 0; - } - } - return -1; -} -int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { - qDebug("streamStateGetFirst_rocksdb"); - SWinKey tmp = {.ts = 0, .groupId = 0}; - streamStatePut_rocksdb(pState, &tmp, NULL, 0); - SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - int32_t code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); - streamStateFreeCur(pCur); - streamStateDel_rocksdb(pState, &tmp); - return code; -} int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { qDebug("streamStateSessionGetKVByCur_rocksdb"); if (!pCur) { @@ -1112,18 +1268,21 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* SStateSessionKey ktmp = {0}; size_t kLen = 0, vLen = 0; - if (!rocksdb_iter_valid(pCur->iter)) { + if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) { return -1; } const char* curKey = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); stateSessionKeyDecode((void*)&ktmp, (char*)curKey); SStateSessionKey* pKTmp = &ktmp; - const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); - if (pVal != NULL) { - *pVal = (char*)val; + const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); + char* val = NULL; + int32_t len = decodeValueFunc((void*)vval, vLen, NULL, &val); + if (len < 0) { + return -1; } - if (pVLen != NULL) *pVLen = vLen; + if (pVal != NULL) *pVal = (char*)val; + if (pVLen != NULL) *pVLen = len; if (pKTmp->opNum != pCur->number) { return -1; @@ -1134,39 +1293,86 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* *pKey = pKTmp->key; return 0; } +// fill cf +int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { + int code = 0; + + STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); + return code; +} + +int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); + return code; +} +int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "fill", key); + return code; +} -SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateSeekKeyNext_rocksdb"); +SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { + qDebug("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - if (pCur == NULL) { - return NULL; - } - pCur->number = pState->number; + + if (pCur == NULL) return NULL; + pCur->db = pState->pTdbState->rocksdb; - pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt); + pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt); - SStateKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; - int len = stateKeyEncode((void*)&sKey, buf); + char buf[128] = {0}; + int len = winKeyEncode((void*)key, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) { streamStateFreeCur(pCur); return NULL; } + if (iterValueIsStale(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } if (rocksdb_iter_valid(pCur->iter)) { - SStateKey curKey; - size_t kLen; - char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); - stateKeyDecode((void*)&curKey, keyStr); - if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) > 0) { + size_t kLen; + SWinKey curKey; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); + winKeyDecode((void*)&curKey, keyStr); + if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) { return pCur; } - rocksdb_iter_next(pCur->iter); - return pCur; } + streamStateFreeCur(pCur); return NULL; } +int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qDebug("streamStateFillGetKVByCur_rocksdb"); + if (!pCur) { + return -1; + } + SWinKey winKey; + if (!rocksdb_iter_valid(pCur->iter) || iterValueIsStale(pCur->iter)) { + return -1; + } + size_t tlen; + char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); + winKeyDecode(&winKey, keyStr); + + size_t vlen = 0; + const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); + char* dst = NULL; + int32_t len = decodeValueFunc((void*)valStr, vlen, NULL, &dst); + if (len < 0) { + return -1; + } + + if (pVal != NULL) *pVal = (char*)dst; + if (pVLen != NULL) *pVLen = vlen; + + *pKey = winKey; + return 0; +} + SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); @@ -1183,8 +1389,12 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const streamStateFreeCur(pCur); return NULL; } + // skip stale data + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_next(pCur->iter); + } - { + if (rocksdb_iter_valid(pCur->iter)) { SWinKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); @@ -1214,8 +1424,11 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const streamStateFreeCur(pCur); return NULL; } + while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { + rocksdb_iter_prev(pCur->iter); + } - { + if (rocksdb_iter_valid(pCur->iter)) { SWinKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); @@ -1226,23 +1439,10 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const rocksdb_iter_prev(pCur->iter); return pCur; } + streamStateFreeCur(pCur); return NULL; } -int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { - qDebug("streamStateCurPrev_rocksdb"); - if (!pCur) return -1; - - rocksdb_iter_prev(pCur->iter); - return 0; -} -int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { - if (!pCur) { - return -1; - } - rocksdb_iter_next(pCur->iter); - return 0; -} int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { qDebug("streamStateSessionGetKeyByRange_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); @@ -1299,36 +1499,6 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes return -1; } -int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { - qDebug("streamStateSessionGet_rocksdb"); - int code = 0; - SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); - SSessionKey resKey = *key; - void* tmp = NULL; - int32_t vLen = 0; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); - if (code == 0) { - if (pVLen != NULL) *pVLen = vLen; - - if (key->win.skey != resKey.win.skey) { - code = -1; - } else { - *key = resKey; - *pVal = taosMemoryCalloc(1, *pVLen); - memcpy(*pVal, tmp, *pVLen); - } - } - streamStateFreeCur(pCur); - // impl later - return code; -} - -int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* key) { - int code = 0; - SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_DEL_ROCKSDB(pState, "sess", &sKey); - return code; -} int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) { qDebug("streamStateSessionAddIfNotExist_rocksdb"); @@ -1379,6 +1549,27 @@ _end: streamStateFreeCur(pCur); return res; } +int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { + qDebug("streamStateSessionClear_rocksdb"); + SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; + SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); + + while (1) { + SSessionKey delKey = {0}; + void* buf = NULL; + int32_t size = 0; + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); + if (code == 0 && size > 0) { + memset(buf, 0, size); + streamStateSessionPut_rocksdb(pState, &delKey, buf, size); + } else { + break; + } + streamStateCurNext_rocksdb(pState, pCur); + } + streamStateFreeCur(pCur); + return -1; +} int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { qDebug("streamStateStateAddIfNotExist_rocksdb"); @@ -1436,27 +1627,7 @@ _end: return res; } -int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { - qDebug("streamStateSessionClear_rocksdb"); - SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; - SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); - - while (1) { - SSessionKey delKey = {0}; - void* buf = NULL; - int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, &delKey, &buf, &size); - if (code == 0 && size > 0) { - memset(buf, 0, size); - streamStateSessionPut_rocksdb(pState, &delKey, buf, size); - } else { - break; - } - streamStateCurNext_rocksdb(pState, pCur); - } - streamStateFreeCur(pCur); - return -1; -} +// partag cf int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen); @@ -1468,10 +1639,10 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen); return code; } - +// parname cfg int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, tbname, TSDB_TABLE_NAME_LEN); + STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); return code; } int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) { @@ -1481,7 +1652,135 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi return code; } -void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { - // only close db - streamStateCloseBackend(pState, remove); +int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { + int code = 0; + STREAM_STATE_PUT_ROCKSDB(pState, "default", &key, pVal, pVLen); + return code; +} +int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { + int code = 0; + STREAM_STATE_GET_ROCKSDB(pState, "default", &key, pVal, pVLen); + return code; +} +int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "default", &key); + return code; +} + +int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { + int code = 0; + char* err = NULL; + + rocksdb_snapshot_t* snapshot = NULL; + rocksdb_readoptions_t* readopts = NULL; + rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); + if (pIter == NULL) { + return -1; + } + + rocksdb_iter_seek(pIter, start, strlen(start)); + while (rocksdb_iter_valid(pIter)) { + const char* key = rocksdb_iter_key(pIter, NULL); + int32_t vlen = 0; + const char* vval = rocksdb_iter_value(pIter, (size_t*)&vlen); + char* val = NULL; + int32_t len = decodeValueFunc((void*)vval, vlen, NULL, NULL); + if (len < 0) { + rocksdb_iter_next(pIter); + continue; + } + + if (end != NULL && strcmp(key, end) > 0) { + break; + } + if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { + int64_t checkPoint = 0; + if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { + taosArrayPush(result, &checkPoint); + } + } else { + break; + } + rocksdb_iter_next(pIter); + } + rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); + rocksdb_readoptions_destroy(readopts); + rocksdb_iter_destroy(pIter); + return code; +} +void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + + pCur->db = pState->pTdbState->rocksdb; + pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt); + return pCur; +} +int32_t streamDefaultIterValid_rocksdb(void* iter) { + SStreamStateCur* pCur = iter; + bool val = rocksdb_iter_valid(pCur->iter); + + return val ? 1 : 0; +} +void streamDefaultIterSeek_rocksdb(void* iter, const char* key) { + SStreamStateCur* pCur = iter; + rocksdb_iter_seek(pCur->iter, key, strlen(key)); +} +void streamDefaultIterNext_rocksdb(void* iter) { + SStreamStateCur* pCur = iter; + rocksdb_iter_next(pCur->iter); +} +char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) { + SStreamStateCur* pCur = iter; + return (char*)rocksdb_iter_key(pCur->iter, (size_t*)len); +} +char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) { + SStreamStateCur* pCur = iter; + int32_t vlen = 0; + char* dst = NULL; + const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen); + if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) { + return NULL; + } + return dst; +} +// batch func +void* streamStateCreateBatch() { + rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create(); + return pBatch; +} +int32_t streamStateGetBatchSize(void* pBatch) { + if (pBatch == NULL) return 0; + return rocksdb_writebatch_count(pBatch); +} + +void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } +void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } +int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, + void* val, int32_t vlen) { + int i = streamGetInit(cfName); + + if (i < 0) { + qError("streamState failed to put to cf name:%s", cfName); + return -1; + } + char buf[128] = {0}; + int32_t klen = ginitDict[i].enFunc((void*)key, buf); + + char* ttlV = NULL; + int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); + rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; + rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); + taosMemoryFree(ttlV); + return 0; +} +int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { + char* err = NULL; + rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); + if (err != NULL) { + qError("streamState failed to write batch, err:%s", err); + taosMemoryFree(err); + return -1; + } + return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a3f71588179cdbde1ce7e876618ff38321277a68..0f33ef6a269c123e1236d87ce7b348e0d4f5f7c7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -14,6 +14,7 @@ */ #include "executor.h" +#include "streamBackendRocksdb.h" #include "streamInc.h" #include "ttimer.h" diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 03ab8d5e307cfefa86b05aca7a71d49c0a83d370..86056f94bb6bd6c567b61a02f46eff26ca0fb32e 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -26,88 +26,6 @@ #define MAX_TABLE_NAME_NUM 100000 -void* streamBackendInit(const char* path) { - SBackendHandle* pHandle = calloc(1, sizeof(SBackendHandle)); - pHandle->list = tdListNew(sizeof(SCfComparator)); - taosThreadMutexInit(&pHandle->mutex, NULL); - - rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); - rocksdb_env_set_low_priority_background_threads(env, 4); - rocksdb_env_set_high_priority_background_threads(env, 2); - - rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20); - - rocksdb_options_t* opts = rocksdb_options_create(); - rocksdb_options_set_env(opts, env); - rocksdb_options_set_create_if_missing(opts, 1); - rocksdb_options_set_create_missing_column_families(opts, 1); - rocksdb_options_set_write_buffer_size(opts, 128 << 20); - rocksdb_options_set_max_total_wal_size(opts, 128 << 20); - rocksdb_options_set_recycle_log_file_num(opts, 6); - rocksdb_options_set_max_write_buffer_number(opts, 3); - - pHandle->env = env; - pHandle->dbOpt = opts; - pHandle->cache = cache; - - char* err = NULL; - pHandle->db = rocksdb_open(opts, path, &err); - if (err != NULL) { - qError("failed to open rocksdb, path:%s, reason:%s", path, err); - taosMemoryFreeClear(err); - goto _EXIT; - } - - return pHandle; -_EXIT: - rocksdb_options_destroy(opts); - rocksdb_cache_destroy(cache); - rocksdb_env_destroy(env); - taosThreadMutexDestroy(&pHandle->mutex); - tdListFree(pHandle->list); - free(pHandle); - return NULL; -} -void streamBackendCleanup(void* arg) { - SBackendHandle* pHandle = (SBackendHandle*)arg; - rocksdb_close(pHandle->db); - rocksdb_options_destroy(pHandle->dbOpt); - rocksdb_env_destroy(pHandle->env); - rocksdb_cache_destroy(pHandle->cache); - - taosThreadMutexDestroy(&pHandle->mutex); - SListNode* head = tdListPopHead(pHandle->list); - while (head != NULL) { - streamStateDestroyCompar(head->data); - taosMemoryFree(head); - head = tdListPopHead(pHandle->list); - } - tdListFree(pHandle->list); - - taosMemoryFree(pHandle); - - return; -} -SListNode* streamBackendAddCompare(void* backend, void* arg) { - SBackendHandle* pHandle = (SBackendHandle*)backend; - SListNode* node = NULL; - taosThreadMutexLock(&pHandle->mutex); - node = tdListAdd(pHandle->list, arg); - taosThreadMutexUnlock(&pHandle->mutex); - return node; -} -void streamBackendDelCompare(void* backend, void* arg) { - SBackendHandle* pHandle = (SBackendHandle*)backend; - SListNode* node = NULL; - taosThreadMutexLock(&pHandle->mutex); - node = tdListPopNode(pHandle->list, arg); - taosThreadMutexUnlock(&pHandle->mutex); - if (node) { - streamStateDestroyCompar(node->data); - taosMemoryFree(node); - } -} - int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2) { if (pWin1->groupId > pWin2->groupId) { return 1;