diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 4cfbf2c2fa2c3ae35053b2ad1ce5ed6aef115a42..e2944d13dadf12391fd256846ab23cf7fb11e3fc 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -16,13 +16,13 @@ #ifndef TDENGINE_STORAGEAPI_H #define TDENGINE_STORAGEAPI_H -#include "tsimplehash.h" -#include "tscalablebf.h" +#include "function.h" +#include "index.h" #include "taosdef.h" -#include "tmsg.h" #include "tcommon.h" -#include "index.h" -#include "function.h" +#include "tmsg.h" +#include "tscalablebf.h" +#include "tsimplehash.h" #ifdef __cplusplus extern "C" { @@ -46,7 +46,7 @@ typedef struct SMetaEntry { int8_t type; int8_t flags; // TODO: need refactor? tb_uid_t uid; - char * name; + char* name; union { struct { SSchemaWrapper schemaRow; @@ -57,43 +57,45 @@ typedef struct SMetaEntry { int64_t ctime; int32_t ttlDays; int32_t commentLen; - char * comment; + char* comment; tb_uid_t suid; - uint8_t *pTags; + uint8_t* pTags; } ctbEntry; struct { int64_t ctime; int32_t ttlDays; int32_t commentLen; - char * comment; + char* comment; int32_t ncid; // next column id SSchemaWrapper schemaRow; } ntbEntry; struct { - STSma *tsma; + STSma* tsma; } smaEntry; }; - uint8_t *pBuf; + uint8_t* pBuf; } SMetaEntry; typedef struct SMetaReader { - int32_t flags; - void * pMeta; - SDecoder coder; - SMetaEntry me; - void * pBuf; - int32_t szBuf; - struct SStoreMeta* pAPI; + int32_t flags; + void* pMeta; + SDecoder coder; + SMetaEntry me; + void* pBuf; + int32_t szBuf; + struct SStoreMeta* pAPI; } SMetaReader; typedef struct SMTbCursor { - void * pDbc; - void * pKey; - void * pVal; + void* pMeta; + void* pDbc; + void* pKey; + void* pVal; int32_t kLen; int32_t vLen; SMetaReader mr; + int8_t paused; } SMTbCursor; typedef struct SRowBuffPos { @@ -107,22 +109,22 @@ typedef struct SRowBuffPos { typedef struct SMetaTableInfo { int64_t suid; int64_t uid; - SSchemaWrapper *schema; + SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; } SMetaTableInfo; typedef struct SSnapContext { - SMeta * pMeta; // todo remove it - int64_t snapVersion; - void * pCur; - int64_t suid; - int8_t subType; - SHashObj * idVersion; - SHashObj * suidInfo; - SArray * idList; - int32_t index; - bool withMeta; - bool queryMeta; // true-get meta, false-get data + SMeta* pMeta; // todo remove it + int64_t snapVersion; + void* pCur; + int64_t suid; + int8_t subType; + SHashObj* idVersion; + SHashObj* suidInfo; + SArray* idList; + int32_t index; + bool withMeta; + bool queryMeta; // true-get meta, false-get data } SSnapContext; typedef struct { @@ -139,10 +141,9 @@ typedef struct { // int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); // bool tqNextBlockInWal(STqReader* pReader, const char* idstr); // bool tqNextBlockImpl(STqReader *pReader, const char* idstr); -// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); -// SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); -// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); -// int32_t destroySnapContext(SSnapContext *ctx); +// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t +// *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext +// *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx); // clang-format off /*-------------------------------------------------new api format---------------------------------------------------*/ @@ -219,16 +220,16 @@ typedef struct SStoreTqReader { bool (*tqReaderIsQueriedTable)(); bool (*tqReaderCurrentBlockConsumed)(); - struct SWalReader *(*tqReaderGetWalReader)(); // todo remove it - int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it + struct SWalReader* (*tqReaderGetWalReader)(); // todo remove it + int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it int32_t (*tqReaderSetSubmitMsg)(); // todo remove it bool (*tqReaderNextBlockFilterOut)(); } SStoreTqReader; typedef struct SStoreSnapshotFn { - int32_t (*createSnapshot)(SSnapContext *ctx, int64_t uid); - int32_t (*destroySnapshot)(SSnapContext *ctx); + int32_t (*createSnapshot)(SSnapContext* ctx, int64_t uid); + int32_t (*destroySnapshot)(SSnapContext* ctx); SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx); int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid); } SStoreSnapshotFn; @@ -252,42 +253,54 @@ int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, in int32_t payloadLen, double selectivityRatio); tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); -int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); +int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t +payloadLen); */ typedef struct SStoreMeta { - SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor - void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor - int32_t (*cursorNext)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorNext - int32_t (*cursorPrev)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorPrev - - int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList); - int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList); - const void *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it - - int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid); - int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType); - int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName); - bool (*isTableExisted)(void *pVnode, tb_uid_t uid); - - int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); - int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); - - int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes); - int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio); - - void *(*storeGetIndexInfo)(); - void *(*getInvertIndex)(void* pVnode); - int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] - int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList - void *storeGetVersionRange; - void *storeGetLastTimestamp; - - int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema + SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor + void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor + void (*pauseTableMetaCursor)(SMTbCursor* pTbCur); // metaPauseTbCursor + void (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first); // metaResumeTbCursor + int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorNext + int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorPrev + + int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList); + int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList); + const void* (*extractTagVal)(const void* tag, int16_t type, STagVal* tagVal); // todo remove it + + int32_t (*getTableUidByName)(void* pVnode, char* tbName, uint64_t* uid); + int32_t (*getTableTypeByName)(void* pVnode, char* tbName, ETableType* tbType); + int32_t (*getTableNameByUid)(void* pVnode, uint64_t uid, char* tbName); + bool (*isTableExisted)(void* pVnode, tb_uid_t uid); + + int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); + int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, + int32_t payloadLen); + + int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, + bool* acquireRes); + int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, + int32_t payloadLen, double selectivityRatio); + + void* (*storeGetIndexInfo)(); + void* (*getInvertIndex)(void* pVnode); + int32_t (*getChildTableList)( + void* pVnode, int64_t suid, + SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] + int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList + void* storeGetVersionRange; + void* storeGetLastTimestamp; + + int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema // db name, vgId, numOfTables, numOfSTables - int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); - void (*getBasicInfo)(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);// vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); + int32_t (*getNumOfChildTables)( + void* pVnode, int64_t uid, + int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); + void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables, + int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & + // metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); int64_t (*getNumOfRowsInMem)(void* pVnode); /** @@ -298,24 +311,24 @@ int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); } SStoreMeta; typedef struct SStoreMetaReader { - void (*initReader)(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI); - void (*clearReader)(SMetaReader *pReader); - void (*readerReleaseLock)(SMetaReader *pReader); - int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid); - int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name); - int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid); + void (*initReader)(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI); + void (*clearReader)(SMetaReader* pReader); + void (*readerReleaseLock)(SMetaReader* pReader); + int32_t (*getTableEntryByUid)(SMetaReader* pReader, tb_uid_t uid); + int32_t (*getTableEntryByName)(SMetaReader* pReader, const char* name); + int32_t (*getEntryGetUidCache)(SMetaReader* pReader, tb_uid_t uid); } SStoreMetaReader; typedef struct SUpdateInfo { - SArray *pTsBuckets; + SArray* pTsBuckets; uint64_t numBuckets; - SArray *pTsSBFs; + SArray* pTsSBFs; uint64_t numSBFs; int64_t interval; int64_t watermark; TSKEY minTS; - SScalableBf *pCloseWinSBF; - SHashObj *pMap; + SScalableBf* pCloseWinSBF; + SHashObj* pMap; uint64_t maxDataVersion; } SUpdateInfo; @@ -334,15 +347,15 @@ typedef struct SStateStore { int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateReleaseBuf)(SStreamState* pState, const SWinKey* key, void* pVal); - void (*streamStateFreeVal)(void* val); + void (*streamStateFreeVal)(void* val); int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t (*streamStateGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); - bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key); + bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateClear)(SStreamState* pState); - void (*streamStateSetNumber)(SStreamState* pState, int32_t number); + void (*streamStateSetNumber)(SStreamState* pState, int32_t number); int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); @@ -353,36 +366,37 @@ typedef struct SStateStore { int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur); int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur); - SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key); - SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key); - SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key); - SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key); - void (*streamStateFreeCur)(SStreamStateCur* pCur); + SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key); + SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key); + SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key); + SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key); + void (*streamStateFreeCur)(SStreamStateCur* pCur); int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); - int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); + int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, + int32_t* pVLen); int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key); int32_t (*streamStateSessionClear)(SStreamState* pState); int32_t (*streamStateSessionGetKVByCur)(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, - state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); + state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); - SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark); - TSKEY (*updateInfoFillBlockData)(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); - bool (*updateInfoIsUpdated)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); - bool (*updateInfoIsTableInserted)(SUpdateInfo *pInfo, int64_t tbUid); - void (*updateInfoDestroy)(SUpdateInfo *pInfo); + SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark); + TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); + bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); + bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); + void (*updateInfoDestroy)(SUpdateInfo* pInfo); - SUpdateInfo* (*updateInfoInitP)(SInterval *pInterval, int64_t watermark); - void (*updateInfoAddCloseWindowSBF)(SUpdateInfo *pInfo); - void (*updateInfoDestoryColseWinSBF)(SUpdateInfo *pInfo); - int32_t (*updateInfoSerialize)(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); - int32_t (*updateInfoDeserialize)(void *buf, int32_t bufLen, SUpdateInfo *pInfo); + SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark); + void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); + void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo); + int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo); + int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo); SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key); SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key); @@ -396,11 +410,11 @@ typedef struct SStateStore { bool (*needClearDiskBuff)(struct SStreamFileState* pFileState); SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); - void (*streamStateClose)(SStreamState* pState, bool remove); - int32_t (*streamStateBegin)(SStreamState* pState); - int32_t (*streamStateCommit)(SStreamState* pState); - void (*streamStateDestroy)(SStreamState* pState, bool remove); - int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); + void (*streamStateClose)(SStreamState* pState, bool remove); + int32_t (*streamStateBegin)(SStreamState* pState); + int32_t (*streamStateCommit)(SStreamState* pState); + void (*streamStateDestroy)(SStreamState* pState, bool remove); + int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); } SStateStore; typedef struct SStorageAPI { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0bfdf33019394cff3a02637790074ffabdb9e060..2811fc35b0af49a423679953a2df42b2d1087e0e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -109,6 +109,8 @@ typedef SVCreateTSmaReq SSmaCfg; SMTbCursor* metaOpenTbCursor(void* pVnode); void metaCloseTbCursor(SMTbCursor* pTbCur); +void metaPauseTbCursor(SMTbCursor* pTbCur); +void metaResumeTbCursor(SMTbCursor* pTbCur, int8_t first); int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType); int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index fa9eea5e29d743062b8b3fbf4b3f57c690774602..29fe89c3f29716bf49940b6bffe061e798a608b1 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -17,8 +17,8 @@ #include "osMemory.h" #include "tencode.h" -void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) { - SMeta* pMeta = ((SVnode*)pVnode)->pMeta; +void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta *pAPI) { + SMeta *pMeta = ((SVnode *)pVnode)->pMeta; metaReaderInit(pReader, pMeta, flags); pReader->pAPI = pAPI; } @@ -71,7 +71,7 @@ _err: } bool metaIsTableExist(void *pVnode, tb_uid_t uid) { - SVnode* pVnodeObj = pVnode; + SVnode *pVnodeObj = pVnode; metaRLock(pVnodeObj->pMeta); // query uid.idx if (tdbTbGet(pVnodeObj->pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) { @@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); + metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0); code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { metaReaderClear(&mr); @@ -179,7 +179,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { SMetaReader *pReader = &mr; // query name.idx - if (tdbTbGet(((SMeta*)pReader->pMeta)->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) { + if (tdbTbGet(((SMeta *)pReader->pMeta)->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) { terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; metaReaderClear(&mr); return -1; @@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); + metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0); code = metaGetTableEntryByName(&mr, tbName); if (code == 0) *tbType = mr.me.type; @@ -221,12 +221,13 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) { return NULL; } - SVnode* pVnodeObj = pVnode; - metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0); - - tdbTbcOpen(pVnodeObj->pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL); + SVnode *pVnodeObj = pVnode; + // metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0); - tdbTbcMoveToFirst((TBC *)pTbCur->pDbc); + // tdbTbcMoveToFirst((TBC *)pTbCur->pDbc); + pTbCur->pMeta = pVnodeObj->pMeta; + pTbCur->paused = 1; + metaResumeTbCursor(pTbCur, 1); return pTbCur; } @@ -234,14 +235,45 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) { if (pTbCur) { tdbFree(pTbCur->pKey); tdbFree(pTbCur->pVal); - metaReaderClear(&pTbCur->mr); - if (pTbCur->pDbc) { - tdbTbcClose((TBC *)pTbCur->pDbc); + if (!pTbCur->paused) { + metaReaderClear(&pTbCur->mr); + if (pTbCur->pDbc) { + tdbTbcClose((TBC *)pTbCur->pDbc); + } } taosMemoryFree(pTbCur); } } +void metaPauseTbCursor(SMTbCursor *pTbCur) { + if (!pTbCur->paused) { + metaReaderClear(&pTbCur->mr); + tdbTbcClose((TBC *)pTbCur->pDbc); + pTbCur->paused = 1; + } +} +void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) { + if (pTbCur->paused) { + metaReaderInit(&pTbCur->mr, pTbCur->pMeta, 0); + + tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL); + + if (first) { + tdbTbcMoveToFirst((TBC *)pTbCur->pDbc); + } else { + int c = 0; + tdbTbcMoveTo(pTbCur->pDbc, pTbCur->pKey, pTbCur->kLen, &c); + if (c < 0) { + tdbTbcMoveToPrev(pTbCur->pDbc); + } else { + tdbTbcMoveToNext(pTbCur->pDbc); + } + } + + pTbCur->paused = 0; + } +} + int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) { int ret; void *pBuf; @@ -974,7 +1006,7 @@ typedef struct { } SIdxCursor; int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) { - SMeta *pMeta = ((SVnode*)pVnode)->pMeta; + SMeta *pMeta = ((SVnode *)pVnode)->pMeta; SMetaFltParam *param = arg; int32_t ret = 0; @@ -1034,7 +1066,7 @@ END: } int32_t metaFilterTableName(void *pVnode, SMetaFltParam *arg, SArray *pUids) { - SMeta *pMeta = ((SVnode*)pVnode)->pMeta; + SMeta *pMeta = ((SVnode *)pVnode)->pMeta; SMetaFltParam *param = arg; int32_t ret = 0; char *buf = NULL; @@ -1101,7 +1133,7 @@ END: return ret; } int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) { - SMeta *pMeta = ((SVnode*)pVnode)->pMeta; + SMeta *pMeta = ((SVnode *)pVnode)->pMeta; SMetaFltParam *param = arg; int32_t ret = 0; char *buf = NULL; @@ -1132,7 +1164,7 @@ END: return 0; } int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) { - SMeta *pMeta = ((SVnode*)pVnode)->pMeta; + SMeta *pMeta = ((SVnode *)pVnode)->pMeta; SMetaFltParam *param = arg; SMetaEntry oStbEntry = {0}; @@ -1318,7 +1350,7 @@ static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, voi } int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) { - SMeta* pMeta = ((SVnode*) pVnode)->pMeta; + SMeta *pMeta = ((SVnode *)pVnode)->pMeta; const int32_t LIMIT = 128; int32_t isLock = false; @@ -1350,8 +1382,8 @@ int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) { return 0; } -int32_t metaGetTableTags(void* pVnode, uint64_t suid, SArray *pUidTagInfo) { - SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode*)pVnode)->pMeta, suid, 1); +int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) { + SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode *)pVnode)->pMeta, suid, 1); // If len > 0 means there already have uids, and we only want the // tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept @@ -1456,11 +1488,11 @@ _exit: return code; } -int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t* numOfTables) { +int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) { int32_t code = 0; *numOfTables = 0; - SVnode* pVnodeObj = pVnode; + SVnode *pVnodeObj = pVnode; metaRLock(pVnodeObj->pMeta); // fast path: search cache diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 526b9b4e2db3f586f6cd8936d939e7da4c970be9..d2db6368a2d041fa25916ae8202c61e7339b6c8e 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ +#include "meta.h" #include "storageapi.h" -#include "vnodeInt.h" #include "tstreamUpdate.h" -#include "meta.h" +#include "vnodeInt.h" static void initTsdbReaderAPI(TsdReader* pReader); static void initMetadataAPI(SStoreMeta* pMeta); @@ -56,10 +56,10 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdReaderResetStatus = tsdbReaderReset; pReader->tsdReaderGetDataBlockDistInfo = tsdbGetFileBlocksDistInfo; - pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable; // todo this function should be moved away + pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable; // todo this function should be moved away pReader->tsdSetQueryTableList = tsdbSetTableList; - pReader->tsdSetReaderTaskId = (void (*)(void *, const char *))tsdbReaderSetId; + pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId; } void initMetadataAPI(SStoreMeta* pMeta) { @@ -67,6 +67,8 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->openTableMetaCursor = metaOpenTbCursor; pMeta->closeTableMetaCursor = metaCloseTbCursor; + pMeta->pauseTableMetaCursor = metaPauseTbCursor; + pMeta->resumeTableMetaCursor = metaResumeTbCursor; pMeta->cursorNext = metaTbCursorNext; pMeta->cursorPrev = metaTbCursorPrev; @@ -78,7 +80,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->storeGetIndexInfo = vnodeGetIdx; pMeta->getInvertIndex = vnodeGetIvtIdx; - pMeta->extractTagVal = (const void *(*)(const void *, int16_t, STagVal *))metaGetTableTagVal; + pMeta->extractTagVal = (const void* (*)(const void*, int16_t, STagVal*))metaGetTableTagVal; pMeta->getTableTags = metaGetTableTags; pMeta->getTableTagsByUid = metaGetTableTagsByUids; @@ -86,7 +88,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { pMeta->getTableTypeByName = metaGetTableTypeByName; pMeta->getTableNameByUid = metaGetTableNameByUid; - pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor + pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor pMeta->storeGetTableList = vnodeGetTableList; pMeta->getCachedTableList = metaGetCachedTableUidList; @@ -106,7 +108,7 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderNextBlockInWal = tqNextBlockInWal; - pTq->tqNextBlockImpl = tqNextBlockImpl;// todo remove it + pTq->tqNextBlockImpl = tqNextBlockImpl; // todo remove it pTq->tqReaderAddTables = tqReaderAddTbUidList; pTq->tqReaderSetQueryTableList = tqReaderSetTbUidList; @@ -116,10 +118,10 @@ void initTqAPI(SStoreTqReader* pTq) { pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable; pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed; - pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it - pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it + pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it + pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it - pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it + pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it pTq->tqGetResultBlock = tqGetResultBlock; pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; @@ -199,7 +201,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStateClose = streamStateClose; pStore->streamStateBegin = streamStateBegin; pStore->streamStateCommit = streamStateCommit; - pStore->streamStateDestroy= streamStateDestroy; + pStore->streamStateDestroy = streamStateDestroy; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; } @@ -239,4 +241,4 @@ void initSnapshotFn(SStoreSnapshotFn* pSnapshot) { pSnapshot->destroySnapshot = destroySnapContext; pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot; pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot; -} \ No newline at end of file +} diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index fd766e21f81595af7969a924493ade90011b8205..23a7d2c9e9e0f8c6415d2a8cccb53e5b7ee0cfd4 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -25,6 +25,7 @@ #include "tdatablock.h" #include "tmsg.h" +#include "index.h" #include "operator.h" #include "query.h" #include "querytask.h" @@ -32,7 +33,6 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" -#include "index.h" typedef int (*__optSysFilter)(void* a, void* b, int16_t dtype); typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result); @@ -540,12 +540,12 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { int32_t restore = pInfo->restore; pInfo->restore = false; - + while (restore || ((ret = pAPI->metaFn.cursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) { if (restore) { restore = false; } - + char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -626,8 +626,8 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { } static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SSysTableScanInfo* pInfo = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { @@ -1100,8 +1100,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { } static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SSysTableScanInfo* pInfo = pOperator->info; @@ -1288,11 +1288,16 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + int8_t firstMetaCursor = 0; SSysTableScanInfo* pInfo = pOperator->info; if (pInfo->pCur == NULL) { pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); + firstMetaCursor = 1; + } + if (!firstMetaCursor) { + pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0); } blockDataCleanup(pInfo->pRes); @@ -1436,12 +1441,14 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { numOfRows = 0; if (pInfo->pRes->info.rows > 0) { + pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur); break; } } } if (numOfRows > 0) { + pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur); p->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; @@ -1485,7 +1492,8 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { } else { if (pInfo->showRewrite == false) { if (pCondition != NULL && pInfo->pIdx == NULL) { - SSTabFltArg arg = {.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI}; + SSTabFltArg arg = { + .pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI}; SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex)); idx->init = 0; @@ -1827,7 +1835,7 @@ void destroySysScanOperator(void* param) { pInfo->pIdx = NULL; } - if(pInfo->pSchema) { + if (pInfo->pSchema) { taosHashCleanup(pInfo->pSchema); pInfo->pSchema = NULL; } @@ -2144,7 +2152,7 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { return -1; } -static int32_t doGetTableRowSize(SReadHandle *pHandle, uint64_t uid, int32_t* rowLen, const char* idstr) { +static int32_t doGetTableRowSize(SReadHandle* pHandle, uint64_t uid, int32_t* rowLen, const char* idstr) { *rowLen = 0; SMetaReader mr = {0}; @@ -2194,17 +2202,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { SBlockDistInfo* pBlockScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; - int32_t code = doGetTableRowSize(&pBlockScanInfo->readHandle, pBlockScanInfo->uid, - (int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); + int32_t code = doGetTableRowSize(&pBlockScanInfo->readHandle, pBlockScanInfo->uid, (int32_t*)&blockDistInfo.rowSize, + GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } pAPI->tsdReader.tsdReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); - blockDistInfo.numOfInmemRows = (int32_t) pAPI->tsdReader.tsdReaderGetNumOfInMemRows(pBlockScanInfo->pHandle); + blockDistInfo.numOfInmemRows = (int32_t)pAPI->tsdReader.tsdReaderGetNumOfInMemRows(pBlockScanInfo->pHandle); SSDataBlock* pBlock = pBlockScanInfo->pResBlock; @@ -2289,7 +2297,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi size_t num = tableListGetSize(pTableListInfo); void* pList = tableListGetInfo(pTableListInfo, 0); - code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL); + code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, + (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL); cleanupQueryTableDataCond(&cond); if (code != 0) { goto _error; @@ -2316,4 +2325,4 @@ _error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; -} \ No newline at end of file +}