提交 d02746d8 编写于 作者: M Minglei Jin

fix(query/sys): pause/resume with sys scan operator

上级 dfebe434
...@@ -16,13 +16,13 @@ ...@@ -16,13 +16,13 @@
#ifndef TDENGINE_STORAGEAPI_H #ifndef TDENGINE_STORAGEAPI_H
#define TDENGINE_STORAGEAPI_H #define TDENGINE_STORAGEAPI_H
#include "tsimplehash.h" #include "function.h"
#include "tscalablebf.h" #include "index.h"
#include "taosdef.h" #include "taosdef.h"
#include "tmsg.h"
#include "tcommon.h" #include "tcommon.h"
#include "index.h" #include "tmsg.h"
#include "function.h" #include "tscalablebf.h"
#include "tsimplehash.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -46,7 +46,7 @@ typedef struct SMetaEntry { ...@@ -46,7 +46,7 @@ typedef struct SMetaEntry {
int8_t type; int8_t type;
int8_t flags; // TODO: need refactor? int8_t flags; // TODO: need refactor?
tb_uid_t uid; tb_uid_t uid;
char * name; char* name;
union { union {
struct { struct {
SSchemaWrapper schemaRow; SSchemaWrapper schemaRow;
...@@ -57,43 +57,45 @@ typedef struct SMetaEntry { ...@@ -57,43 +57,45 @@ typedef struct SMetaEntry {
int64_t ctime; int64_t ctime;
int32_t ttlDays; int32_t ttlDays;
int32_t commentLen; int32_t commentLen;
char * comment; char* comment;
tb_uid_t suid; tb_uid_t suid;
uint8_t *pTags; uint8_t* pTags;
} ctbEntry; } ctbEntry;
struct { struct {
int64_t ctime; int64_t ctime;
int32_t ttlDays; int32_t ttlDays;
int32_t commentLen; int32_t commentLen;
char * comment; char* comment;
int32_t ncid; // next column id int32_t ncid; // next column id
SSchemaWrapper schemaRow; SSchemaWrapper schemaRow;
} ntbEntry; } ntbEntry;
struct { struct {
STSma *tsma; STSma* tsma;
} smaEntry; } smaEntry;
}; };
uint8_t *pBuf; uint8_t* pBuf;
} SMetaEntry; } SMetaEntry;
typedef struct SMetaReader { typedef struct SMetaReader {
int32_t flags; int32_t flags;
void * pMeta; void* pMeta;
SDecoder coder; SDecoder coder;
SMetaEntry me; SMetaEntry me;
void * pBuf; void* pBuf;
int32_t szBuf; int32_t szBuf;
struct SStoreMeta* pAPI; struct SStoreMeta* pAPI;
} SMetaReader; } SMetaReader;
typedef struct SMTbCursor { typedef struct SMTbCursor {
void * pDbc; void* pMeta;
void * pKey; void* pDbc;
void * pVal; void* pKey;
void* pVal;
int32_t kLen; int32_t kLen;
int32_t vLen; int32_t vLen;
SMetaReader mr; SMetaReader mr;
int8_t paused;
} SMTbCursor; } SMTbCursor;
typedef struct SRowBuffPos { typedef struct SRowBuffPos {
...@@ -107,22 +109,22 @@ typedef struct SRowBuffPos { ...@@ -107,22 +109,22 @@ typedef struct SRowBuffPos {
typedef struct SMetaTableInfo { typedef struct SMetaTableInfo {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
SSchemaWrapper *schema; SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN];
} SMetaTableInfo; } SMetaTableInfo;
typedef struct SSnapContext { typedef struct SSnapContext {
SMeta * pMeta; // todo remove it SMeta* pMeta; // todo remove it
int64_t snapVersion; int64_t snapVersion;
void * pCur; void* pCur;
int64_t suid; int64_t suid;
int8_t subType; int8_t subType;
SHashObj * idVersion; SHashObj* idVersion;
SHashObj * suidInfo; SHashObj* suidInfo;
SArray * idList; SArray* idList;
int32_t index; int32_t index;
bool withMeta; bool withMeta;
bool queryMeta; // true-get meta, false-get data bool queryMeta; // true-get meta, false-get data
} SSnapContext; } SSnapContext;
typedef struct { typedef struct {
...@@ -139,10 +141,9 @@ typedef struct { ...@@ -139,10 +141,9 @@ typedef struct {
// int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id); // int32_t tqReaderSeek(STqReader *pReader, int64_t ver, const char *id);
// bool tqNextBlockInWal(STqReader* pReader, const char* idstr); // bool tqNextBlockInWal(STqReader* pReader, const char* idstr);
// bool tqNextBlockImpl(STqReader *pReader, const char* idstr); // bool tqNextBlockImpl(STqReader *pReader, const char* idstr);
// int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid); // int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t
// SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); // *uid); SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx); int32_t setForSnapShot(SSnapContext
// int32_t setForSnapShot(SSnapContext *ctx, int64_t uid); // *ctx, int64_t uid); int32_t destroySnapContext(SSnapContext *ctx);
// int32_t destroySnapContext(SSnapContext *ctx);
// clang-format off // clang-format off
/*-------------------------------------------------new api format---------------------------------------------------*/ /*-------------------------------------------------new api format---------------------------------------------------*/
...@@ -219,16 +220,16 @@ typedef struct SStoreTqReader { ...@@ -219,16 +220,16 @@ typedef struct SStoreTqReader {
bool (*tqReaderIsQueriedTable)(); bool (*tqReaderIsQueriedTable)();
bool (*tqReaderCurrentBlockConsumed)(); bool (*tqReaderCurrentBlockConsumed)();
struct SWalReader *(*tqReaderGetWalReader)(); // todo remove it struct SWalReader* (*tqReaderGetWalReader)(); // todo remove it
int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
int32_t (*tqReaderSetSubmitMsg)(); // todo remove it int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
bool (*tqReaderNextBlockFilterOut)(); bool (*tqReaderNextBlockFilterOut)();
} SStoreTqReader; } SStoreTqReader;
typedef struct SStoreSnapshotFn { typedef struct SStoreSnapshotFn {
int32_t (*createSnapshot)(SSnapContext *ctx, int64_t uid); int32_t (*createSnapshot)(SSnapContext* ctx, int64_t uid);
int32_t (*destroySnapshot)(SSnapContext *ctx); int32_t (*destroySnapshot)(SSnapContext* ctx);
SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx); SMetaTableInfo (*getMetaTableInfoFromSnapshot)(SSnapContext* ctx);
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid); int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
} SStoreSnapshotFn; } SStoreSnapshotFn;
...@@ -252,42 +253,54 @@ int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, in ...@@ -252,42 +253,54 @@ int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, in
int32_t payloadLen, double selectivityRatio); int32_t payloadLen, double selectivityRatio);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t
payloadLen);
*/ */
typedef struct SStoreMeta { typedef struct SStoreMeta {
SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
int32_t (*cursorNext)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorNext void (*pauseTableMetaCursor)(SMTbCursor* pTbCur); // metaPauseTbCursor
int32_t (*cursorPrev)(SMTbCursor *pTbCur, ETableType jumpTableType); // metaTbCursorPrev void (*resumeTableMetaCursor)(SMTbCursor* pTbCur, int8_t first); // metaResumeTbCursor
int32_t (*cursorNext)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorNext
int32_t (*getTableTags)(void *pVnode, uint64_t suid, SArray *uidList); int32_t (*cursorPrev)(SMTbCursor* pTbCur, ETableType jumpTableType); // metaTbCursorPrev
int32_t (*getTableTagsByUid)(void *pVnode, int64_t suid, SArray *uidList);
const void *(*extractTagVal)(const void *tag, int16_t type, STagVal *tagVal); // todo remove it int32_t (*getTableTags)(void* pVnode, uint64_t suid, SArray* uidList);
int32_t (*getTableTagsByUid)(void* pVnode, int64_t suid, SArray* uidList);
int32_t (*getTableUidByName)(void *pVnode, char *tbName, uint64_t *uid); const void* (*extractTagVal)(const void* tag, int16_t type, STagVal* tagVal); // todo remove it
int32_t (*getTableTypeByName)(void *pVnode, char *tbName, ETableType *tbType);
int32_t (*getTableNameByUid)(void *pVnode, uint64_t uid, char *tbName); int32_t (*getTableUidByName)(void* pVnode, char* tbName, uint64_t* uid);
bool (*isTableExisted)(void *pVnode, tb_uid_t uid); int32_t (*getTableTypeByName)(void* pVnode, char* tbName, ETableType* tbType);
int32_t (*getTableNameByUid)(void* pVnode, uint64_t uid, char* tbName);
int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); bool (*isTableExisted)(void* pVnode, tb_uid_t uid);
int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen);
int32_t (*metaGetCachedTbGroup)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes); int32_t (*metaPutTbGroupToCache)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen, double selectivityRatio); int32_t payloadLen);
void *(*storeGetIndexInfo)(); int32_t (*getCachedTableList)(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
void *(*getInvertIndex)(void* pVnode); bool* acquireRes);
int32_t (*getChildTableList)(void *pVnode, int64_t suid, SArray *list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter] int32_t (*putCachedTableList)(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList int32_t payloadLen, double selectivityRatio);
void *storeGetVersionRange;
void *storeGetLastTimestamp; void* (*storeGetIndexInfo)();
void* (*getInvertIndex)(void* pVnode);
int32_t (*getTableSchema)(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbGetTableSchema int32_t (*getChildTableList)(
void* pVnode, int64_t suid,
SArray* list); // support filter and non-filter cases. [vnodeGetCtbIdList & vnodeGetCtbIdListByFilter]
int32_t (*storeGetTableList)(void* pVnode, int8_t type, SArray* pList); // vnodeGetStbIdList & vnodeGetAllTableList
void* storeGetVersionRange;
void* storeGetLastTimestamp;
int32_t (*getTableSchema)(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tsdbGetTableSchema
// db name, vgId, numOfTables, numOfSTables // db name, vgId, numOfTables, numOfSTables
int32_t (*getNumOfChildTables)(void* pVnode, int64_t uid, int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo); int32_t (*getNumOfChildTables)(
void (*getBasicInfo)(void *pVnode, const char **dbname, int32_t *vgId, int64_t* numOfTables, int64_t* numOfNormalTables);// vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) & metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta); void* pVnode, int64_t uid,
int64_t* numOfTables); // int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo);
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(void* pVnode); int64_t (*getNumOfRowsInMem)(void* pVnode);
/** /**
...@@ -298,24 +311,24 @@ int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list); ...@@ -298,24 +311,24 @@ int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
} SStoreMeta; } SStoreMeta;
typedef struct SStoreMetaReader { typedef struct SStoreMetaReader {
void (*initReader)(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI); void (*initReader)(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI);
void (*clearReader)(SMetaReader *pReader); void (*clearReader)(SMetaReader* pReader);
void (*readerReleaseLock)(SMetaReader *pReader); void (*readerReleaseLock)(SMetaReader* pReader);
int32_t (*getTableEntryByUid)(SMetaReader *pReader, tb_uid_t uid); int32_t (*getTableEntryByUid)(SMetaReader* pReader, tb_uid_t uid);
int32_t (*getTableEntryByName)(SMetaReader *pReader, const char *name); int32_t (*getTableEntryByName)(SMetaReader* pReader, const char* name);
int32_t (*getEntryGetUidCache)(SMetaReader *pReader, tb_uid_t uid); int32_t (*getEntryGetUidCache)(SMetaReader* pReader, tb_uid_t uid);
} SStoreMetaReader; } SStoreMetaReader;
typedef struct SUpdateInfo { typedef struct SUpdateInfo {
SArray *pTsBuckets; SArray* pTsBuckets;
uint64_t numBuckets; uint64_t numBuckets;
SArray *pTsSBFs; SArray* pTsSBFs;
uint64_t numSBFs; uint64_t numSBFs;
int64_t interval; int64_t interval;
int64_t watermark; int64_t watermark;
TSKEY minTS; TSKEY minTS;
SScalableBf *pCloseWinSBF; SScalableBf* pCloseWinSBF;
SHashObj *pMap; SHashObj* pMap;
uint64_t maxDataVersion; uint64_t maxDataVersion;
} SUpdateInfo; } SUpdateInfo;
...@@ -334,15 +347,15 @@ typedef struct SStateStore { ...@@ -334,15 +347,15 @@ typedef struct SStateStore {
int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateReleaseBuf)(SStreamState* pState, const SWinKey* key, void* pVal); int32_t (*streamStateReleaseBuf)(SStreamState* pState, const SWinKey* key, void* pVal);
void (*streamStateFreeVal)(void* val); void (*streamStateFreeVal)(void* val);
int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key); bool (*streamStateCheck)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal); int32_t (*streamStateGetByPos)(SStreamState* pState, void* pos, void** pVal);
int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateDel)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateClear)(SStreamState* pState); int32_t (*streamStateClear)(SStreamState* pState);
void (*streamStateSetNumber)(SStreamState* pState, int32_t number); void (*streamStateSetNumber)(SStreamState* pState, int32_t number);
int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen); int32_t (*streamStateSaveInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
...@@ -353,36 +366,37 @@ typedef struct SStateStore { ...@@ -353,36 +366,37 @@ typedef struct SStateStore {
int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur); int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur);
int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur); int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur);
SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key); SStreamStateCur* (*streamStateGetAndCheckCur)(SStreamState* pState, SWinKey* key);
SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key); SStreamStateCur* (*streamStateSeekKeyNext)(SStreamState* pState, const SWinKey* key);
SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key); SStreamStateCur* (*streamStateFillSeekKeyNext)(SStreamState* pState, const SWinKey* key);
SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key); SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key);
void (*streamStateFreeCur)(SStreamStateCur* pCur); void (*streamStateFreeCur)(SStreamStateCur* pCur);
int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen); int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen);
int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionGet)(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key); int32_t (*streamStateSessionDel)(SStreamState* pState, const SSessionKey* key);
int32_t (*streamStateSessionClear)(SStreamState* pState); int32_t (*streamStateSessionClear)(SStreamState* pState);
int32_t (*streamStateSessionGetKVByCur)(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionGetKVByCur)(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t (*streamStateStateAddIfNotExist)(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey);
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark); SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark);
TSKEY (*updateInfoFillBlockData)(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
bool (*updateInfoIsUpdated)(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
bool (*updateInfoIsTableInserted)(SUpdateInfo *pInfo, int64_t tbUid); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
void (*updateInfoDestroy)(SUpdateInfo *pInfo); void (*updateInfoDestroy)(SUpdateInfo* pInfo);
SUpdateInfo* (*updateInfoInitP)(SInterval *pInterval, int64_t watermark); SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo *pInfo); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo *pInfo); void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
int32_t (*updateInfoSerialize)(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);
int32_t (*updateInfoDeserialize)(void *buf, int32_t bufLen, SUpdateInfo *pInfo); int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key); SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key); SStreamStateCur* (*streamStateSessionSeekKeyCurrentPrev)(SStreamState* pState, const SSessionKey* key);
...@@ -396,11 +410,11 @@ typedef struct SStateStore { ...@@ -396,11 +410,11 @@ typedef struct SStateStore {
bool (*needClearDiskBuff)(struct SStreamFileState* pFileState); bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void (*streamStateClose)(SStreamState* pState, bool remove); void (*streamStateClose)(SStreamState* pState, bool remove);
int32_t (*streamStateBegin)(SStreamState* pState); int32_t (*streamStateBegin)(SStreamState* pState);
int32_t (*streamStateCommit)(SStreamState* pState); int32_t (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove); void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark); int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
} SStateStore; } SStateStore;
typedef struct SStorageAPI { typedef struct SStorageAPI {
......
...@@ -107,10 +107,12 @@ struct SQueryNode { ...@@ -107,10 +107,12 @@ struct SQueryNode {
typedef SVCreateTbReq STbCfg; typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg; typedef SVCreateTSmaReq SSmaCfg;
SMTbCursor *metaOpenTbCursor(void *pVnode); SMTbCursor* metaOpenTbCursor(void* pVnode);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor* pTbCur);
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType); void metaPauseTbCursor(SMTbCursor* pTbCur);
int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType); void metaResumeTbCursor(SMTbCursor* pTbCur, int8_t first);
int32_t metaTbCursorNext(SMTbCursor* pTbCur, ETableType jumpTableType);
int32_t metaTbCursorPrev(SMTbCursor* pTbCur, ETableType jumpTableType);
#endif #endif
...@@ -154,8 +156,8 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in ...@@ -154,8 +156,8 @@ int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, in
int metaGetTableEntryByName(SMetaReader* pReader, const char* name); int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
int metaAlterCache(SMeta* pMeta, int32_t nPage); int metaAlterCache(SMeta* pMeta, int32_t nPage);
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid); int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid);
int32_t metaTbGroupCacheClear(SMeta *pMeta, uint64_t suid); int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid);
int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq); int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
...@@ -175,7 +177,7 @@ void* metaGetIdx(SMeta* pMeta); ...@@ -175,7 +177,7 @@ void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
...@@ -491,7 +493,6 @@ struct SCompactInfo { ...@@ -491,7 +493,6 @@ struct SCompactInfo {
void initStorageAPI(SStorageAPI* pAPI); void initStorageAPI(SStorageAPI* pAPI);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
#include "osMemory.h" #include "osMemory.h"
#include "tencode.h" #include "tencode.h"
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) { void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta *pAPI) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta; SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
metaReaderInit(pReader, pMeta, flags); metaReaderInit(pReader, pMeta, flags);
pReader->pAPI = pAPI; pReader->pAPI = pAPI;
} }
...@@ -71,7 +71,7 @@ _err: ...@@ -71,7 +71,7 @@ _err:
} }
bool metaIsTableExist(void *pVnode, tb_uid_t uid) { bool metaIsTableExist(void *pVnode, tb_uid_t uid) {
SVnode* pVnodeObj = pVnode; SVnode *pVnodeObj = pVnode;
metaRLock(pVnodeObj->pMeta); // query uid.idx metaRLock(pVnodeObj->pMeta); // query uid.idx
if (tdbTbGet(pVnodeObj->pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) { if (tdbTbGet(pVnodeObj->pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) {
...@@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { ...@@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) { int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int code = 0; int code = 0;
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid); code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) { if (code < 0) {
metaReaderClear(&mr); metaReaderClear(&mr);
...@@ -179,7 +179,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { ...@@ -179,7 +179,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
SMetaReader *pReader = &mr; SMetaReader *pReader = &mr;
// query name.idx // query name.idx
if (tdbTbGet(((SMeta*)pReader->pMeta)->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) { if (tdbTbGet(((SMeta *)pReader->pMeta)->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
metaReaderClear(&mr); metaReaderClear(&mr);
return -1; return -1;
...@@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { ...@@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) { int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) {
int code = 0; int code = 0;
SMetaReader mr = {0}; SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
code = metaGetTableEntryByName(&mr, tbName); code = metaGetTableEntryByName(&mr, tbName);
if (code == 0) *tbType = mr.me.type; if (code == 0) *tbType = mr.me.type;
...@@ -221,12 +221,13 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) { ...@@ -221,12 +221,13 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) {
return NULL; return NULL;
} }
SVnode* pVnodeObj = pVnode; SVnode *pVnodeObj = pVnode;
metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0); // metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0);
tdbTbcOpen(pVnodeObj->pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
tdbTbcMoveToFirst((TBC *)pTbCur->pDbc); // tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
pTbCur->pMeta = pVnodeObj->pMeta;
pTbCur->paused = 1;
metaResumeTbCursor(pTbCur, 1);
return pTbCur; return pTbCur;
} }
...@@ -234,14 +235,45 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) { ...@@ -234,14 +235,45 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
if (pTbCur) { if (pTbCur) {
tdbFree(pTbCur->pKey); tdbFree(pTbCur->pKey);
tdbFree(pTbCur->pVal); tdbFree(pTbCur->pVal);
metaReaderClear(&pTbCur->mr); if (!pTbCur->paused) {
if (pTbCur->pDbc) { metaReaderClear(&pTbCur->mr);
tdbTbcClose((TBC *)pTbCur->pDbc); if (pTbCur->pDbc) {
tdbTbcClose((TBC *)pTbCur->pDbc);
}
} }
taosMemoryFree(pTbCur); taosMemoryFree(pTbCur);
} }
} }
void metaPauseTbCursor(SMTbCursor *pTbCur) {
if (!pTbCur->paused) {
metaReaderClear(&pTbCur->mr);
tdbTbcClose((TBC *)pTbCur->pDbc);
pTbCur->paused = 1;
}
}
void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) {
if (pTbCur->paused) {
metaReaderInit(&pTbCur->mr, pTbCur->pMeta, 0);
tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
if (first) {
tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
} else {
int c = 0;
tdbTbcMoveTo(pTbCur->pDbc, pTbCur->pKey, pTbCur->kLen, &c);
if (c < 0) {
tdbTbcMoveToPrev(pTbCur->pDbc);
} else {
tdbTbcMoveToNext(pTbCur->pDbc);
}
}
pTbCur->paused = 0;
}
}
int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) { int32_t metaTbCursorNext(SMTbCursor *pTbCur, ETableType jumpTableType) {
int ret; int ret;
void *pBuf; void *pBuf;
...@@ -974,7 +1006,7 @@ typedef struct { ...@@ -974,7 +1006,7 @@ typedef struct {
} SIdxCursor; } SIdxCursor;
int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) { int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
SMeta *pMeta = ((SVnode*)pVnode)->pMeta; SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMetaFltParam *param = arg; SMetaFltParam *param = arg;
int32_t ret = 0; int32_t ret = 0;
...@@ -1034,7 +1066,7 @@ END: ...@@ -1034,7 +1066,7 @@ END:
} }
int32_t metaFilterTableName(void *pVnode, SMetaFltParam *arg, SArray *pUids) { int32_t metaFilterTableName(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
SMeta *pMeta = ((SVnode*)pVnode)->pMeta; SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMetaFltParam *param = arg; SMetaFltParam *param = arg;
int32_t ret = 0; int32_t ret = 0;
char *buf = NULL; char *buf = NULL;
...@@ -1101,7 +1133,7 @@ END: ...@@ -1101,7 +1133,7 @@ END:
return ret; return ret;
} }
int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) { int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
SMeta *pMeta = ((SVnode*)pVnode)->pMeta; SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMetaFltParam *param = arg; SMetaFltParam *param = arg;
int32_t ret = 0; int32_t ret = 0;
char *buf = NULL; char *buf = NULL;
...@@ -1132,7 +1164,7 @@ END: ...@@ -1132,7 +1164,7 @@ END:
return 0; return 0;
} }
int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) { int32_t metaFilterTableIds(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
SMeta *pMeta = ((SVnode*)pVnode)->pMeta; SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
SMetaFltParam *param = arg; SMetaFltParam *param = arg;
SMetaEntry oStbEntry = {0}; SMetaEntry oStbEntry = {0};
...@@ -1318,7 +1350,7 @@ static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, voi ...@@ -1318,7 +1350,7 @@ static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, voi
} }
int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) { int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) {
SMeta* pMeta = ((SVnode*) pVnode)->pMeta; SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
const int32_t LIMIT = 128; const int32_t LIMIT = 128;
int32_t isLock = false; int32_t isLock = false;
...@@ -1350,8 +1382,8 @@ int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) { ...@@ -1350,8 +1382,8 @@ int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) {
return 0; return 0;
} }
int32_t metaGetTableTags(void* pVnode, uint64_t suid, SArray *pUidTagInfo) { int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode*)pVnode)->pMeta, suid, 1); SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode *)pVnode)->pMeta, suid, 1);
// If len > 0 means there already have uids, and we only want the // If len > 0 means there already have uids, and we only want the
// tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept // tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept
...@@ -1456,11 +1488,11 @@ _exit: ...@@ -1456,11 +1488,11 @@ _exit:
return code; return code;
} }
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t* numOfTables) { int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables) {
int32_t code = 0; int32_t code = 0;
*numOfTables = 0; *numOfTables = 0;
SVnode* pVnodeObj = pVnode; SVnode *pVnodeObj = pVnode;
metaRLock(pVnodeObj->pMeta); metaRLock(pVnodeObj->pMeta);
// fast path: search cache // fast path: search cache
......
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "meta.h"
#include "storageapi.h" #include "storageapi.h"
#include "vnodeInt.h"
#include "tstreamUpdate.h" #include "tstreamUpdate.h"
#include "meta.h" #include "vnodeInt.h"
static void initTsdbReaderAPI(TsdReader* pReader); static void initTsdbReaderAPI(TsdReader* pReader);
static void initMetadataAPI(SStoreMeta* pMeta); static void initMetadataAPI(SStoreMeta* pMeta);
...@@ -56,10 +56,10 @@ void initTsdbReaderAPI(TsdReader* pReader) { ...@@ -56,10 +56,10 @@ void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdReaderResetStatus = tsdbReaderReset; pReader->tsdReaderResetStatus = tsdbReaderReset;
pReader->tsdReaderGetDataBlockDistInfo = tsdbGetFileBlocksDistInfo; pReader->tsdReaderGetDataBlockDistInfo = tsdbGetFileBlocksDistInfo;
pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable; // todo this function should be moved away pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable; // todo this function should be moved away
pReader->tsdSetQueryTableList = tsdbSetTableList; pReader->tsdSetQueryTableList = tsdbSetTableList;
pReader->tsdSetReaderTaskId = (void (*)(void *, const char *))tsdbReaderSetId; pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId;
} }
void initMetadataAPI(SStoreMeta* pMeta) { void initMetadataAPI(SStoreMeta* pMeta) {
...@@ -67,6 +67,8 @@ void initMetadataAPI(SStoreMeta* pMeta) { ...@@ -67,6 +67,8 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->openTableMetaCursor = metaOpenTbCursor; pMeta->openTableMetaCursor = metaOpenTbCursor;
pMeta->closeTableMetaCursor = metaCloseTbCursor; pMeta->closeTableMetaCursor = metaCloseTbCursor;
pMeta->pauseTableMetaCursor = metaPauseTbCursor;
pMeta->resumeTableMetaCursor = metaResumeTbCursor;
pMeta->cursorNext = metaTbCursorNext; pMeta->cursorNext = metaTbCursorNext;
pMeta->cursorPrev = metaTbCursorPrev; pMeta->cursorPrev = metaTbCursorPrev;
...@@ -78,7 +80,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { ...@@ -78,7 +80,7 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->storeGetIndexInfo = vnodeGetIdx; pMeta->storeGetIndexInfo = vnodeGetIdx;
pMeta->getInvertIndex = vnodeGetIvtIdx; pMeta->getInvertIndex = vnodeGetIvtIdx;
pMeta->extractTagVal = (const void *(*)(const void *, int16_t, STagVal *))metaGetTableTagVal; pMeta->extractTagVal = (const void* (*)(const void*, int16_t, STagVal*))metaGetTableTagVal;
pMeta->getTableTags = metaGetTableTags; pMeta->getTableTags = metaGetTableTags;
pMeta->getTableTagsByUid = metaGetTableTagsByUids; pMeta->getTableTagsByUid = metaGetTableTagsByUids;
...@@ -86,7 +88,7 @@ void initMetadataAPI(SStoreMeta* pMeta) { ...@@ -86,7 +88,7 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->getTableTypeByName = metaGetTableTypeByName; pMeta->getTableTypeByName = metaGetTableTypeByName;
pMeta->getTableNameByUid = metaGetTableNameByUid; pMeta->getTableNameByUid = metaGetTableNameByUid;
pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor pMeta->getTableSchema = tsdbGetTableSchema; // todo refactor
pMeta->storeGetTableList = vnodeGetTableList; pMeta->storeGetTableList = vnodeGetTableList;
pMeta->getCachedTableList = metaGetCachedTableUidList; pMeta->getCachedTableList = metaGetCachedTableUidList;
...@@ -106,7 +108,7 @@ void initTqAPI(SStoreTqReader* pTq) { ...@@ -106,7 +108,7 @@ void initTqAPI(SStoreTqReader* pTq) {
pTq->tqReaderNextBlockInWal = tqNextBlockInWal; pTq->tqReaderNextBlockInWal = tqNextBlockInWal;
pTq->tqNextBlockImpl = tqNextBlockImpl;// todo remove it pTq->tqNextBlockImpl = tqNextBlockImpl; // todo remove it
pTq->tqReaderAddTables = tqReaderAddTbUidList; pTq->tqReaderAddTables = tqReaderAddTbUidList;
pTq->tqReaderSetQueryTableList = tqReaderSetTbUidList; pTq->tqReaderSetQueryTableList = tqReaderSetTbUidList;
...@@ -116,10 +118,10 @@ void initTqAPI(SStoreTqReader* pTq) { ...@@ -116,10 +118,10 @@ void initTqAPI(SStoreTqReader* pTq) {
pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable; pTq->tqReaderIsQueriedTable = tqReaderIsQueriedTable;
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed; pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
pTq->tqGetResultBlock = tqGetResultBlock; pTq->tqGetResultBlock = tqGetResultBlock;
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut; pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
...@@ -199,7 +201,7 @@ void initStateStoreAPI(SStateStore* pStore) { ...@@ -199,7 +201,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateClose = streamStateClose; pStore->streamStateClose = streamStateClose;
pStore->streamStateBegin = streamStateBegin; pStore->streamStateBegin = streamStateBegin;
pStore->streamStateCommit = streamStateCommit; pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy= streamStateDestroy; pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint; pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
} }
...@@ -239,4 +241,4 @@ void initSnapshotFn(SStoreSnapshotFn* pSnapshot) { ...@@ -239,4 +241,4 @@ void initSnapshotFn(SStoreSnapshotFn* pSnapshot) {
pSnapshot->destroySnapshot = destroySnapContext; pSnapshot->destroySnapshot = destroySnapContext;
pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot; pSnapshot->getMetaTableInfoFromSnapshot = getMetaTableInfoFromSnapshot;
pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot; pSnapshot->getTableInfoFromSnapshot = getTableInfoFromSnapshot;
} }
\ No newline at end of file
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include "tdatablock.h" #include "tdatablock.h"
#include "tmsg.h" #include "tmsg.h"
#include "index.h"
#include "operator.h" #include "operator.h"
#include "query.h" #include "query.h"
#include "querytask.h" #include "querytask.h"
...@@ -32,7 +33,6 @@ ...@@ -32,7 +33,6 @@
#include "tcompare.h" #include "tcompare.h"
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
#include "index.h"
typedef int (*__optSysFilter)(void* a, void* b, int16_t dtype); typedef int (*__optSysFilter)(void* a, void* b, int16_t dtype);
typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result); typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result);
...@@ -540,12 +540,12 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { ...@@ -540,12 +540,12 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
int32_t restore = pInfo->restore; int32_t restore = pInfo->restore;
pInfo->restore = false; pInfo->restore = false;
while (restore || ((ret = pAPI->metaFn.cursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) { while (restore || ((ret = pAPI->metaFn.cursorNext(pInfo->pCur, TSDB_TABLE_MAX)) == 0)) {
if (restore) { if (restore) {
restore = false; restore = false;
} }
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char tableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
...@@ -626,8 +626,8 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) { ...@@ -626,8 +626,8 @@ static SSDataBlock* sysTableScanUserCols(SOperatorInfo* pOperator) {
} }
static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
...@@ -1100,8 +1100,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { ...@@ -1100,8 +1100,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
} }
static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
...@@ -1288,11 +1288,16 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { ...@@ -1288,11 +1288,16 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int8_t firstMetaCursor = 0;
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
if (pInfo->pCur == NULL) { if (pInfo->pCur == NULL) {
pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode); pInfo->pCur = pAPI->metaFn.openTableMetaCursor(pInfo->readHandle.vnode);
firstMetaCursor = 1;
}
if (!firstMetaCursor) {
pAPI->metaFn.resumeTableMetaCursor(pInfo->pCur, 0);
} }
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
...@@ -1436,12 +1441,14 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { ...@@ -1436,12 +1441,14 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
numOfRows = 0; numOfRows = 0;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
break; break;
} }
} }
} }
if (numOfRows > 0) { if (numOfRows > 0) {
pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur);
p->info.rows = numOfRows; p->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
...@@ -1485,7 +1492,8 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { ...@@ -1485,7 +1492,8 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
} else { } else {
if (pInfo->showRewrite == false) { if (pInfo->showRewrite == false) {
if (pCondition != NULL && pInfo->pIdx == NULL) { if (pCondition != NULL && pInfo->pIdx == NULL) {
SSTabFltArg arg = {.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI}; SSTabFltArg arg = {
.pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI};
SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex)); SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex));
idx->init = 0; idx->init = 0;
...@@ -1827,7 +1835,7 @@ void destroySysScanOperator(void* param) { ...@@ -1827,7 +1835,7 @@ void destroySysScanOperator(void* param) {
pInfo->pIdx = NULL; pInfo->pIdx = NULL;
} }
if(pInfo->pSchema) { if (pInfo->pSchema) {
taosHashCleanup(pInfo->pSchema); taosHashCleanup(pInfo->pSchema);
pInfo->pSchema = NULL; pInfo->pSchema = NULL;
} }
...@@ -2144,7 +2152,7 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { ...@@ -2144,7 +2152,7 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) {
return -1; return -1;
} }
static int32_t doGetTableRowSize(SReadHandle *pHandle, uint64_t uid, int32_t* rowLen, const char* idstr) { static int32_t doGetTableRowSize(SReadHandle* pHandle, uint64_t uid, int32_t* rowLen, const char* idstr) {
*rowLen = 0; *rowLen = 0;
SMetaReader mr = {0}; SMetaReader mr = {0};
...@@ -2194,17 +2202,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { ...@@ -2194,17 +2202,17 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
SBlockDistInfo* pBlockScanInfo = pOperator->info; SBlockDistInfo* pBlockScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN};
int32_t code = doGetTableRowSize(&pBlockScanInfo->readHandle, pBlockScanInfo->uid, int32_t code = doGetTableRowSize(&pBlockScanInfo->readHandle, pBlockScanInfo->uid, (int32_t*)&blockDistInfo.rowSize,
(int32_t*)&blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
pAPI->tsdReader.tsdReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); pAPI->tsdReader.tsdReaderGetDataBlockDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
blockDistInfo.numOfInmemRows = (int32_t) pAPI->tsdReader.tsdReaderGetNumOfInMemRows(pBlockScanInfo->pHandle); blockDistInfo.numOfInmemRows = (int32_t)pAPI->tsdReader.tsdReaderGetNumOfInMemRows(pBlockScanInfo->pHandle);
SSDataBlock* pBlock = pBlockScanInfo->pResBlock; SSDataBlock* pBlock = pBlockScanInfo->pResBlock;
...@@ -2289,7 +2297,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi ...@@ -2289,7 +2297,8 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo); size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0); void* pList = tableListGetInfo(pTableListInfo, 0);
code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, (void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL); code = readHandle->api.tsdReader.tsdReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock,
(void**)&pInfo->pHandle, pTaskInfo->id.str, false, NULL);
cleanupQueryTableDataCond(&cond); cleanupQueryTableDataCond(&cond);
if (code != 0) { if (code != 0) {
goto _error; goto _error;
...@@ -2316,4 +2325,4 @@ _error: ...@@ -2316,4 +2325,4 @@ _error:
taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
return NULL; return NULL;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册