diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 9b80ce27860a639a8a406ee886004f586940afec..d7c9940b745afc4edc3308f7354d9afe7ab38109 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -40,6 +40,7 @@ typedef struct STdbState { struct SStreamTask* pOwner; void* param; void* env; + void* compactFactory; TDB* db; TTB* pStateDb; @@ -149,11 +150,11 @@ typedef struct SStateSessionKey { int64_t opNum; } SStateSessionKey; -typedef struct streamValue { +typedef struct SStreamValue { int64_t unixTimestamp; int32_t len; - char data[0]; -} streamValue; + char* data; +} SStreamValue; int sessionRangeKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); int sessionWinKeyCmpr(const SSessionKey* pWin1, const SSessionKey* pWin2); diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 8e4400e6448ebbd5b04550e40b992baf70fcd533..efa0e12d10a5fb076e3e909fbbe6e718ca8b6006 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -289,24 +289,24 @@ int parKeyToString(void* k, char* buf) { return n; } int stremaValueEncode(void* k, char* buf) { - int len = 0; - streamValue* key = k; + int len = 0; + SStreamValue* key = k; len += taosEncodeFixedI64((void**)&buf, key->unixTimestamp); len += taosEncodeFixedI32((void**)&buf, key->len); len += taosEncodeBinary((void**)&buf, key->data, key->len); return len; } int streamValueDecode(void* k, char* buf) { - streamValue* key = k; - char* p = buf; + SStreamValue* key = k; + char* p = buf; p = taosDecodeFixedI64(p, &key->unixTimestamp); p = taosDecodeFixedI32(p, &key->len); p = taosDecodeBinary(p, (void**)&key->data, key->len); return p - buf; } int32_t streamValueToString(void* k, char* buf) { - streamValue* key = k; - int n = 0; + SStreamValue* key = k; + int n = 0; n += sprintf(buf + n, "[unixTimestamp:%" PRIi64 ",", key->unixTimestamp); n += sprintf(buf + n, "len:%d,", key->len); n += sprintf(buf + n, "data:%s]", key->data); @@ -315,7 +315,7 @@ int32_t streamValueToString(void* k, char* buf) { /*1: stale, 0: no stale*/ int32_t streaValueIsStale(void* k, int64_t ts) { - streamValue* key = k; + SStreamValue* key = k; if (key->unixTimestamp < ts) { return 1; } @@ -325,6 +325,8 @@ int32_t streaValueIsStale(void* k, int64_t ts) { typedef struct { void* tableOpt; void* lru; // global or not + void* filteFactory; + } rocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; @@ -334,6 +336,8 @@ typedef int (*ToStringFunc)(void* key, char* buf); typedef const char* (*CompareName)(void* statue); typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); typedef void (*DestroyFunc)(void* state); +typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest); +typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest); const char* compareDefaultName(void* name); const char* compareStateName(void* name); @@ -346,28 +350,57 @@ const char* comparePartagKeyName(void* name); void destroyFunc(void* stata) { return; } typedef struct { - const char* key; - int32_t len; - int idx; - BackendCmpFunc cmpFunc; - EncodeFunc enFunc; - DecodeFunc deFunc; - ToStringFunc toStrFunc; - CompareName cmpName; - DestroyFunc detroyFunc; + const char* key; + int32_t len; + int idx; + BackendCmpFunc cmpFunc; + EncodeFunc enFunc; + DecodeFunc deFunc; + ToStringFunc toStrFunc; + CompareName cmpName; + DestroyFunc detroyFunc; + EncodeValueFunc enValueFunc; + DecodeValueFunc deValueFunc; } SCfInit; +int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { + SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; + + char* p = taosMemoryCalloc(1, sizeof(int64_t) + sizeof(int32_t) + key.len); + char* buf = p; + int32_t len = 0; + len += taosEncodeFixedI64((void**)&buf, key.unixTimestamp); + len += taosEncodeFixedI32((void**)&buf, key.len); + len += taosEncodeBinary((void**)&buf, (char*)value, vlen); + *dest = p; + return len; +} +int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { + SStreamValue key = {0}; + char* p = value; + p = taosDecodeFixedI64(p, &key.unixTimestamp); + p = taosDecodeFixedI32(p, &key.len); + p = taosDecodeBinary(p, (void**)&(key.data), key.len); + *ttl = key.unixTimestamp; + *dest = key.data; + return key.len; +} SCfInit ginitDict[] = { {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, - destroyFunc}, - {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc}, - {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc}, + destroyFunc, encodeValueFunc, decodeValueFunc}, + {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, - compareSessionKeyName, destroyFunc}, - {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc}, - {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc}, - {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc}, + compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc}, + {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, }; const char* compareDefaultName(void* name) { return ginitDict[0].key; } @@ -378,6 +411,41 @@ const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } const char* compareParKeyName(void* name) { return ginitDict[5].key; } const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } +typedef struct SCompactFilteFactory { + void* status; +} SCompactFilteFactory; + +void destroyCompactFilteFactory(void* arg) { + if (arg == NULL) return; +} +const char* compactFilteFactoryName(void* arg) { + SCompactFilteFactory* state = arg; + return "stream_compact_filter"; +} + +void destroyCompactFilte(void* arg) { + if (arg == NULL) return; +} +unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, const char* val, size_t vlen, + char** newval, size_t* newvlen, unsigned char* value_changed) { + int64_t unixTime = taosGetTimestampMs(); + SStreamValue value; + memset(&value, 0, sizeof(value)); + streamValueDecode(&value, (char*)val); + taosMemoryFree(value.data); + if (value.unixTimestamp != 0 && value.unixTimestamp < unixTime) { + return 1; + } + return 0; +} +const char* compactFilteName(void* arg) { return "stream_filte"; } + +rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx) { + SCompactFilteFactory* state = arg; + rocksdb_compactionfilter_t* filter = + rocksdb_compactionfilter_create(NULL, destroyCompactFilte, compactFilte, compactFilteName); + return filter; +} int streamInitBackend(SStreamState* pState, char* path) { rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); rocksdb_env_set_low_priority_background_threads(env, 4); @@ -392,6 +460,10 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_create_missing_column_families(opts, 1); rocksdb_options_set_write_buffer_size(opts, 128 << 20); + rocksdb_compactionfilterfactory_t* factory = rocksdb_compactionfilterfactory_create( + NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); + rocksdb_options_set_compaction_filter_factory(opts, factory); + char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -411,8 +483,6 @@ int streamInitBackend(SStreamState* pState, char* path) { param[i].tableOpt = tableOpt; param[i].lru = cache; - // rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8); - // rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans); }; rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**)); @@ -437,6 +507,7 @@ int streamInitBackend(SStreamState* pState, char* path) { pState->pTdbState->dbOpt = opts; pState->pTdbState->param = param; pState->pTdbState->env = env; + pState->pTdbState->compactFactory = factory; return 0; } void streamCleanBackend(SStreamState* pState) { @@ -468,11 +539,13 @@ void streamCleanBackend(SStreamState* pState) { rocksdb_cache_destroy(param[i].lru); rocksdb_block_based_options_destroy(param[i].tableOpt); + // rocksdb_compactionfilterfactory_destroy(param[i].filteFactory); } taosMemoryFreeClear(pState->pTdbState->cfOpts); taosMemoryFree(pState->pTdbState->pCompare); taosMemoryFree(pState->pTdbState->param); rocksdb_env_destroy(pState->pTdbState->env); + rocksdb_compactionfilterfactory_destroy(pState->pTdbState->compactFactory); pState->pTdbState->rocksdb = NULL; } @@ -514,31 +587,34 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamGetInit(funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)value, (size_t)vLen, &err); \ - if (err != NULL) { \ - taosMemoryFree(err); \ - qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - code = -1; \ - } else { \ - qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ - } \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamGetInit(funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \ + rocksdb_t* db = pState->pTdbState->rocksdb; \ + rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + taosMemoryFree(err); \ + qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + code = -1; \ + } else { \ + qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ + } \ + taosMemoryFree(ttlV); \ } while (0); #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ @@ -565,8 +641,15 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa if (err != NULL) taosMemoryFree(err); \ code = -1; \ } else { \ - if (pVal != NULL) *pVal = val; \ - if (vLen != NULL) *vLen = len; \ + char * p = NULL, *end = NULL; \ + int64_t ttl; \ + int32_t vlen = ginitDict[i].deValueFunc(val, len, &ttl, &p); \ + if (pVal != NULL) { \ + *pVal = p; \ + } else { \ + taosMemoryFree(p); \ + } \ + if (vLen != NULL) *vLen = vlen; \ } \ if (err != NULL) { \ taosMemoryFree(err); \ @@ -606,7 +689,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "func", key, value, vLen); + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen); return code; } int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { @@ -624,7 +707,7 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, value, vLen); + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen); return code; } int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { @@ -661,8 +744,11 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr char buf[128] = {0}; int32_t klen = ginitDict[i].enFunc((void*)key, buf); + char* ttlV = NULL; + int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; - rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, val, (size_t)vlen); + rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); + taosMemoryFree(ttlV); return 0; } @@ -844,7 +930,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { sKeyStr, sLen, eKeyStr, eLen, &err); // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, eLen); if (err != NULL) { - qWarn("failed to delete range cf(state) err: %s, start: %s, end:%s", err, toStringStart, toStringEnd); + qWarn( + "failed to delete range cf(state) err: %s, " + "start: %s, end:%s", + err, toStringStart, toStringEnd); taosMemoryFree(err); } @@ -923,7 +1012,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta rocksdb_iter_prev(pCur->iter); if (!rocksdb_iter_valid(pCur->iter)) { - // qWarn("streamState failed to seek key prev %s", toString); + // qWarn("streamState failed to seek key prev + // %s", toString); streamStateFreeCur(pCur); return NULL; } @@ -1528,7 +1618,7 @@ int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, tbname, TSDB_TABLE_NAME_LEN); + STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); return code; } int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) {