diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 8ada79d971f93fae126eff2baa69f1170ee5b01d..0f39cf817b1bbd191d9ab49d3456be9e1bfa1c66 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -110,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); -void* streamStateCreateBatch(); -int32_t streamStateGetBatchSize(void* pBatch); -void streamStateClearBatch(void* pBatch); -void streamStateDestroyBatch(void* pBatch); -int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen); -int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); - // default cf int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); @@ -136,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch); void streamStateClearBatch(void* pBatch); void streamStateDestroyBatch(void* pBatch); int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen); + void* val, int32_t vlen, int64_t ttl); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d8524dc5d954fb6ea91da361d9f1f3585ec15b54..e34fc69de07b376bd2e08b7818bb5bde56219d97 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1979,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) { void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, - void* val, int32_t vlen) { + void* val, int32_t vlen, int64_t ttl) { int i = streamGetInit(cfName); if (i < 0) { @@ -1990,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr int32_t klen = ginitDict[i].enFunc((void*)key, buf); char* ttlV = NULL; - int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); + int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV); rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); taosMemoryFree(ttlV); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fee2f2ce5890dc4cb7c6a1ac88d571a65a6c1b2d..db530aa5a2e1585cc68c816f36f0843a4e80abbe 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -119,7 +119,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int pState->taskId = pTask->id.taskId; pState->streamId = pTask->id.streamId; #ifdef USE_ROCKSDB - qWarn("open stream state1"); + // qWarn("open stream state1"); taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState); if (code == -1) { @@ -127,7 +127,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int taosMemoryFree(pState); pState = NULL; } - qWarn("open stream state2, %s", statePath); + // qWarn("open stream state2, %s", statePath); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); @@ -225,6 +225,7 @@ void streamStateClose(SStreamState* pState, bool remove) { #ifdef USE_ROCKSDB // streamStateCloseBackend(pState); streamStateDestroy(pState, remove); + taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); #else tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); @@ -236,7 +237,6 @@ void streamStateClose(SStreamState* pState, bool remove) { tdbTbClose(pState->pTdbState->pParTagDb); tdbClose(pState->pTdbState->db); #endif - taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); } int32_t streamStateBegin(SStreamState* pState) { @@ -404,7 +404,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo int32_t code = 0; void* batch = streamStateCreateBatch(); - code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen); + code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0); if (code != 0) { return code; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ac2b869af30969f7e4534bb1025349ba181ed789..133b72bbe791e048283f6a55d72515017d2aebbc 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; - code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize); + code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0); qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); } if (streamStateGetBatchSize(batch) > 0) { @@ -371,7 +371,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = 0; sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); - code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); taosMemoryFree(valBuf); } { @@ -380,7 +380,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = 0; memcpy(keyBuf, taskKey, strlen(taskKey)); len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); - code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); } streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); }