diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 8b08db8f788a8517783e8cc7e371cac0fbae7d3e..2e3cd670d73651494aafc148a474dc8d2f48ad3c 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -130,7 +130,7 @@ typedef struct SSerializeDataHandle { // incremental state storage -typedef struct SBackendWrapper { +typedef struct SBackendCfWrapper { void *rocksdb; void **pHandle; void *writeOpts; @@ -146,11 +146,11 @@ typedef struct SBackendWrapper { bool remove; int64_t backendId; char idstr[64]; -} SBackendWrapper; +} SBackendCfWrapper; typedef struct STdbState { - SBackendWrapper *pBackendWrapper; - int64_t backendWrapperId; - char idstr[64]; + SBackendCfWrapper *pBackendCfWrapper; + int64_t backendCfWrapperId; + char idstr[64]; struct SStreamTask *pOwner; void *db; diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 6195200ba53a5c6c66526ea4693b313220797e8c..b6bc9c888bec1136f82e13d5f072c0acbed9d937 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -42,7 +42,7 @@ typedef struct { TdThreadMutex cfMutex; SHashObj* cfInst; int64_t defaultCfInit; -} SBackendHandle; +} SBackendWrapper; void* streamBackendInit(const char* path); void streamBackendCleanup(void* arg); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 19a8044b71ea47c219c4f077fc80c79e4e740347..eec37d7dbb6cfaa4e7e3e9cd197c297256593657 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -55,7 +55,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); extern int32_t streamBackendId; -extern int32_t streamBackendWrapperId; +extern int32_t streamBackendCfWrapperId; #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 4697c5b130f4e42c0d6f45304f621622b23e1a06..884100808f67f2c3b10665be1cd8d7c62516ed73 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -145,7 +145,7 @@ SCfInit ginitDict[] = { void* streamBackendInit(const char* path) { qDebug("start to init stream backend at %s", path); - SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); + SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); pHandle->list = tdListNew(sizeof(SCfComparator)); taosThreadMutexInit(&pHandle->mutex, NULL); taosThreadMutexInit(&pHandle->cfMutex, NULL); @@ -212,8 +212,8 @@ _EXIT: return NULL; } void streamBackendCleanup(void* arg) { - SBackendHandle* pHandle = (SBackendHandle*)arg; - RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); + SBackendWrapper* pHandle = (SBackendWrapper*)arg; + RocksdbCfInst** pIter = (RocksdbCfInst**)taosHashIterate(pHandle->cfInst, NULL); while (pIter != NULL) { RocksdbCfInst* inst = *pIter; destroyRocksdbCfInst(inst); @@ -253,8 +253,8 @@ void streamBackendCleanup(void* arg) { return; } void streamBackendHandleCleanup(void* arg) { - SBackendWrapper* wrapper = arg; - + SBackendCfWrapper* wrapper = arg; + bool remove = wrapper->remove; qDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr); if (wrapper->rocksdb == NULL) { return; @@ -263,7 +263,7 @@ void streamBackendHandleCleanup(void* arg) { int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); char* err = NULL; - if (wrapper->remove) { + if (remove) { for (int i = 0; i < cfLen; i++) { if (wrapper->pHandle[i] != NULL) rocksdb_drop_column_family(wrapper->rocksdb, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[i], &err); @@ -295,7 +295,7 @@ void streamBackendHandleCleanup(void* arg) { rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt); } - if (wrapper->remove) { + if (remove) { streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode); } rocksdb_writeoptions_destroy(wrapper->writeOpts); @@ -315,16 +315,16 @@ void streamBackendHandleCleanup(void* arg) { return; } SListNode* streamBackendAddCompare(void* backend, void* arg) { - SBackendHandle* pHandle = (SBackendHandle*)backend; - SListNode* node = NULL; + SBackendWrapper* pHandle = (SBackendWrapper*)backend; + SListNode* node = NULL; taosThreadMutexLock(&pHandle->mutex); node = tdListAdd(pHandle->list, arg); taosThreadMutexUnlock(&pHandle->mutex); return node; } void streamBackendDelCompare(void* backend, void* arg) { - SBackendHandle* pHandle = (SBackendHandle*)backend; - SListNode* node = NULL; + SBackendWrapper* pHandle = (SBackendWrapper*)backend; + SListNode* node = NULL; taosThreadMutexLock(&pHandle->mutex); node = tdListPopNode(pHandle->list, arg); taosThreadMutexUnlock(&pHandle->mutex); @@ -784,11 +784,11 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) { } int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) { - SBackendHandle* handle = backend; - char* err = NULL; - int64_t streamId; - int32_t taskId, dummy = 0; - char suffix[64] = {0}; + SBackendWrapper* handle = backend; + char* err = NULL; + int64_t streamId; + int32_t taskId, dummy = 0; + char suffix[64] = {0}; rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*)); RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*)); @@ -908,30 +908,30 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); taosAcquireRef(streamBackendId, pState->streamBackendRid); - SBackendHandle* handle = backend; - SBackendWrapper* pBackendWrapper = taosMemoryCalloc(1, sizeof(SBackendWrapper)); + SBackendWrapper* handle = backend; + SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); taosThreadMutexLock(&handle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { RocksdbCfInst* inst = *ppInst; - pBackendWrapper->rocksdb = inst->db; - pBackendWrapper->pHandle = (void**)inst->pHandle; - pBackendWrapper->writeOpts = inst->wOpt; - pBackendWrapper->readOpts = inst->rOpt; - pBackendWrapper->cfOpts = (void**)(inst->cfOpt); - pBackendWrapper->dbOpt = handle->dbOpt; - pBackendWrapper->param = inst->param; - pBackendWrapper->pBackend = handle; - pBackendWrapper->pComparNode = inst->pCompareNode; + pBackendCfWrapper->rocksdb = inst->db; + pBackendCfWrapper->pHandle = (void**)inst->pHandle; + pBackendCfWrapper->writeOpts = inst->wOpt; + pBackendCfWrapper->readOpts = inst->rOpt; + pBackendCfWrapper->cfOpts = (void**)(inst->cfOpt); + pBackendCfWrapper->dbOpt = handle->dbOpt; + pBackendCfWrapper->param = inst->param; + pBackendCfWrapper->pBackend = handle; + pBackendCfWrapper->pComparNode = inst->pCompareNode; taosThreadMutexUnlock(&handle->cfMutex); - pBackendWrapper->backendId = pState->streamBackendRid; - memcpy(pBackendWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); + pBackendCfWrapper->backendId = pState->streamBackendRid; + memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); - int64_t id = taosAddRef(streamBackendWrapperId, pBackendWrapper); - pState->pTdbState->backendWrapperId = id; - pState->pTdbState->pBackendWrapper = pBackendWrapper; - qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendWrapper, pBackendWrapper->idstr); + int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); + pState->pTdbState->backendCfWrapperId = id; + pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper; + qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr); return 0; } taosThreadMutexUnlock(&handle->cfMutex); @@ -964,31 +964,31 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { pCompare[i] = compare; } rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); - pBackendWrapper->rocksdb = handle->db; - pBackendWrapper->pHandle = (void**)cfHandle; - pBackendWrapper->writeOpts = rocksdb_writeoptions_create(); - pBackendWrapper->readOpts = rocksdb_readoptions_create(); - pBackendWrapper->cfOpts = (void**)cfOpt; - pBackendWrapper->dbOpt = handle->dbOpt; - pBackendWrapper->param = param; - pBackendWrapper->pBackend = handle; - pBackendWrapper->backendId = pState->streamBackendRid; - taosThreadRwlockInit(&pBackendWrapper->rwLock, NULL); + pBackendCfWrapper->rocksdb = handle->db; + pBackendCfWrapper->pHandle = (void**)cfHandle; + pBackendCfWrapper->writeOpts = rocksdb_writeoptions_create(); + pBackendCfWrapper->readOpts = rocksdb_readoptions_create(); + pBackendCfWrapper->cfOpts = (void**)cfOpt; + pBackendCfWrapper->dbOpt = handle->dbOpt; + pBackendCfWrapper->param = param; + pBackendCfWrapper->pBackend = handle; + pBackendCfWrapper->backendId = pState->streamBackendRid; + taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; - pBackendWrapper->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pBackendWrapper->writeOpts, 1); - memcpy(pBackendWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); - - int64_t id = taosAddRef(streamBackendWrapperId, pBackendWrapper); - pState->pTdbState->backendWrapperId = id; - pState->pTdbState->pBackendWrapper = pBackendWrapper; - qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendWrapper, pBackendWrapper->idstr); + pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare); + rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1); + memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); + + int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper); + pState->pTdbState->backendCfWrapperId = id; + pState->pTdbState->pBackendCfWrapper = pBackendCfWrapper; + qInfo("succ to open state %p on backendWrapper %p %s", pState, pBackendCfWrapper, pBackendCfWrapper->idstr); return 0; } void streamStateCloseBackend(SStreamState* pState, bool remove) { - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - SBackendHandle* pHandle = wrapper->pBackend; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SBackendWrapper* pHandle = wrapper->pBackend; taosThreadMutexLock(&pHandle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { @@ -1001,7 +1001,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { qInfo("start to close %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, wrapper->idstr); wrapper->remove |= remove; // update by other pState - taosReleaseRef(streamBackendWrapperId, pState->pTdbState->backendWrapperId); + taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId); } void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; @@ -1020,7 +1020,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { break; } } - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; if (pState != NULL && idx != -1) { rocksdb_column_family_handle_t* cf = NULL; taosThreadRwlockRdlock(&wrapper->rwLock); @@ -1060,7 +1060,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_t** readOpt) { int idx = streamStateGetCfIdx(pState, cfName); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; if (snapshot != NULL) { *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb); } @@ -1084,8 +1084,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = -1; \ break; \ } \ - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ - char toString[128] = {0}; \ + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ @@ -1115,8 +1115,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = -1; \ break; \ } \ - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ - char toString[128] = {0}; \ + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ @@ -1159,8 +1159,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = -1; \ break; \ } \ - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ - char toString[128] = {0}; \ + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ @@ -1199,11 +1199,11 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - char sKeyStr[128] = {0}; - char eKeyStr[128] = {0}; - SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); @@ -1315,7 +1315,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin if (pCur == NULL) { return NULL; } - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; pCur->number = pState->number; pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, @@ -1350,8 +1350,8 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); - int32_t code = 0; - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + int32_t code = 0; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); @@ -1379,8 +1379,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; pCur->db = wrapper->rocksdb; @@ -1472,8 +1472,8 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } @@ -1514,8 +1514,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } @@ -1552,8 +1552,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyNext_rocksdb"); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } @@ -1646,8 +1646,8 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillGetCur_rocksdb"); - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; if (pCur == NULL) return NULL; @@ -1706,8 +1706,8 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (!pCur) { return NULL; } @@ -1743,8 +1743,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const } SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyPrev_rocksdb"); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } @@ -1780,8 +1780,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const } int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { qDebug("streamStateSessionGetKeyByRange_rocksdb"); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return -1; } @@ -2017,7 +2017,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co int code = 0; char* err = NULL; - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; rocksdb_snapshot_t* snapshot = NULL; rocksdb_readoptions_t* readopts = NULL; rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); @@ -2056,8 +2056,8 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co return code; } void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { - SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot, @@ -2106,8 +2106,8 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - int i = streamStateGetCfIdx(pState, cfName); + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; + int i = streamStateGetCfIdx(pState, cfName); if (i < 0) { qError("streamState failed to put to cf name:%s", cfName); @@ -2130,7 +2130,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); @@ -2141,8 +2141,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb return 0; } int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { - char* err = NULL; - SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + char* err = NULL; + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; rocksdb_write(wrapper->rocksdb, wrapper->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); if (err != NULL) { qError("streamState failed to write batch, err:%s", err); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5a5180b8c50c43f19cd5c6796c340bcfacb49ca9..4c743e46e603f8ec1cb0010f07708b14ef61e1d1 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -21,16 +21,16 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; -int32_t streamBackendWrapperId = 0; +int32_t streamBackendCfWrapperId = 0; static void streamMetaEnvInit() { streamBackendId = taosOpenRef(64, streamBackendCleanup); - streamBackendWrapperId = taosOpenRef(64, streamBackendHandleCleanup); + streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaCleanup() { taosCloseRef(streamBackendId); - taosCloseRef(streamBackendWrapperId); + taosCloseRef(streamBackendCfWrapperId); } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 448d693b0a4527350fc44617590e8f3fa6a9db01..216e3b8a11d7adb0dcab6555356aa40b3a694edb 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -134,11 +134,11 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz return NULL; } taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1, - &pState->pTdbState->backendWrapperId, sizeof(pState->pTdbState->backendWrapperId)); + &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId)); } else { int64_t id = *(int64_t*)uniqueId; - pState->pTdbState->backendWrapperId = id; - pState->pTdbState->pBackendWrapper = taosAcquireRef(streamBackendWrapperId, id); + pState->pTdbState->backendCfWrapperId = id; + pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id); taosAcquireRef(streamBackendId, pState->streamBackendRid); }