From 9f9171719a956f0816c52412b90a5eb347f336c5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 15 Jun 2023 21:29:52 +0800 Subject: [PATCH] support fill history --- cmake/rocksdb_CMakeLists.txt.in | 17 +- include/libs/function/function.h | 52 +- include/libs/stream/tstream.h | 75 +-- source/dnode/mnode/impl/src/mndScheduler.c | 63 +- source/libs/stream/inc/streamBackendRocksdb.h | 1 + source/libs/stream/inc/streamInc.h | 3 +- source/libs/stream/src/streamBackendRocksdb.c | 574 ++++++++++-------- source/libs/stream/src/streamMeta.c | 17 +- source/libs/stream/src/streamState.c | 85 +-- 9 files changed, 505 insertions(+), 382 deletions(-) diff --git a/cmake/rocksdb_CMakeLists.txt.in b/cmake/rocksdb_CMakeLists.txt.in index ba4a404af6..af6ec3c56c 100644 --- a/cmake/rocksdb_CMakeLists.txt.in +++ b/cmake/rocksdb_CMakeLists.txt.in @@ -1,11 +1,14 @@ # rocksdb ExternalProject_Add(rocksdb - GIT_REPOSITORY https://github.com/facebook/rocksdb.git - GIT_TAG v8.1.1 - SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" - CONFIGURE_COMMAND "" - BUILD_COMMAND "" - INSTALL_COMMAND "" - TEST_COMMAND "" + URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz + URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b + DOWNLOAD_NO_PROGRESS 1 + DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download" + SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "" + TEST_COMMAND "" ) + diff --git a/include/libs/function/function.h b/include/libs/function/function.h index c92ce254a8..8b08db8f78 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -129,30 +129,38 @@ typedef struct SSerializeDataHandle { } SSerializeDataHandle; // incremental state storage + +typedef struct SBackendWrapper { + void *rocksdb; + void **pHandle; + void *writeOpts; + void *readOpts; + void **cfOpts; + void *dbOpt; + void *param; + void *env; + SListNode *pComparNode; + void *pBackend; + void *compactFactory; + TdThreadRwlock rwLock; + bool remove; + int64_t backendId; + char idstr[64]; +} SBackendWrapper; typedef struct STdbState { - void *rocksdb; - void **pHandle; - void *writeOpts; - void *readOpts; - void **cfOpts; - void *dbOpt; + SBackendWrapper *pBackendWrapper; + int64_t backendWrapperId; + char idstr[64]; + struct SStreamTask *pOwner; - void *param; - void *env; - SListNode *pComparNode; - void *pBackend; - char idstr[64]; - void *compactFactory; - TdThreadRwlock rwLock; - - void *db; - void *pStateDb; - void *pFuncStateDb; - void *pFillStateDb; // todo refactor - void *pSessionStateDb; - void *pParNameDb; - void *pParTagDb; - void *txn; + void *db; + void *pStateDb; + void *pFuncStateDb; + void *pFillStateDb; // todo refactor + void *pSessionStateDb; + void *pParNameDb; + void *pParTagDb; + void *txn; } STdbState; typedef struct { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 09583572ed..f668588e1a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -44,8 +44,8 @@ enum { TASK_STATUS__DROPPING, TASK_STATUS__FAIL, TASK_STATUS__STOP, - TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner - TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused + TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner + TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused TASK_STATUS__PAUSE, }; @@ -266,11 +266,11 @@ typedef struct SCheckpointInfo { } SCheckpointInfo; typedef struct SStreamStatus { - int8_t taskStatus; - int8_t checkDownstream; - int8_t schedStatus; - int8_t keepTaskStatus; - bool transferState; + int8_t taskStatus; + int8_t checkDownstream; + int8_t schedStatus; + int8_t keepTaskStatus; + bool transferState; TdThreadMutex lock; } SStreamStatus; @@ -280,19 +280,19 @@ typedef struct SHistDataRange { } SHistDataRange; typedef struct SSTaskBasicInfo { - int32_t nodeId; // vgroup id or snode id - SEpSet epSet; - int32_t selfChildId; - int32_t totalLevel; - int8_t taskLevel; - int8_t fillHistory; // is fill history task or not + int32_t nodeId; // vgroup id or snode id + SEpSet epSet; + int32_t selfChildId; + int32_t totalLevel; + int8_t taskLevel; + int8_t fillHistory; // is fill history task or not } SSTaskBasicInfo; typedef struct SDispatchMsgInfo { - void* pData; // current dispatch data - int16_t msgType; // dispatch msg type - int32_t retryCount; // retry send data count - int64_t blockingTs; // output blocking timestamp + void* pData; // current dispatch data + int16_t msgType; // dispatch msg type + int32_t retryCount; // retry send data count + int64_t blockingTs; // output blocking timestamp } SDispatchMsgInfo; typedef struct { @@ -351,21 +351,22 @@ struct SStreamTask { // meta typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pCheckpointDb; - SHashObj* pTasks; - SArray* pTaskList; // SArray - void* ahandle; - TXN* txn; - FTaskExpand* expandFunc; - int32_t vgId; - SRWLatch lock; - int32_t walScanCounter; - void* streamBackend; - int64_t streamBackendRid; - SHashObj* pTaskBackendUnique; + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pCheckpointDb; + SHashObj* pTasks; + SArray* pTaskList; // SArray + void* ahandle; + TXN* txn; + FTaskExpand* expandFunc; + int32_t vgId; + SRWLatch lock; + int32_t walScanCounter; + void* streamBackend; + int64_t streamBackendRid; + SHashObj* pTaskBackendUnique; + TdThreadMutex backendMutex; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -578,16 +579,16 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); // common -int32_t streamSetParamForScanHistoryData(SStreamTask* pTask); -int32_t streamRestoreParam(SStreamTask* pTask); -int32_t streamSetStatusNormal(SStreamTask* pTask); +int32_t streamSetParamForScanHistoryData(SStreamTask* pTask); +int32_t streamRestoreParam(SStreamTask* pTask); +int32_t streamSetStatusNormal(SStreamTask* pTask); const char* streamGetTaskStatusStr(int32_t status); // source level -int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); +int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); int32_t streamSourceScanHistoryData(SStreamTask* pTask); -//int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); +// int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 24b2a60898..f41a878b4a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,8 +25,9 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); -static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); +static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, + SVgObj* pVgroup, int32_t fillHistory); +static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { @@ -101,13 +102,13 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { return 0; } -int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) { +int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, + SStreamTask* pTask) { bool isShuffle = false; if (pStream->fixedSinkVgId == 0) { SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); if (pDb != NULL && pDb->cfg.numOfVgroups > 1) { - isShuffle = true; pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; @@ -203,8 +204,8 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { // create sink node for each vgroup. int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, int32_t fillHistory) { - SSdb* pSdb = pMnode->pSdb; - void* pIter = NULL; + SSdb* pSdb = pMnode->pSdb; + void* pIter = NULL; while (1) { SVgObj* pVgroup = NULL; @@ -225,7 +226,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea return 0; } -int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory) { +int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, + int32_t fillHistory) { SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -248,7 +250,7 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas // todo set the correct ts, which should be last key of queried table. pTask->dataRange.window.skey = INT64_MIN; - pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); + pTask->dataRange.window.ekey = 1685959190000; // taosGetTimestampMs(); mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); @@ -298,10 +300,10 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) { return TSDB_CODE_OUT_OF_MEMORY; } - if(pDownstream->pUpstreamEpInfoList == NULL) { + if (pDownstream->pUpstreamEpInfoList == NULL) { pDownstream->pUpstreamEpInfoList = taosArrayInit(4, POINTER_BYTES); } - + taosArrayPush(pDownstream->pUpstreamEpInfoList, &pEpInfo); return TSDB_CODE_SUCCESS; } @@ -314,7 +316,7 @@ static SArray* addNewTaskList(SArray* pTasksList) { // set the history task id static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { - for(int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { SStreamTask** pStreamTask = taosArrayGet(pTaskList, i); SStreamTask** pHTask = taosArrayGet(pHTaskList, i); @@ -339,7 +341,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pHTaskList = addNewTaskList(pStream->pHTasksList); } - SSdb* pSdb = pMnode->pSdb; + SSdb* pSdb = pMnode->pSdb; SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); if (LIST_LENGTH(inner->pNodeList) != 1) { terrno = TSDB_CODE_QRY_INVALID_INPUT; @@ -367,8 +369,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* // new stream task SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); - int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, - 0, hasExtraSink); + int32_t code = + addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, 0, hasExtraSink); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); return -1; @@ -390,8 +392,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* return TSDB_CODE_SUCCESS; } -static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask, SMnode* pMnode, - SSubplan* pPlan, SVgObj* pVgroup) { +static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask, + SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup) { SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -400,9 +402,10 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui // todo set the correct ts, which should be last key of queried table. pTask->dataRange.window.skey = INT64_MIN; - pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); + pTask->dataRange.window.ekey = 1685959190000; // taosGetTimestampMs(); - mDebug("s-task:0x%x set time window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + mDebug("s-task:0x%x set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey, + pTask->dataRange.window.ekey); // all the source tasks dispatch result to a single agg node. setFixedDownstreamEpInfo(pTask, pDownstreamTask); @@ -413,8 +416,8 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui return setEpToDownstreamTask(pTask, pDownstreamTask); } -static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, int32_t fillHistory, - SStreamTask** pAggTask) { +static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, + int32_t fillHistory, SStreamTask** pAggTask) { *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList); if (*pAggTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -472,7 +475,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL); *pHAggTask = NULL; - code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory, pHAggTask); + code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory, + pHAggTask); if (code != TSDB_CODE_SUCCESS) { if (pSnode != NULL) { sdbRelease(pSdb, pSnode); @@ -538,7 +542,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl } if (pStream->conf.fillHistory) { - code = doAddSourceTask(pHSourceTaskList, pStream->conf.fillHistory, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup); + code = doAddSourceTask(pHSourceTaskList, pStream->conf.fillHistory, pStream->hTaskUid, pHDownstreamTask, pMnode, + plan, pVgroup); sdbRelease(pSdb, pVgroup); if (code != TSDB_CODE_SUCCESS) { @@ -552,7 +557,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl return TSDB_CODE_SUCCESS; } -static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList, int32_t fillHistory) { +static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList, + int32_t fillHistory) { SArray* pSinkTaskList = addNewTaskList(pTasksList); if (pStream->fixedSinkVgId == 0) { if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, fillHistory) < 0) { @@ -560,7 +566,8 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr return -1; } } else { - if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, fillHistory) < 0) { + if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, + fillHistory) < 0) { // TODO free return -1; } @@ -571,7 +578,7 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr } static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) { - SSdb* pSdb = pMnode->pSdb; + SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); bool hasExtraSink = false; @@ -655,8 +662,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - }else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL){ - SNode *pAst = NULL; + } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) { + SNode* pAst = NULL; if (nodesStringToNode(pTopic->ast, &pAst) != 0) { mError("topic:%s, failed to create since %s", pTopic->name, terrstr()); return -1; @@ -671,7 +678,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib nodesDestroyNode(pAst); } - if(pPlan){ + if (pPlan) { int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); if (levelNum != 1) { qDestroyQueryPlan(pPlan); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index da4e442f1a..6195200ba5 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -46,6 +46,7 @@ typedef struct { void* streamBackendInit(const char* path); void streamBackendCleanup(void* arg); +void streamBackendHandleCleanup(void* arg); SListNode* streamBackendAddCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index aeb35d9ad5..19a8044b71 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -50,11 +50,12 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet); + SEpSet* pEpSet); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); extern int32_t streamBackendId; +extern int32_t streamBackendWrapperId; #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c4e3f147e0..e5aa08edaf 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -40,16 +40,8 @@ typedef struct { rocksdb_comparator_t** pCompares; } RocksdbCfInst; -uint32_t nextPow2(uint32_t x) { - x = x - 1; - x = x | (x >> 1); - x = x | (x >> 2); - x = x | (x >> 4); - x = x | (x >> 8); - x = x | (x >> 16); - return x + 1; -} -int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); +uint32_t nextPow2(uint32_t x); +int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); void destroyRocksdbCfInst(RocksdbCfInst* inst); @@ -71,7 +63,22 @@ typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const typedef void (*DestroyFunc)(void* state); typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest); typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest); +typedef struct { + const char* key; + int32_t len; + int idx; + BackendCmpFunc cmpFunc; + EncodeFunc enFunc; + DecodeFunc deFunc; + ToStringFunc toStrFunc; + CompareName cmpName; + DestroyFunc detroyFunc; + EncodeValueFunc enValueFunc; + DecodeValueFunc deValueFunc; + +} SCfInit; +#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); const char* compareDefaultName(void* name); const char* compareStateName(void* name); const char* compareWinKeyName(void* name); @@ -80,6 +87,62 @@ const char* compareFuncKeyName(void* name); const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); +int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +int defaultKeyEncode(void* k, char* buf); +int defaultKeyDecode(void* k, char* buf); +int defaultKeyToString(void* k, char* buf); + +int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +int stateKeyEncode(void* k, char* buf); +int stateKeyDecode(void* k, char* buf); +int stateKeyToString(void* k, char* buf); + +int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +int stateSessionKeyEncode(void* ses, char* buf); +int stateSessionKeyDecode(void* ses, char* buf); +int stateSessionKeyToString(void* k, char* buf); + +int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +int winKeyEncode(void* k, char* buf); +int winKeyDecode(void* k, char* buf); +int winKeyToString(void* k, char* buf); + +int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +int tupleKeyEncode(void* k, char* buf); +int tupleKeyDecode(void* k, char* buf); +int tupleKeyToString(void* k, char* buf); + +int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen); +int parKeyEncode(void* k, char* buf); +int parKeyDecode(void* k, char* buf); +int parKeyToString(void* k, char* buf); + +int stremaValueEncode(void* k, char* buf); +int streamValueDecode(void* k, char* buf); +int32_t streamValueToString(void* k, char* buf); +int32_t streaValueIsStale(void* k, int64_t ts); +void destroyFunc(void* arg); + +int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest); +int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest); + +SCfInit ginitDict[] = { + {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, + destroyFunc, encodeValueFunc, decodeValueFunc}, + {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, + compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc}, + {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, + {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, + encodeValueFunc, decodeValueFunc}, +}; + void* streamBackendInit(const char* path) { qDebug("start to init stream backend at %s", path); SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); @@ -189,6 +252,69 @@ void streamBackendCleanup(void* arg) { qDebug("destroy stream backend backend:%p", pHandle); return; } +void streamBackendHandleCleanup(void* arg) { + SBackendWrapper* wrapper = arg; + bool remove = false; + + qDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr); + if (wrapper->rocksdb == NULL) { + return; + } + + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + + char* err = NULL; + if (wrapper->remove) { + for (int i = 0; i < cfLen; i++) { + if (wrapper->pHandle[i] != NULL) + rocksdb_drop_column_family(wrapper->rocksdb, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[i], &err); + if (err != NULL) { + // qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err); + taosMemoryFreeClear(err); + } + } + } else { + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + for (int i = 0; i < cfLen; i++) { + if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err); + if (err != NULL) { + qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err); + taosMemoryFreeClear(err); + } + } + rocksdb_flushoptions_destroy(flushOpt); + } + + for (int i = 0; i < cfLen; i++) { + if (wrapper->pHandle[i] != NULL) { + rocksdb_column_family_handle_destroy(wrapper->pHandle[i]); + } + } + taosMemoryFreeClear(wrapper->pHandle); + for (int i = 0; i < cfLen; i++) { + rocksdb_options_destroy(wrapper->cfOpts[i]); + rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt); + } + + if (remove) { + streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode); + } + rocksdb_writeoptions_destroy(wrapper->writeOpts); + wrapper->writeOpts = NULL; + + rocksdb_readoptions_destroy(wrapper->readOpts); + wrapper->readOpts = NULL; + taosMemoryFreeClear(wrapper->cfOpts); + taosMemoryFreeClear(wrapper->param); + + taosThreadRwlockDestroy(&wrapper->rwLock); + wrapper->rocksdb = NULL; + taosReleaseRef(streamBackendId, wrapper->backendId); + + qDebug("end to do-close backendwrapper %p, %s", wrapper, wrapper->idstr); + taosMemoryFree(wrapper); + return; +} SListNode* streamBackendAddCompare(void* backend, void* arg) { SBackendHandle* pHandle = (SBackendHandle*)backend; SListNode* node = NULL; @@ -537,23 +663,6 @@ void destroyFunc(void* arg) { return; } -typedef struct { - const char* key; - int32_t len; - int idx; - BackendCmpFunc cmpFunc; - EncodeFunc enFunc; - DecodeFunc deFunc; - ToStringFunc toStrFunc; - CompareName cmpName; - DestroyFunc detroyFunc; - EncodeValueFunc enValueFunc; - DecodeValueFunc deValueFunc; - -} SCfInit; - -#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); - int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; int32_t len = 0; @@ -608,22 +717,6 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) { } return key.len; } -SCfInit ginitDict[] = { - {"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName, - destroyFunc, encodeValueFunc, decodeValueFunc}, - {"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString, - compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc}, - {"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, - {"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc, - encodeValueFunc, decodeValueFunc}, -}; const char* compareDefaultName(void* arg) { (void)arg; @@ -816,23 +909,30 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t int streamStateOpenBackend(void* backend, SStreamState* pState) { qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); taosAcquireRef(streamBackendId, pState->streamBackendRid); - SBackendHandle* handle = backend; - - sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); + SBackendHandle* handle = backend; + SBackendWrapper* pBackendWrapper = taosMemoryCalloc(1, sizeof(SBackendWrapper)); taosThreadMutexLock(&handle->cfMutex); + RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { RocksdbCfInst* inst = *ppInst; - pState->pTdbState->rocksdb = inst->db; - pState->pTdbState->pHandle = (void**)inst->pHandle; - pState->pTdbState->writeOpts = inst->wOpt; - pState->pTdbState->readOpts = inst->rOpt; - pState->pTdbState->cfOpts = (void**)(inst->cfOpt); - pState->pTdbState->dbOpt = handle->dbOpt; - pState->pTdbState->param = inst->param; - pState->pTdbState->pBackend = handle; - pState->pTdbState->pComparNode = inst->pCompareNode; + pBackendWrapper->rocksdb = inst->db; + pBackendWrapper->pHandle = (void**)inst->pHandle; + pBackendWrapper->writeOpts = inst->wOpt; + pBackendWrapper->readOpts = inst->rOpt; + pBackendWrapper->cfOpts = (void**)(inst->cfOpt); + pBackendWrapper->dbOpt = handle->dbOpt; + pBackendWrapper->param = inst->param; + pBackendWrapper->pBackend = handle; + pBackendWrapper->pComparNode = inst->pCompareNode; taosThreadMutexUnlock(&handle->cfMutex); + pBackendWrapper->backendId = pState->streamBackendRid; + memcpy(pBackendWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); + + int64_t id = taosAddRef(streamBackendWrapperId, pBackendWrapper); + pState->pTdbState->backendWrapperId = id; + pState->pTdbState->pBackendWrapper = pBackendWrapper; + qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendWrapper, pBackendWrapper->idstr); return 0; } taosThreadMutexUnlock(&handle->cfMutex); @@ -865,92 +965,43 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { pCompare[i] = compare; } rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); - pState->pTdbState->rocksdb = handle->db; - pState->pTdbState->pHandle = (void**)cfHandle; - pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); - pState->pTdbState->readOpts = rocksdb_readoptions_create(); - pState->pTdbState->cfOpts = (void**)cfOpt; - pState->pTdbState->dbOpt = handle->dbOpt; - pState->pTdbState->param = param; - pState->pTdbState->pBackend = handle; - - taosThreadRwlockInit(&pState->pTdbState->rwLock, NULL); + pBackendWrapper->rocksdb = handle->db; + pBackendWrapper->pHandle = (void**)cfHandle; + pBackendWrapper->writeOpts = rocksdb_writeoptions_create(); + pBackendWrapper->readOpts = rocksdb_readoptions_create(); + pBackendWrapper->cfOpts = (void**)cfOpt; + pBackendWrapper->dbOpt = handle->dbOpt; + pBackendWrapper->param = param; + pBackendWrapper->pBackend = handle; + pBackendWrapper->backendId = pState->streamBackendRid; + taosThreadRwlockInit(&pBackendWrapper->rwLock, NULL); SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; - pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); - rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); - qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId); + pBackendWrapper->pComparNode = streamBackendAddCompare(handle, &compare); + rocksdb_writeoptions_disable_WAL(pBackendWrapper->writeOpts, 1); + memcpy(pBackendWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr)); + + int64_t id = taosAddRef(streamBackendWrapperId, pBackendWrapper); + pState->pTdbState->backendWrapperId = id; + pState->pTdbState->pBackendWrapper = pBackendWrapper; + qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendWrapper, pBackendWrapper->idstr); return 0; } void streamStateCloseBackend(SStreamState* pState, bool remove) { - SBackendHandle* pHandle = pState->pTdbState->pBackend; + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + SBackendHandle* pHandle = wrapper->pBackend; taosThreadMutexLock(&pHandle->cfMutex); - RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); + RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { RocksdbCfInst* inst = *ppInst; taosMemoryFree(inst); taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); } taosThreadMutexUnlock(&pHandle->cfMutex); - char* status[] = {"close", "drop"}; - qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle, - pState->streamId, pState->taskId); - if (pState->pTdbState->rocksdb == NULL) { - return; - } - - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); - - char* err = NULL; - if (remove) { - for (int i = 0; i < cfLen; i++) { - if (pState->pTdbState->pHandle[i] != NULL) - rocksdb_drop_column_family(pState->pTdbState->rocksdb, - ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[i], &err); - if (err != NULL) { - qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); - taosMemoryFreeClear(err); - } - } - } else { - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - for (int i = 0; i < cfLen; i++) { - if (pState->pTdbState->pHandle[i] != NULL) - rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err); - if (err != NULL) { - qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err); - taosMemoryFreeClear(err); - } - } - rocksdb_flushoptions_destroy(flushOpt); - } - - for (int i = 0; i < cfLen; i++) { - if (pState->pTdbState->pHandle[i] != NULL) { - rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); - } - } - taosMemoryFreeClear(pState->pTdbState->pHandle); - for (int i = 0; i < cfLen; i++) { - rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); - rocksdb_block_based_options_destroy(((RocksdbCfParam*)pState->pTdbState->param)[i].tableOpt); - } - - if (remove) { - streamBackendDelCompare(pState->pTdbState->pBackend, pState->pTdbState->pComparNode); - } - rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts); - pState->pTdbState->writeOpts = NULL; - - rocksdb_readoptions_destroy(pState->pTdbState->readOpts); - pState->pTdbState->readOpts = NULL; - taosMemoryFreeClear(pState->pTdbState->cfOpts); - taosMemoryFreeClear(pState->pTdbState->param); - - taosThreadRwlockDestroy(&pState->pTdbState->rwLock); - pState->pTdbState->rocksdb = NULL; - taosReleaseRef(streamBackendId, pState->streamBackendRid); + qInfo("start to close %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, + wrapper->idstr); + taosReleaseRef(streamBackendWrapperId, pState->pTdbState->backendWrapperId); } void streamStateDestroyCompar(void* arg) { SCfComparator* comp = (SCfComparator*)arg; @@ -969,27 +1020,27 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { break; } } + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; if (pState != NULL && idx != -1) { rocksdb_column_family_handle_t* cf = NULL; - taosThreadRwlockRdlock(&pState->pTdbState->rwLock); - cf = pState->pTdbState->pHandle[idx]; - taosThreadRwlockUnlock(&pState->pTdbState->rwLock); + taosThreadRwlockRdlock(&wrapper->rwLock); + cf = wrapper->pHandle[idx]; + taosThreadRwlockUnlock(&wrapper->rwLock); if (cf == NULL) { char buf[128] = {0}; - GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key); + GEN_COLUMN_FAMILY_NAME(buf, wrapper->idstr, ginitDict[idx].key); char* err = NULL; - taosThreadRwlockWrlock(&pState->pTdbState->rwLock); - cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err); + taosThreadRwlockWrlock(&wrapper->rwLock); + cf = rocksdb_create_column_family(wrapper->rocksdb, wrapper->cfOpts[idx], buf, &err); if (err != NULL) { idx = -1; - qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId, - funcName, err); + qError("failed to to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err); taosMemoryFree(err); } else { - pState->pTdbState->pHandle[idx] = cf; + wrapper->pHandle[idx] = cf; } - taosThreadRwlockUnlock(&pState->pTdbState->rwLock); + taosThreadRwlockUnlock(&wrapper->rwLock); } } @@ -1009,8 +1060,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_t** readOpt) { int idx = streamStateGetCfIdx(pState, cfName); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; if (snapshot != NULL) { - *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); + *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb); } rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); *readOpt = rOpt; @@ -1018,8 +1070,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_set_snapshot(rOpt, *snapshot); rocksdb_readoptions_set_fill_cache(rOpt, 0); - return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, - ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]); + return rocksdb_create_iterator_cf(wrapper->rocksdb, rOpt, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]); } #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ @@ -1033,15 +1084,15 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa code = -1; \ break; \ } \ - char toString[128] = {0}; \ + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ + char toString[128] = {0}; \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = \ - ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->rocksdb; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ if (err != NULL) { \ taosMemoryFree(err); \ @@ -1053,81 +1104,76 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = \ - ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL || len == 0) { \ - if (err == NULL) { \ - qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ - funcname); \ - } else { \ - qError("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ - err); \ - taosMemoryFreeClear(err); \ - } \ - code = -1; \ - } else { \ - char* p = NULL; \ - int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ - if (tlen <= 0) { \ - qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ - funcname); \ - code = -1; \ - } else { \ - qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ - tlen); \ - } \ - taosMemoryFree(val); \ - if (vLen != NULL) *vLen = tlen; \ - } \ - if (code == 0) \ - qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->rocksdb; \ + rocksdb_readoptions_t* opts = wrapper->readOpts; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL || len == 0) { \ + if (err == NULL) { \ + qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ + } else { \ + qError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + taosMemoryFreeClear(err); \ + } \ + code = -1; \ + } else { \ + char* p = NULL; \ + int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (tlen <= 0) { \ + qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ + funcname); \ + code = -1; \ + } else { \ + qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = tlen; \ + } \ + if (code == 0) qDebug("streamState str: %s succ to read from %s_%s", toString, wrapper->idstr, funcname); \ } while (0); -#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ - code = -1; \ - break; \ - } \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = \ - ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ - rocksdb_t* db = pState->pTdbState->rocksdb; \ - rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \ - rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ - if (err != NULL) { \ - qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ - err); \ - taosMemoryFree(err); \ - code = -1; \ - } else { \ - qTrace("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \ - } \ +#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + code = -1; \ + break; \ + } \ + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \ + char toString[128] = {0}; \ + if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->rocksdb; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpts; \ + rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ + if (err != NULL) { \ + qError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + taosMemoryFree(err); \ + code = -1; \ + } else { \ + qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ + } \ } while (0); // state cf @@ -1153,18 +1199,19 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); - char sKeyStr[128] = {0}; - char eKeyStr[128] = {0}; - SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); - if (pState->pTdbState->pHandle[1] != NULL) { + if (wrapper->pHandle[1] != NULL) { char* err = NULL; - rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], - sKeyStr, sLen, eKeyStr, eLen, &err); + rocksdb_delete_range_cf(wrapper->rocksdb, wrapper->writeOpts, wrapper->pHandle[1], sKeyStr, sLen, eKeyStr, eLen, + &err); if (err != NULL) { char toStringStart[128] = {0}; char toStringEnd[128] = {0}; @@ -1174,7 +1221,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) { qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); taosMemoryFree(err); } else { - rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[1], sKeyStr, sLen, eKeyStr, eLen); + rocksdb_compact_range_cf(wrapper->rocksdb, wrapper->pHandle[1], sKeyStr, sLen, eKeyStr, eLen); } } @@ -1268,8 +1315,9 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin if (pCur == NULL) { return NULL; } + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; pCur->number = pState->number; - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1302,15 +1350,16 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); - int32_t code = 0; + int32_t code = 0; + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); - char buf[128] = {0}; - int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); - + char buf[128] = {0}; + int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); @@ -1330,10 +1379,11 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateGetCur_rocksdb"); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1421,12 +1471,14 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); + + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } pCur->number = pState->number; - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1462,11 +1514,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -1499,11 +1552,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { qDebug("streamStateSessionSeekKeyNext_rocksdb"); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); pCur->number = pState->number; @@ -1593,10 +1647,11 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; if (pCur == NULL) return NULL; - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1651,12 +1706,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyNext_rocksdb"); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (!pCur) { return NULL; } - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1687,12 +1743,13 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const } SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { qDebug("streamStateFillSeekKeyPrev_rocksdb"); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1723,12 +1780,13 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const } int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { qDebug("streamStateSessionGetKeyByRange_rocksdb"); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return -1; } pCur->number = pState->number; - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); @@ -1959,6 +2017,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co int code = 0; char* err = NULL; + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; rocksdb_snapshot_t* snapshot = NULL; rocksdb_readoptions_t* readopts = NULL; rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); @@ -1991,15 +2050,16 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co } rocksdb_iter_next(pIter); } - rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); + rocksdb_release_snapshot(wrapper->rocksdb, snapshot); rocksdb_readoptions_destroy(readopts); rocksdb_iter_destroy(pIter); return code; } void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; - pCur->db = pState->pTdbState->rocksdb; + pCur->db = wrapper->rocksdb; pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); return pCur; @@ -2046,7 +2106,8 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { - int i = streamStateGetCfIdx(pState, cfName); + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + int i = streamStateGetCfIdx(pState, cfName); if (i < 0) { qError("streamState failed to put to cf name:%s", cfName); @@ -2057,7 +2118,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr char* ttlV = NULL; int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV); - rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; + rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[i].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); taosMemoryFree(ttlV); return 0; @@ -2069,7 +2130,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); - rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[cfIdx].idx]; + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + + rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); if (tmpBuf == NULL) { @@ -2078,8 +2141,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb return 0; } int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { - char* err = NULL; - rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); + char* err = NULL; + SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; + rocksdb_write(wrapper->rocksdb, wrapper->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); if (err != NULL) { qError("streamState failed to write batch, err:%s", err); taosMemoryFree(err); @@ -2087,3 +2151,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { } return 0; } + +uint32_t nextPow2(uint32_t x) { + x = x - 1; + x = x | (x >> 1); + x = x | (x >> 2); + x = x | (x >> 4); + x = x | (x >> 8); + x = x | (x >> 16); + return x + 1; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b7bc35d13d..5a5180b8c5 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -21,10 +21,17 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; -static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } +int32_t streamBackendWrapperId = 0; +static void streamMetaEnvInit() { + streamBackendId = taosOpenRef(64, streamBackendCleanup); + streamBackendWrapperId = taosOpenRef(64, streamBackendHandleCleanup); +} void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } -void streamMetaCleanup() { taosCloseRef(streamBackendId); } +void streamMetaCleanup() { + taosCloseRef(streamBackendId); + taosCloseRef(streamBackendWrapperId); +} SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { int32_t code = -1; @@ -90,10 +97,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->streamBackend = streamBackendInit(streamPath); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + pMeta->pTaskBackendUnique = + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); taosMemoryFree(streamPath); taosInitRWLatch(&pMeta->lock); + taosThreadMutexInit(&pMeta->backendMutex, NULL); + return pMeta; _err: @@ -136,6 +147,8 @@ void streamMetaClose(SStreamMeta* pMeta) { taosRemoveRef(streamBackendId, pMeta->streamBackendRid); pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); taosMemoryFree(pMeta->path); + taosThreadMutexDestroy(&pMeta->backendMutex); + taosHashCleanup(pMeta->pTaskBackendUnique); taosMemoryFree(pMeta); } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index aeb25c2368..448d693b0a 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -116,16 +116,33 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz pState->taskId = pStreamTask->id.taskId; pState->streamId = pStreamTask->id.streamId; + sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); #ifdef USE_ROCKSDB SStreamMeta* pMeta = pStreamTask->pMeta; pState->streamBackendRid = pMeta->streamBackendRid; - int code = streamStateOpenBackend(pMeta->streamBackend, pState); - if (code == -1) { - taosReleaseRef(streamBackendId, pMeta->streamBackendRid); - taosMemoryFree(pState); - pState = NULL; + // taosWLockLatch(&pMeta->lock); + taosThreadMutexLock(&pMeta->backendMutex); + void* uniqueId = + taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); + if (uniqueId == NULL) { + int code = streamStateOpenBackend(pMeta->streamBackend, pState); + if (code == -1) { + taosReleaseRef(streamBackendId, pState->streamBackendRid); + taosThreadMutexUnlock(&pMeta->backendMutex); + taosMemoryFree(pState); + return NULL; + } + taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1, + &pState->pTdbState->backendWrapperId, sizeof(pState->pTdbState->backendWrapperId)); + } else { + int64_t id = *(int64_t*)uniqueId; + pState->pTdbState->backendWrapperId = id; + pState->pTdbState->pBackendWrapper = taosAcquireRef(streamBackendWrapperId, id); + + taosAcquireRef(streamBackendId, pState->streamBackendRid); } + taosThreadMutexUnlock(&pMeta->backendMutex); pState->pTdbState->pOwner = pTask; pState->pFileState = NULL; @@ -385,8 +402,8 @@ int32_t streamStateClear(SStreamState* pState) { streamStatePut(pState, &key, NULL, 0); while (1) { SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &key); - SWinKey delKey = {0}; - int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); + SWinKey delKey = {0}; + int32_t code = streamStateGetKVByCur(pCur, &delKey, NULL, 0); streamStateFreeCur(pCur); if (code == 0) { streamStateDel(pState, &delKey); @@ -498,7 +515,7 @@ int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** return -1; } const SStateKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { return -1; } @@ -518,7 +535,7 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo return -1; } const SWinKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { return -1; } @@ -535,7 +552,7 @@ int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const v return -1; } uint64_t groupId = pKey->groupId; - int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); + int32_t code = streamStateFillGetKVByCur(pCur, pKey, pVal, pVLen); if (code == 0) { if (pKey->groupId == groupId) { return 0; @@ -553,7 +570,7 @@ int32_t streamStateGetFirst(SStreamState* pState, SWinKey* key) { SWinKey tmp = {.ts = 0, .groupId = 0}; streamStatePut(pState, &tmp, NULL, 0); SStreamStateCur* pCur = streamStateSeekKeyNext(pState, &tmp); - int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0); + int32_t code = streamStateGetKVByCur(pCur, key, NULL, 0); streamStateFreeCur(pCur); streamStateDel(pState, &tmp); return code; @@ -593,7 +610,7 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key } SStateKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -726,9 +743,9 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa #else SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, key); - SSessionKey resKey = *key; - void* tmp = NULL; - int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); + SSessionKey resKey = *key; + void* tmp = NULL; + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); if (code == 0) { if (key->win.skey != resKey.win.skey) { code = -1; @@ -767,7 +784,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, cons } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -798,7 +815,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext(SStreamState* pState, cons } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -830,7 +847,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return NULL; @@ -854,7 +871,7 @@ int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, v return -1; } SStateSessionKey* pKTmp = NULL; - int32_t kLen; + int32_t kLen; if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, (const void**)pVal, pVLen) < 0) { return -1; } @@ -873,13 +890,13 @@ int32_t streamStateSessionClear(SStreamState* pState) { #ifdef USE_ROCKSDB return streamStateSessionClear_rocksdb(pState); #else - SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; + SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pState, &key); while (1) { SSessionKey delKey = {0}; - void* buf = NULL; - int32_t size = 0; - int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); + void* buf = NULL; + int32_t size = 0; + int32_t code = streamStateSessionGetKVByCur(pCur, &delKey, &buf, &size); if (code == 0 && size > 0) { memset(buf, 0, size); streamStateSessionPut(pState, &delKey, buf, size); @@ -908,14 +925,14 @@ int32_t streamStateSessionGetKeyByRange(SStreamState* pState, const SSessionKey* } SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - int32_t c = 0; + int32_t c = 0; if (tdbTbcMoveTo(pCur->pCur, &sKey, sizeof(SStateSessionKey), &c) < 0) { streamStateFreeCur(pCur); return -1; } SSessionKey resKey = *key; - int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); + int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, NULL, 0); if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) { *curKey = resKey; streamStateFreeCur(pCur); @@ -951,19 +968,19 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, return streamStateSessionAddIfNotExist_rocksdb(pState, key, gap, pVal, pVLen); #else // todo refactor - int32_t res = 0; + int32_t res = 0; SSessionKey originKey = *key; SSessionKey searchKey = *key; searchKey.win.skey = key->win.skey - gap; searchKey.win.ekey = key->win.ekey + gap; int32_t valSize = *pVLen; - void* tmp = tdbRealloc(NULL, valSize); + void* tmp = tdbRealloc(NULL, valSize); if (!tmp) { return -1; } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, valSize); @@ -1006,16 +1023,16 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch #ifdef USE_ROCKSDB return streamStateStateAddIfNotExist_rocksdb(pState, key, pKeyData, keyDataLen, fn, pVal, pVLen); #else - int32_t res = 0; + int32_t res = 0; SSessionKey tmpKey = *key; - int32_t valSize = *pVLen; - void* tmp = tdbRealloc(NULL, valSize); + int32_t valSize = *pVLen; + void* tmp = tdbRealloc(NULL, valSize); if (!tmp) { return -1; } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key); - int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); + int32_t code = streamStateSessionGetKVByCur(pCur, key, pVal, pVLen); if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { memcpy(tmp, *pVal, valSize); @@ -1113,9 +1130,7 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { #endif } -void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { - streamFileStateReloadInfo(pState->pFileState, ts); -} +void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); } #if 0 char* streamStateSessionDump(SStreamState* pState) { -- GitLab